Coverage for tests/routers/test_jobs_router.py: 99%
96 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-25 05:47 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-25 05:47 +0000
1import json
2import pytest
3from zenossapi.apiclient import ZenossAPIClientError
4from zenossapi.routers.jobs import JobsRouter, ZenossJob
5import jobs_resp
7pytest_plugins = "pytest-responses"
8url = 'https://zenoss/zport/dmd'
9headers = dict(
10 ContentType='application/json'
11)
14def request_callback(request):
15 rdata = json.loads(request.body)
16 resp_headers = dict(ContentType='application/json')
18 def getJobs(rdata):
19 return jobs_resp.get_jobs
21 def getInfo(rdata):
22 if rdata['jobid'] == "721739ae-2b1d-4613-91e9-681f134a2c49":
23 return jobs_resp.job1
24 elif rdata['jobid'] == "9ba5c8d7-58de-4f18-96fe-d362841910d3":
25 return jobs_resp.job2
26 else:
27 fail = jobs_resp.fail
28 fail['result']['msg'] = "NoSuchJobException: {0}".format(rdata['jobid'])
29 return fail
31 def detail(rdata):
32 if rdata['jobid'] == "721739ae-2b1d-4613-91e9-681f134a2c49":
33 return jobs_resp.nolog
34 elif rdata['jobid'] == "9ba5c8d7-58de-4f18-96fe-d362841910d3":
35 return jobs_resp.detail
37 def abort(rdata):
38 return jobs_resp.abort
40 def deleteJobs(rdata):
41 if rdata['jobids'] == ["721739ae-2b1d-4613-91e9-681f134a2c49"]:
42 return jobs_resp.delete
43 elif rdata['jobids'] == ["9ba5c8d7-58de-4f18-96fe-d362841910d3"]:
44 return jobs_resp.abort
45 else:
46 return jobs_resp.fail
48 method = locals()[rdata['method']]
49 resp_body = method(rdata['data'][0])
51 return (200, resp_headers, json.dumps(resp_body))
54def responses_callback(responses):
55 responses.add_callback(
56 responses.POST,
57 '{0}/jobs_router'.format(url),
58 callback=request_callback,
59 content_type='application/json',
60 )
63class TestJobsRouter(object):
65 def test_jobs_router_init(self):
66 jr = JobsRouter(url, headers, True)
67 assert jr.uuid is None
69 def test_jobs_router_list_jobs(self, responses):
70 responses_callback(responses)
72 jr = JobsRouter(url, headers, True)
73 resp = jr.list_jobs()
74 assert resp['total'] == 585
75 assert len(resp['jobs']) == 2
77 def test_jobs_router_get_jobs(self, responses):
78 responses_callback(responses)
80 jr = JobsRouter(url, headers, True)
81 resp = jr.get_jobs()
82 assert isinstance(resp[0], ZenossJob)
84 def test_jobs_router_get_job(self, responses):
85 responses_callback(responses)
87 jr = JobsRouter(url, headers, True)
88 resp = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49')
89 assert isinstance(resp, ZenossJob)
90 assert resp.uuid == "721739ae-2b1d-4613-91e9-681f134a2c49"
91 assert resp.status == "PENDING"
93 def test_jobs_router_get_job_not_found(self, responses):
94 responses_callback(responses)
96 jr = JobsRouter(url, headers, True)
97 with pytest.raises(ZenossAPIClientError, match="Request failed: NoSuchJobException: 0a39a730-9d41-48fb-878e-a5cbbbc1116a"):
98 resp = jr.get_job('0a39a730-9d41-48fb-878e-a5cbbbc1116a')
100 def test_jobs_router_get_log(self, responses):
101 responses_callback(responses)
103 jr = JobsRouter(url, headers, True)
104 job = jr.get_job('9ba5c8d7-58de-4f18-96fe-d362841910d3')
105 resp = job.get_log()
106 assert resp['logfile'] == "/opt/zenoss/log/jobs/9ba5c8d7-58de-4f18-96fe-d362841910d3.log"
107 assert len(resp['content']) == 29
109 def test_jobs_router_get_log_no_log(self, responses):
110 responses_callback(responses)
112 jr = JobsRouter(url, headers, True)
113 job = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49')
114 resp = job.get_log()
115 assert resp['logfile'] == "The log file for this job either does not exist or cannot be accessed."
117 def test_jobs_router_abort(self, responses):
118 responses_callback(responses)
120 jr = JobsRouter(url, headers, True)
121 job = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49')
122 resp = job.abort()
124 def test_jobs_router_delete(self, responses):
125 responses_callback(responses)
127 jr = JobsRouter(url, headers, True)
128 job = jr.get_job('721739ae-2b1d-4613-91e9-681f134a2c49')
129 resp = job.delete()
130 assert resp[0] == "721739ae-2b1d-4613-91e9-681f134a2c49"
132 def test_jobs_router_delete_already_deleted(self, responses):
133 responses_callback(responses)
135 jr = JobsRouter(url, headers, True)
136 job = jr.get_job('9ba5c8d7-58de-4f18-96fe-d362841910d3')
137 resp = job.delete()
138 assert resp == []