Coverage for tests/routers/test_devicemanagement_router.py: 100%
299 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-25 05:47 +0000
1import json
2import pytest
3from zenossapi.apiclient import ZenossAPIClientError
4from zenossapi.routers.devicemanagement import DeviceManagementRouter, ZenossMaintenanceWindow, ZenossUserCommand, ZenossDeviceManagementAdmin
5import devicemanagement_resp
8pytest_plugins = "pytest-responses"
9url = 'https://zenoss/zport/dmd'
10headers = dict(
11 ContentType='application/json'
12)
13resp_body_template = dict(
14 result=dict(
15 success=True,
16 data=[],
17 )
18)
def request_callback(request):
    """Serve a canned reply for one mocked devicemanagement_router POST.

    The JSON request body is decoded, its ``method`` entry selects one of the
    handlers defined below, and that handler is called with the first element
    of the body's ``data`` list.

    Several handlers modify the objects held by ``devicemanagement_resp`` in
    place (user command fields, the admin role list), so those changes remain
    visible to every later request served in the same process.

    Args:
        request: Object whose ``body`` attribute holds the JSON request text.

    Returns:
        A ``(status, headers, body)`` tuple: status is always 200 and body is
        the selected handler's return value serialized with ``json.dumps``.

    Raises:
        KeyError: If ``method`` does not name one of the handlers.
    """
    payload = json.loads(request.body)
    resp_headers = dict(ContentType='application/json')

    def getTimeZone(data):
        return devicemanagement_resp.tz

    def getMaintWindows(data):
        return devicemanagement_resp.windows

    def addMaintWindow(data):
        return devicemanagement_resp.add_maint

    def deleteMaintWindow(data):
        return devicemanagement_resp.delete_maint

    def editMaintWindow(data):
        return devicemanagement_resp.update_maint

    def getUserCommands(data):
        # The reply depends on which uid the commands are requested for.
        uid = data['uid']
        if uid == "Devices/Server/TEST/devices/test.example.com":
            return devicemanagement_resp.added_uc
        if uid == "Devices/Server/TEST/devices/test2.example.com":
            return devicemanagement_resp.updated_uc
        return devicemanagement_resp.user_commands

    def addUserCommand(data):
        # Only the password "zenoss" yields the success reply.
        if data['password'] == "zenoss":
            return devicemanagement_resp.add_user_command
        return devicemanagement_resp.add_uc_bad_password

    def deleteUserCommand(data):
        return devicemanagement_resp.delete_uc

    def updateUserCommand(data):
        # Unlike addUserCommand, the fields are read from data['params'].
        params = data['params']
        if params['password'] != "zenoss":
            return devicemanagement_resp.update_uc_bad_passwd
        # Write the new values into the first entry of the shared
        # ``updated_uc`` reply so a following getUserCommands returns them.
        command = devicemanagement_resp.updated_uc['result']['data'][0]
        command['description'] = params['description']
        command['command'] = params['command']
        return devicemanagement_resp.update_uc

    def getUserList(data):
        return devicemanagement_resp.user_list

    def getRolesList(data):
        return devicemanagement_resp.roles_list

    def getAdminRoles(data):
        return devicemanagement_resp.admin_roles

    def addAdminRole(data):
        # Always appends the same fixed "lenny" entry to the shared admin
        # role list, regardless of the request contents.
        devicemanagement_resp.admin_roles['result']['data'].append(
            {
                "description": "",
                "name": "lenny",
                "id": "lenny",
                "meta_type": "AdministrativeRole",
                "role": "ZenUser",
                "inspector_type": "AdministrativeRole",
                "pager": "",
                "email": "lenny@example.com",
                "uid": "/zport/dmd/Devices/Server/TEST/devices/test.example.com/adminRoles/lenny"
            }
        )
        return devicemanagement_resp.add_admin

    def updateAdminRole(data):
        # Index 1 is hard-coded: the second entry of the shared list is the
        # one whose role gets overwritten.
        devicemanagement_resp.admin_roles['result']['data'][1]['role'] = data['params']['role']
        return devicemanagement_resp.update_admin

    def removeAdmin(data):
        # Index 1 is hard-coded here as well.
        devicemanagement_resp.admin_roles['result']['data'].pop(1)
        return devicemanagement_resp.delete_admin

    # Explicit dispatch table keyed by handler name.  Looking the name up in
    # locals() would also expose ``request``, ``payload`` and ``resp_headers``
    # as if they were router methods.
    handlers = dict(
        (handler.__name__, handler)
        for handler in (
            getTimeZone,
            getMaintWindows,
            addMaintWindow,
            deleteMaintWindow,
            editMaintWindow,
            getUserCommands,
            addUserCommand,
            deleteUserCommand,
            updateUserCommand,
            getUserList,
            getRolesList,
            getAdminRoles,
            addAdminRole,
            updateAdminRole,
            removeAdmin,
        )
    )

    method = handlers[payload['method']]
    resp_body = method(payload['data'][0])

    return (200, resp_headers, json.dumps(resp_body))
def responses_callback(responses):
    """Register ``request_callback`` for POSTs to the devicemanagement router.

    The mocked endpoint is ``<url>/devicemanagement_router`` and the
    registration declares a JSON content type.

    Args:
        responses: Object exposing ``add_callback`` and ``POST``; the tests in
            this module pass their ``responses`` fixture.
    """
    responses.add_callback(
        responses.POST,
        '{0}/devicemanagement_router'.format(url),
        callback=request_callback,
        content_type='application/json',
    )
class TestDeviceManagementRouter(object):
    """Exercise DeviceManagementRouter and the objects it returns.

    Every test that takes the ``responses`` fixture first calls
    ``responses_callback`` so router POSTs are answered by
    ``request_callback`` with canned data.

    NOTE(review): the admin-role tests appear to depend on execution order.
    ``add_admin`` makes the mock append a "lenny" entry to shared response
    data, and the later admin update/delete tests look "lenny" up by name,
    while the earlier list/get tests expect a single admin -- confirm before
    reordering or running tests in isolation.
    """

    def test_devicemanagement_router_init(self):
        """A freshly built router has no uid."""
        dmr = DeviceManagementRouter(url, headers, False)
        assert dmr.uid is None

    def test_devicemanagement_router_timezone(self, responses):
        """timezone() should return "UTC"."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.timezone()
        assert resp == "UTC"

    def test_devicemanagement_router_list_maintenance_windows(self, responses):
        """list_maintenance_windows() should return three dict entries."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.list_maintenance_windows('Server/TEST')
        assert len(resp) == 3
        assert resp[0]['name'] == "TestWindow"
        assert resp[0]['uid'] == "Devices/Server/TEST/maintenanceWindows/TestWindow"
        assert resp[1]['name'] == "TestWindowRepeat"
        assert resp[1]['uid'] == "Devices/Server/TEST/maintenanceWindows/TestWindowRepeat"

    def test_devicemanagement_router_get_maintenance_windows(self, responses):
        """get_maintenance_windows() should return ZenossMaintenanceWindow objects."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_maintenance_windows('Server/TEST')
        assert len(resp) == 3
        assert isinstance(resp[0], ZenossMaintenanceWindow)
        assert isinstance(resp[1], ZenossMaintenanceWindow)
        assert resp[0].name == "TestWindow"
        assert resp[1].name == "TestWindowRepeat"
        assert resp[0].repeat == "Never"
        assert resp[1].repeat == "Monthly: day of week"
        assert resp[0].parent == "Devices/Server/TEST"
        assert resp[1].parent == "Devices/Server/TEST"
        assert resp[0].startDate == "2017/11/17"
        assert resp[0].startHours == "15"
        assert resp[0].startMinutes == "00"
        assert resp[0].durationDays is None
        assert resp[0].durationHours == "01"
        assert resp[0].durationMinutes == "00"

    def test_devicemanagement_router_add_maintenance_window(self, responses):
        """add_maintenance_window() should return the new window object."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00')
        assert isinstance(resp, ZenossMaintenanceWindow)
        assert resp.name == "TestAddWindow"
        assert resp.parent == "Devices/Server/TEST"

    def test_devicemanagement_router_add_maintenance_window_bad_repeat(self):
        """An unknown ``repeat`` value should raise ZenossAPIClientError."""
        dmr = DeviceManagementRouter(url, headers, False)
        with pytest.raises(ZenossAPIClientError, match="Invalid maintenance window repetition: Bogus"):
            dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00', repeat='Bogus')

    def test_devicemanagement_router_add_maintenance_window_bad_occurrence(self):
        """An unknown ``occurrence`` value should raise ZenossAPIClientError."""
        dmr = DeviceManagementRouter(url, headers, False)
        with pytest.raises(ZenossAPIClientError, match="Invalid maintenance window occurrence: Bogus"):
            dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00', occurrence='Bogus')

    def test_devicemanagement_router_add_maintenance_window_bad_days(self):
        """An unknown ``days`` value should raise ZenossAPIClientError."""
        dmr = DeviceManagementRouter(url, headers, False)
        with pytest.raises(ZenossAPIClientError, match="Invalid maintenance window days: Bogus"):
            dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00', days='Bogus')

    def test_devicemanagement_router_list_user_commands(self, responses):
        """list_user_commands() should return six dict entries in order."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.list_user_commands('Server/TEST')
        assert len(resp) == 6
        assert resp[0]['id'] == "DNS forward"
        assert resp[1]['id'] == "DNS reverse"
        assert resp[2]['id'] == "ping"
        assert resp[3]['id'] == "snmpwalk"
        assert resp[4]['id'] == "snmpwalk_v3"
        assert resp[5]['id'] == "traceroute"

    def test_devicemanagement_router_get_user_commands(self, responses):
        """get_user_commands() should return six objects with the right parent."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_user_commands('Server/TEST')
        assert len(resp) == 6
        assert resp[0].id == "DNS forward"
        assert resp[0].parent == "Devices/Server/TEST"
        assert resp[1].id == "DNS reverse"
        assert resp[1].parent == "Devices/Server/TEST"
        assert resp[2].id == "ping"
        assert resp[2].parent == "Devices/Server/TEST"
        assert resp[3].id == "snmpwalk"
        assert resp[3].parent == "Devices/Server/TEST"
        assert resp[4].id == "snmpwalk_v3"
        assert resp[4].parent == "Devices/Server/TEST"
        assert resp[5].id == "traceroute"
        assert resp[5].parent == "Devices/Server/TEST"

    def test_devicemanagement_router_get_user_command_by_id(self, responses):
        """get_user_command_by_id() should return the matching ZenossUserCommand."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_user_command_by_id('Server/TEST', 'DNS forward')
        assert isinstance(resp, ZenossUserCommand)
        assert resp.id == "DNS forward"
        assert resp.description == "Name to IP address lookup"
        assert resp.meta_type == "UserCommand"
        assert resp.name == "DNS forward"
        assert resp.command == "host ${device/id}"
        assert resp.uid == "userCommands/DNS forward"
        assert resp.parent == "Devices/Server/TEST"

    def test_devicemanagement_router_get_user_command_by_name(self, responses):
        """get_user_command_by_name() should return the matching ZenossUserCommand."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_user_command_by_name('Server/TEST', 'ping')
        assert isinstance(resp, ZenossUserCommand)
        assert resp.id == "ping"
        assert resp.description == "Is the device responding to ping?"
        assert resp.meta_type == "UserCommand"
        assert resp.name == "ping"
        assert resp.command == "${device/pingCommand} -c2 ${device/manageIp}"
        assert resp.uid == "userCommands/ping"
        assert resp.parent == "Devices/Server/TEST"

    def test_devicemanagement_router_add_user_command(self, responses):
        """add_user_command() with the accepted password returns the new command."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.add_user_command('Server/TEST/devices/test.example.com', 'uname_a', 'uname -a', 'uname -a', 'zenoss')
        assert isinstance(resp, ZenossUserCommand)
        assert resp.id == "uname_a"
        assert resp.command == "uname -a"

    def test_devicemanagement_router_add_user_command_bad_password(self, responses):
        """add_user_command() with a rejected password raises ZenossAPIClientError."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        with pytest.raises(ZenossAPIClientError, match="Request failed: Exception: Add new command failed. Incorrect or missing password."):
            dmr.add_user_command('Server/TEST/devices/test.example.com', 'uname_a', 'uname -a', 'uname -a', 'bogus')

    def test_devicemanagement_router_list_admin_users(self, responses):
        """list_users() should return the four user names."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.list_users('Server/TEST/devices/test.example.com')
        assert len(resp) == 4
        assert resp == ['admin', 'ccuser', 'lenny', 'zenoss_system']

    def test_devicemanagement_router_list_available_roles(self, responses):
        """list_available_roles() should return the four role names."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.list_available_roles('Server/TEST/devices/test.example.com')
        assert len(resp) == 4
        assert resp == ['Manager', 'ZenManager', 'ZenOperator', 'ZenUser']

    def test_devicemanagement_router_list_admin_roles(self, responses):
        """list_admin_roles() should return a single "admin" entry."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.list_admin_roles('Server/TEST/devices/test.example.com')
        assert len(resp) == 1
        assert resp[0]['name'] == "admin"

    def test_devicemanagement_router_get_admins(self, responses):
        """get_admins() should return one ZenossDeviceManagementAdmin."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_admins('Server/TEST/devices/test.example.com')
        assert len(resp) == 1
        assert isinstance(resp[0], ZenossDeviceManagementAdmin)
        assert resp[0].uid == "Devices/Server/TEST/devices/test.example.com/adminRoles/admin"

    def test_devicemanagement_router_list_admins_by_role(self, responses):
        """list_admins_by_role() should return one entry for ZenManager."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.list_admins_by_role('Server/TEST/devices/test.example.com', 'ZenManager')
        assert len(resp) == 1

    def test_devicemanagement_router_get_admins_by_role(self, responses):
        """get_admins_by_role() should return one admin object for ZenManager."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_admins_by_role('Server/TEST/devices/test.example.com', 'ZenManager')
        assert len(resp) == 1
        assert isinstance(resp[0], ZenossDeviceManagementAdmin)

    def test_devicemanagement_router_get_admin_by_name(self, responses):
        """get_admin_by_name() should return the "admin" object."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_admin_by_name('Server/TEST/devices/test.example.com', 'admin')
        assert isinstance(resp, ZenossDeviceManagementAdmin)
        assert resp.name == "admin"

    def test_devicemanagement_router_get_admin_by_id(self, responses):
        """get_admin_by_id() should return the "admin" object."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.get_admin_by_id('Server/TEST/devices/test.example.com', 'admin')
        assert isinstance(resp, ZenossDeviceManagementAdmin)
        assert resp.name == "admin"

    def test_devicemanagement_router_add_admin(self, responses):
        """add_admin() should return the newly added "lenny" admin."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        resp = dmr.add_admin('Server/TEST/devices/test.example.com', 'lenny', 'ZenUser')
        assert isinstance(resp, ZenossDeviceManagementAdmin)
        assert resp.name == "lenny"
        assert resp.role == "ZenUser"
        assert resp.uid == "Devices/Server/TEST/devices/test.example.com/adminRoles/lenny"

    def test_devicemanagement_router_zenossmaintenancewindow_delete(self, responses):
        """ZenossMaintenanceWindow.delete() should return a reply with no data."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow')
        resp = mw.delete()
        assert resp['data'] is None

    def test_devicemanagement_router_zenossmaintenancewindow_update(self, responses):
        """ZenossMaintenanceWindow.update() should refresh duration and start fields."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow')
        resp = mw.update(start_date='2017/11/21', start_hours='12', start_minutes='30', duration_minutes='30', occurrence='2nd', days='Monday')
        assert resp
        assert mw.duration == "01:30:00"
        assert mw.start == 1511267400
        assert mw.startTime == "2017/11/21 12:30:00.000 UTC"

    def test_devicemanagement_router_zenossmaintenancewindow_enable(self, responses):
        """ZenossMaintenanceWindow.enable() should leave the window enabled."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow')
        resp = mw.enable()
        assert resp
        assert mw.enabled

    def test_devicemanagement_router_zenossmaintenancewindow_disable(self, responses):
        """ZenossMaintenanceWindow.disable() should leave the window disabled."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow')
        # Force the enabled state first so the disable call has something to flip.
        mw.enabled = True
        resp = mw.disable()
        assert resp
        assert mw.enabled is False

    def test_devicemanagement_router_zenossusercommand_delete(self, responses):
        """ZenossUserCommand.delete() should return a reply with no data."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        uc = dmr.get_user_command_by_name('Server/TEST/devices/test.example.com', 'uname_a')
        resp = uc.delete()
        assert resp['data'] is None

    def test_devicemanagement_router_zenossusercommand_update(self, responses):
        """ZenossUserCommand.update() should apply the new description."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        uc = dmr.get_user_command_by_name('Server/TEST/devices/test2.example.com', 'uname_a')
        resp = uc.update(description="Run uname -a command", password="zenoss")
        assert resp
        assert uc.description == "Run uname -a command"

    def test_devicemanagement_router_zenossusercommand_update_bad_passwd(self, responses):
        """ZenossUserCommand.update() with a rejected password raises ZenossAPIClientError."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        uc = dmr.get_user_command_by_name('Server/TEST/devices/test2.example.com', 'uname_a')
        with pytest.raises(ZenossAPIClientError, match="Request failed: Exception: Update failed. Incorrect or missing password."):
            uc.update(description="Run uname -a command", password="bogus")

    def test_devicemanagement_router_zenossdevicemanagementadmin_update(self, responses):
        """ZenossDeviceManagementAdmin.update() should change the admin's role."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        au = dmr.get_admin_by_name('Devices/Server/TEST/devices/test.example.com', 'lenny')
        resp = au.update('ZenManager')
        assert resp
        assert au.role == "ZenManager"

    def test_devicemanagement_router_zenossdevicemanagementadmin_delete(self, responses):
        """After ZenossDeviceManagementAdmin.delete(), the admin can no longer be found."""
        responses_callback(responses)

        dmr = DeviceManagementRouter(url, headers, False)
        au = dmr.get_admin_by_name('Devices/Server/TEST/devices/test.example.com', 'lenny')
        au.delete()
        au = dmr.get_admin_by_name('Devices/Server/TEST/devices/test.example.com', 'lenny')
        assert au is None