Coverage for tests/routers/test_jobs_router.py: 99%

96 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-25 05:47 +0000

1import json 

2import pytest 

3from zenossapi.apiclient import ZenossAPIClientError 

4from zenossapi.routers.jobs import JobsRouter, ZenossJob 

5import jobs_resp 

6 

7pytest_plugins = "pytest-responses" 

8url = 'https://zenoss/zport/dmd' 

9headers = dict( 

10 ContentType='application/json' 

11) 

12 

13 

14def request_callback(request): 

15 rdata = json.loads(request.body) 

16 resp_headers = dict(ContentType='application/json') 

17 

18 def getJobs(rdata): 

19 return jobs_resp.get_jobs 

20 

21 def getInfo(rdata): 

22 if rdata['jobid'] == "721739ae-2b1d-4613-91e9-681f134a2c49": 

23 return jobs_resp.job1 

24 elif rdata['jobid'] == "9ba5c8d7-58de-4f18-96fe-d362841910d3": 

25 return jobs_resp.job2 

26 else: 

27 fail = jobs_resp.fail 

28 fail['result']['msg'] = "NoSuchJobException: {0}".format(rdata['jobid']) 

29 return fail 

30 

31 def detail(rdata): 

32 if rdata['jobid'] == "721739ae-2b1d-4613-91e9-681f134a2c49": 

33 return jobs_resp.nolog 

34 elif rdata['jobid'] == "9ba5c8d7-58de-4f18-96fe-d362841910d3": 

35 return jobs_resp.detail 

36 

37 def abort(rdata): 

38 return jobs_resp.abort 

39 

40 def deleteJobs(rdata): 

41 if rdata['jobids'] == ["721739ae-2b1d-4613-91e9-681f134a2c49"]: 

42 return jobs_resp.delete 

43 elif rdata['jobids'] == ["9ba5c8d7-58de-4f18-96fe-d362841910d3"]: 

44 return jobs_resp.abort 

45 else: 

46 return jobs_resp.fail 

47 

48 method = locals()[rdata['method']] 

49 resp_body = method(rdata['data'][0]) 

50 

51 return (200, resp_headers, json.dumps(resp_body)) 

52 

53 

54def responses_callback(responses): 

55 responses.add_callback( 

56 responses.POST, 

57 '{0}/jobs_router'.format(url), 

58 callback=request_callback, 

59 content_type='application/json', 

60 ) 

61 

62 

63class TestJobsRouter(object): 

64 

65 def test_jobs_router_init(self): 

66 jr = JobsRouter(url, headers, True) 

67 assert jr.uuid is None 

68 

69 def test_jobs_router_list_jobs(self, responses): 

70 responses_callback(responses) 

71 

72 jr = JobsRouter(url, headers, True) 

73 resp = jr.list_jobs() 

74 assert resp['total'] == 585 

75 assert len(resp['jobs']) == 2 

76 

77 def test_jobs_router_get_jobs(self, responses): 

78 responses_callback(responses) 

79 

80 jr = JobsRouter(url, headers, True) 

81 resp = jr.get_jobs() 

82 assert isinstance(resp[0], ZenossJob) 

83 

84 def test_jobs_router_get_job(self, responses): 

85 responses_callback(responses) 

86 

87 jr = JobsRouter(url, headers, True) 

88 resp = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49') 

89 assert isinstance(resp, ZenossJob) 

90 assert resp.uuid == "721739ae-2b1d-4613-91e9-681f134a2c49" 

91 assert resp.status == "PENDING" 

92 

93 def test_jobs_router_get_job_not_found(self, responses): 

94 responses_callback(responses) 

95 

96 jr = JobsRouter(url, headers, True) 

97 with pytest.raises(ZenossAPIClientError, match="Request failed: NoSuchJobException: 0a39a730-9d41-48fb-878e-a5cbbbc1116a"): 

98 resp = jr.get_job('0a39a730-9d41-48fb-878e-a5cbbbc1116a') 

99 

100 def test_jobs_router_get_log(self, responses): 

101 responses_callback(responses) 

102 

103 jr = JobsRouter(url, headers, True) 

104 job = jr.get_job('9ba5c8d7-58de-4f18-96fe-d362841910d3') 

105 resp = job.get_log() 

106 assert resp['logfile'] == "/opt/zenoss/log/jobs/9ba5c8d7-58de-4f18-96fe-d362841910d3.log" 

107 assert len(resp['content']) == 29 

108 

109 def test_jobs_router_get_log_no_log(self, responses): 

110 responses_callback(responses) 

111 

112 jr = JobsRouter(url, headers, True) 

113 job = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49') 

114 resp = job.get_log() 

115 assert resp['logfile'] == "The log file for this job either does not exist or cannot be accessed." 

116 

117 def test_jobs_router_abort(self, responses): 

118 responses_callback(responses) 

119 

120 jr = JobsRouter(url, headers, True) 

121 job = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49') 

122 resp = job.abort() 

123 

124 def test_jobs_router_delete(self, responses): 

125 responses_callback(responses) 

126 

127 jr = JobsRouter(url, headers, True) 

128 job = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49') 

129 resp = job.delete() 

130 assert resp[0] == "721739ae-2b1d-4613-91e9-681f134a2c49" 

131 

132 def test_jobs_router_delete_already_deleted(self, responses): 

133 responses_callback(responses) 

134 

135 jr = JobsRouter(url, headers, True) 

136 job = jr.get_job('9ba5c8d7-58de-4f18-96fe-d362841910d3') 

137 resp = job.delete() 

138 assert resp == []