Coverage for tests/routers/test_devicemanagement_router.py: 100%

299 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-25 05:47 +0000

1import json 

2import pytest 

3from zenossapi.apiclient import ZenossAPIClientError 

4from zenossapi.routers.devicemanagement import DeviceManagementRouter, ZenossMaintenanceWindow, ZenossUserCommand, ZenossDeviceManagementAdmin 

5import devicemanagement_resp 

6 

7 

8pytest_plugins = "pytest-responses" 

9url = 'https://zenoss/zport/dmd' 

10headers = dict( 

11 ContentType='application/json' 

12) 

13resp_body_template = dict( 

14 result=dict( 

15 success=True, 

16 data=[], 

17 ) 

18) 

19 

20 

21def request_callback(request): 

22 rdata = json.loads(request.body) 

23 resp_headers = dict(ContentType='application/json') 

24 

25 def getTimeZone(rdata): 

26 return devicemanagement_resp.tz 

27 

28 def getMaintWindows(rdata): 

29 return devicemanagement_resp.windows 

30 

31 def addMaintWindow(rdata): 

32 return devicemanagement_resp.add_maint 

33 

34 def deleteMaintWindow(rdata): 

35 return devicemanagement_resp.delete_maint 

36 

37 def editMaintWindow(rdata): 

38 return devicemanagement_resp.update_maint 

39 

40 def getUserCommands(rdata): 

41 if rdata['uid'] == "Devices/Server/TEST/devices/test.example.com": 

42 return devicemanagement_resp.added_uc 

43 elif rdata['uid'] == "Devices/Server/TEST/devices/test2.example.com": 

44 return devicemanagement_resp.updated_uc 

45 else: 

46 return devicemanagement_resp.user_commands 

47 

48 def addUserCommand(rdata): 

49 if rdata['password'] == "zenoss": 

50 return devicemanagement_resp.add_user_command 

51 else: 

52 return devicemanagement_resp.add_uc_bad_password 

53 

54 def deleteUserCommand(rdata): 

55 return devicemanagement_resp.delete_uc 

56 

57 def updateUserCommand(rdata): 

58 if rdata['params']['password'] == "zenoss": 

59 devicemanagement_resp.updated_uc['result']['data'][0]['description'] = rdata['params']['description'] 

60 devicemanagement_resp.updated_uc['result']['data'][0]['command'] = rdata['params']['command'] 

61 return devicemanagement_resp.update_uc 

62 else: 

63 return devicemanagement_resp.update_uc_bad_passwd 

64 

65 def getUserList(rdata): 

66 return devicemanagement_resp.user_list 

67 

68 def getRolesList(rdata): 

69 return devicemanagement_resp.roles_list 

70 

71 def getAdminRoles(rdata): 

72 return devicemanagement_resp.admin_roles 

73 

74 def addAdminRole(rdata): 

75 devicemanagement_resp.admin_roles['result']['data'].append( 

76 { 

77 "description": "", 

78 "name": "lenny", 

79 "id": "lenny", 

80 "meta_type": "AdministrativeRole", 

81 "role": "ZenUser", 

82 "inspector_type": "AdministrativeRole", 

83 "pager": "", 

84 "email": "lenny@example.com", 

85 "uid": "/zport/dmd/Devices/Server/TEST/devices/test.example.com/adminRoles/lenny" 

86 } 

87 ) 

88 return devicemanagement_resp.add_admin 

89 

90 def updateAdminRole(rdata): 

91 devicemanagement_resp.admin_roles['result']['data'][1]['role'] = rdata['params']['role'] 

92 return devicemanagement_resp.update_admin 

93 

94 def removeAdmin(rdata): 

95 devicemanagement_resp.admin_roles['result']['data'].pop(1) 

96 return devicemanagement_resp.delete_admin 

97 

98 method = locals()[rdata['method']] 

99 resp_body = method(rdata['data'][0]) 

100 

101 return (200, resp_headers, json.dumps(resp_body)) 

102 

103 

104def responses_callback(responses): 

105 responses.add_callback( 

106 responses.POST, 

107 '{0}/devicemanagement_router'.format(url), 

108 callback=request_callback, 

109 content_type='application/json', 

110 ) 

111 

112 

113class TestDeviceManagementRouter(object): 

114 

115 def test_devicemanagement_router_init(self): 

116 dmr = DeviceManagementRouter(url, headers, False) 

117 assert dmr.uid is None 

118 

119 def test_devicemanagement_router_timezone(self, responses): 

120 responses_callback(responses) 

121 

122 dmr = DeviceManagementRouter(url, headers, False) 

123 resp = dmr.timezone() 

124 assert resp == "UTC" 

125 

126 def test_devicemanagement_router_list_maintenance_windows(self, responses): 

127 responses_callback(responses) 

128 

129 dmr = DeviceManagementRouter(url, headers, False) 

130 resp = dmr.list_maintenance_windows('Server/TEST') 

131 assert len(resp) == 3 

132 assert resp[0]['name'] == "TestWindow" 

133 assert resp[0]['uid'] == "Devices/Server/TEST/maintenanceWindows/TestWindow" 

134 assert resp[1]['name'] == "TestWindowRepeat" 

135 assert resp[1]['uid'] == "Devices/Server/TEST/maintenanceWindows/TestWindowRepeat" 

136 

137 def test_devicemanagement_router_get_maintenance_windows(self, responses): 

138 responses_callback(responses) 

139 

140 dmr = DeviceManagementRouter(url, headers, False) 

141 resp = dmr.get_maintenance_windows('Server/TEST') 

142 assert len(resp) == 3 

143 assert isinstance(resp[0], ZenossMaintenanceWindow) 

144 assert isinstance(resp[1], ZenossMaintenanceWindow) 

145 assert resp[0].name == "TestWindow" 

146 assert resp[1].name == "TestWindowRepeat" 

147 assert resp[0].repeat == "Never" 

148 assert resp[1].repeat == "Monthly: day of week" 

149 assert resp[0].parent == "Devices/Server/TEST" 

150 assert resp[1].parent == "Devices/Server/TEST" 

151 assert resp[0].startDate == "2017/11/17" 

152 assert resp[0].startHours == "15" 

153 assert resp[0].startMinutes == "00" 

154 assert resp[0].durationDays is None 

155 assert resp[0].durationHours == "01" 

156 assert resp[0].durationMinutes == "00" 

157 

158 def test_devicemanagement_router_add_maintenance_window(self, responses): 

159 responses_callback(responses) 

160 

161 dmr = DeviceManagementRouter(url, headers, False) 

162 resp = dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00') 

163 assert isinstance(resp, ZenossMaintenanceWindow) 

164 assert resp.name == "TestAddWindow" 

165 assert resp.parent == "Devices/Server/TEST" 

166 

167 def test_devicemanagement_router_add_maintenance_window_bad_repeat(self): 

168 dmr = DeviceManagementRouter(url, headers, False) 

169 with pytest.raises(ZenossAPIClientError, match="Invalid maintenance window repetition: Bogus"): 

170 resp = dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00', repeat='Bogus') 

171 

172 def test_devicemanagement_router_add_maintenance_window_bad_occurrence(self): 

173 dmr = DeviceManagementRouter(url, headers, False) 

174 with pytest.raises(ZenossAPIClientError, match="Invalid maintenance window occurrence: Bogus"): 

175 resp = dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00', occurrence='Bogus') 

176 

177 def test_devicemanagement_router_add_maintenance_window_bad_days(self): 

178 dmr = DeviceManagementRouter(url, headers, False) 

179 with pytest.raises(ZenossAPIClientError, match="Invalid maintenance window days: Bogus"): 

180 resp = dmr.add_maintenance_window('Server/TEST', 'TestAddWindow', '1511290393', '01:00:00', days='Bogus') 

181 

182 def test_devicemanagement_router_list_user_commands(self, responses): 

183 responses_callback(responses) 

184 

185 dmr = DeviceManagementRouter(url, headers, False) 

186 resp = dmr.list_user_commands('Server/TEST') 

187 assert len(resp) == 6 

188 assert resp[0]['id'] == "DNS forward" 

189 assert resp[1]['id'] == "DNS reverse" 

190 assert resp[2]['id'] == "ping" 

191 assert resp[3]['id'] == "snmpwalk" 

192 assert resp[4]['id'] == "snmpwalk_v3" 

193 assert resp[5]['id'] == "traceroute" 

194 

195 def test_devicemanagement_router_get_user_commands(self, responses): 

196 responses_callback(responses) 

197 

198 dmr = DeviceManagementRouter(url, headers, False) 

199 resp = dmr.get_user_commands('Server/TEST') 

200 assert len(resp) == 6 

201 assert resp[0].id == "DNS forward" 

202 assert resp[0].parent == "Devices/Server/TEST" 

203 assert resp[1].id == "DNS reverse" 

204 assert resp[1].parent == "Devices/Server/TEST" 

205 assert resp[2].id == "ping" 

206 assert resp[2].parent == "Devices/Server/TEST" 

207 assert resp[3].id == "snmpwalk" 

208 assert resp[3].parent == "Devices/Server/TEST" 

209 assert resp[4].id == "snmpwalk_v3" 

210 assert resp[4].parent == "Devices/Server/TEST" 

211 assert resp[5].id == "traceroute" 

212 assert resp[5].parent == "Devices/Server/TEST" 

213 

214 def test_devicemanagement_router_get_user_command_by_id(self, responses): 

215 responses_callback(responses) 

216 

217 dmr = DeviceManagementRouter(url, headers, False) 

218 resp = dmr.get_user_command_by_id('Server/TEST', 'DNS forward') 

219 assert isinstance(resp, ZenossUserCommand) 

220 assert resp.id == "DNS forward" 

221 assert resp.description == "Name to IP address lookup" 

222 assert resp.meta_type == "UserCommand" 

223 assert resp.name == "DNS forward" 

224 assert resp.command == "host ${device/id}" 

225 assert resp.uid == "userCommands/DNS forward" 

226 assert resp.parent == "Devices/Server/TEST" 

227 

228 def test_devicemanagement_router_get_user_command_by_name(self, responses): 

229 responses_callback(responses) 

230 

231 dmr = DeviceManagementRouter(url, headers, False) 

232 resp = dmr.get_user_command_by_name('Server/TEST', 'ping') 

233 assert isinstance(resp, ZenossUserCommand) 

234 assert resp.id == "ping" 

235 assert resp.description == "Is the device responding to ping?" 

236 assert resp.meta_type == "UserCommand" 

237 assert resp.name == "ping" 

238 assert resp.command == "${device/pingCommand} -c2 ${device/manageIp}" 

239 assert resp.uid == "userCommands/ping" 

240 assert resp.parent == "Devices/Server/TEST" 

241 

242 def test_devicemanagement_router_add_user_command(self, responses): 

243 responses_callback(responses) 

244 

245 dmr = DeviceManagementRouter(url, headers, False) 

246 resp = dmr.add_user_command('Server/TEST/devices/test.example.com', 'uname_a', 'uname -a', 'uname -a', 'zenoss') 

247 assert isinstance(resp, ZenossUserCommand) 

248 assert resp.id == "uname_a" 

249 assert resp.command == "uname -a" 

250 

251 def test_devicemanagement_router_add_user_command_bad_password(self, responses): 

252 responses_callback(responses) 

253 

254 dmr = DeviceManagementRouter(url, headers, False) 

255 with pytest.raises(ZenossAPIClientError, match="Request failed: Exception: Add new command failed. Incorrect or missing password."): 

256 resp = dmr.add_user_command('Server/TEST/devices/test.example.com', 'uname_a', 'uname -a', 'uname -a', 'bogus') 

257 

258 def test_devicemanagement_router_list_admin_users(self, responses): 

259 responses_callback(responses) 

260 

261 dmr = DeviceManagementRouter(url, headers, False) 

262 resp = dmr.list_users('Server/TEST/devices/test.example.com') 

263 assert len(resp) == 4 

264 assert resp == ['admin', 'ccuser', 'lenny', 'zenoss_system'] 

265 

266 def test_devicemanagement_router_list_available_roles(self, responses): 

267 responses_callback(responses) 

268 

269 dmr = DeviceManagementRouter(url, headers, False) 

270 resp = dmr.list_available_roles('Server/TEST/devices/test.example.com') 

271 assert len(resp) == 4 

272 assert resp == ['Manager', 'ZenManager', 'ZenOperator', 'ZenUser'] 

273 

274 def test_devicemanagement_router_list_admin_roles(self, responses): 

275 responses_callback(responses) 

276 

277 dmr = DeviceManagementRouter(url, headers, False) 

278 resp = dmr.list_admin_roles('Server/TEST/devices/test.example.com') 

279 assert len(resp) == 1 

280 assert resp[0]['name'] == "admin" 

281 

282 def test_devicemanagement_router_get_admins(self, responses): 

283 responses_callback(responses) 

284 

285 dmr = DeviceManagementRouter(url, headers, False) 

286 resp = dmr.get_admins('Server/TEST/devices/test.example.com') 

287 assert len(resp) == 1 

288 assert isinstance(resp[0], ZenossDeviceManagementAdmin) 

289 assert resp[0].uid == "Devices/Server/TEST/devices/test.example.com/adminRoles/admin" 

290 

291 def test_devicemanagement_router_list_admins_by_role(self, responses): 

292 responses_callback(responses) 

293 

294 dmr = DeviceManagementRouter(url, headers, False) 

295 resp = dmr.list_admins_by_role('Server/TEST/devices/test.example.com', 'ZenManager') 

296 assert len(resp) == 1 

297 

298 def test_devicemanagement_router_get_admins_by_role(self, responses): 

299 responses_callback(responses) 

300 

301 dmr = DeviceManagementRouter(url, headers, False) 

302 resp = dmr.get_admins_by_role('Server/TEST/devices/test.example.com', 'ZenManager') 

303 assert len(resp) == 1 

304 assert isinstance(resp[0], ZenossDeviceManagementAdmin) 

305 

306 def test_devicemanagement_router_get_admin_by_name(self, responses): 

307 responses_callback(responses) 

308 

309 dmr = DeviceManagementRouter(url, headers, False) 

310 resp = dmr.get_admin_by_name('Server/TEST/devices/test.example.com', 'admin') 

311 assert isinstance(resp, ZenossDeviceManagementAdmin) 

312 assert resp.name == "admin" 

313 

314 def test_devicemanagement_router_get_admin_by_id(self, responses): 

315 responses_callback(responses) 

316 

317 dmr = DeviceManagementRouter(url, headers, False) 

318 resp = dmr.get_admin_by_id('Server/TEST/devices/test.example.com', 'admin') 

319 assert isinstance(resp, ZenossDeviceManagementAdmin) 

320 assert resp.name == "admin" 

321 

322 def test_devicemanagement_router_add_admin(self, responses): 

323 responses_callback(responses) 

324 

325 dmr = DeviceManagementRouter(url, headers, False) 

326 resp = dmr.add_admin('Server/TEST/devices/test.example.com', 'lenny', 'ZenUser') 

327 assert isinstance(resp, ZenossDeviceManagementAdmin) 

328 assert resp.name == "lenny" 

329 assert resp.role == "ZenUser" 

330 assert resp.uid == "Devices/Server/TEST/devices/test.example.com/adminRoles/lenny" 

331 

332 def test_devicemanagement_router_zenossmaintenancewindow_delete(self, responses): 

333 responses_callback(responses) 

334 

335 dmr = DeviceManagementRouter(url, headers, False) 

336 mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow') 

337 resp = mw.delete() 

338 assert resp['data'] is None 

339 

340 def test_devicemanagement_router_zenossmaintenancewindow_update(self, responses): 

341 responses_callback(responses) 

342 

343 dmr = DeviceManagementRouter(url, headers, False) 

344 mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow') 

345 resp = mw.update(start_date='2017/11/21', start_hours='12', start_minutes='30', duration_minutes='30', occurrence='2nd', days='Monday') 

346 assert resp 

347 assert mw.duration == "01:30:00" 

348 assert mw.start == 1511267400 

349 assert mw.startTime == "2017/11/21 12:30:00.000 UTC" 

350 

351 def test_devicemanagement_router_zenossmaintenancewindow_enable(self, responses): 

352 responses_callback(responses) 

353 

354 dmr = DeviceManagementRouter(url, headers, False) 

355 mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow') 

356 resp = mw.enable() 

357 assert resp 

358 assert mw.enabled 

359 

360 def test_devicemanagement_router_zenossmaintenancewindow_disable(self, responses): 

361 responses_callback(responses) 

362 

363 dmr = DeviceManagementRouter(url, headers, False) 

364 mw = dmr.get_maintenance_window('Server/TEST', 'TestAddWindow') 

365 mw.enabled = True 

366 resp = mw.disable() 

367 assert resp 

368 assert mw.enabled is False 

369 

370 def test_devicemanagement_router_zenossusercommand_delete(self, responses): 

371 responses_callback(responses) 

372 

373 dmr = DeviceManagementRouter(url, headers, False) 

374 uc = dmr.get_user_command_by_name('Server/TEST/devices/test.example.com', 'uname_a') 

375 resp = uc.delete() 

376 assert resp['data'] is None 

377 

378 def test_devicemanagement_router_zenossusercommand_update(self, responses): 

379 responses_callback(responses) 

380 

381 dmr = DeviceManagementRouter(url, headers, False) 

382 uc = dmr.get_user_command_by_name('Server/TEST/devices/test2.example.com', 'uname_a') 

383 resp = uc.update(description="Run uname -a command", password="zenoss") 

384 assert resp 

385 assert uc.description == "Run uname -a command" 

386 

387 def test_devicemanagement_router_zenossusercommand_update_bad_passwd(self, responses): 

388 responses_callback(responses) 

389 

390 dmr = DeviceManagementRouter(url, headers, False) 

391 uc = dmr.get_user_command_by_name('Server/TEST/devices/test2.example.com', 'uname_a') 

392 with pytest.raises(ZenossAPIClientError, match="Request failed: Exception: Update failed. Incorrect or missing password."): 

393 resp = uc.update(description="Run uname -a command", password="bogus") 

394 

395 def test_devicemanagement_router_zenossdevicemanagementadmin_update(self, responses): 

396 responses_callback(responses) 

397 

398 dmr = DeviceManagementRouter(url, headers, False) 

399 au = dmr.get_admin_by_name('Devices/Server/TEST/devices/test.example.com', 'lenny') 

400 resp = au.update('ZenManager') 

401 assert resp 

402 assert au.role == "ZenManager" 

403 

404 def test_devicemanagement_router_zenossdevicemanagementadmin_delete(self, responses): 

405 responses_callback(responses) 

406 

407 dmr = DeviceManagementRouter(url, headers, False) 

408 au = dmr.get_admin_by_name('Devices/Server/TEST/devices/test.example.com', 'lenny') 

409 resp = au.delete() 

410 au = dmr.get_admin_by_name('Devices/Server/TEST/devices/test.example.com', 'lenny') 

411 assert au is None