Coverage for src/zenossapi/routers/devicemanagement.py: 92%
225 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-25 05:47 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-25 05:47 +0000
1# -*- coding: utf-8 -*-
3"""
4Zenoss devicemanagement_router
5"""
7from datetime import datetime as dt
8from zenossapi.apiclient import ZenossAPIClientError
9from zenossapi.routers import ZenossRouter
11__router__ = 'DeviceManagementRouter'
class DeviceManagementRouter(ZenossRouter):
    """
    Class for interacting with the Zenoss devicemanagement router
    """

    def __init__(self, url, headers, ssl_verify):
        super(DeviceManagementRouter, self).__init__(
            url, headers, ssl_verify,
            'devicemanagement_router',
            'DeviceManagementRouter')
        # No object UID at the router level; __repr__ falls back to id().
        self.uid = None

    def __repr__(self):
        identifier = self.uid if self.uid else hex(id(self))
        return '<{0} object at {1}>'.format(
            type(self).__name__, identifier
        )

    def timezone(self):
        """
        Returns the configured timezone.

        Returns:
            str:
        """
        tz_data = self._router_request(
            self._make_request_data(
                'getTimeZone',
                data=dict()
            )
        )

        return tz_data['data']

    def list_maintenance_windows(self, uid):
        """
        Returns the list of maintenance windows configured for a device
        or device class.

        Arguments:
            uid (str): The UID of the device or device class

        Returns:
            list(dict):
        """
        uid = self._check_uid(uid)

        mw_data = self._router_request(
            self._make_request_data(
                'getMaintWindows',
                data=dict(
                    uid=uid
                )
            )
        )

        windows = mw_data['data']
        for mw in windows:
            # Drop the first occurrence of the '/zport/dmd/' prefix so the
            # returned uid is in the short form.
            mw['uid'] = mw['uid'].replace('/zport/dmd/', '', 1)

        return windows

    def get_maintenance_windows(self, uid):
        """
        Returns a list of ZenossMaintenanceWindow objects for the
        maintenance windows configured for a device or device class

        Arguments:
            uid (str): The UID of the device or device class

        Returns:
            list(ZenossMaintenanceWindow):
        """
        mw_data = self.list_maintenance_windows(uid)
        # The parent UID is the same for every window; compute it once.
        parent = self._check_uid(uid)

        return [
            ZenossMaintenanceWindow(
                self.api_url,
                self.api_headers,
                self.ssl_verify,
                mw,
                parent=parent,
            )
            for mw in mw_data
        ]

    def get_maintenance_window(self, uid, name):
        """
        Get a maintenance window object for the named window.

        Arguments:
            uid (str): The UID of the device or device class
            name (str): Name of the maintenance window

        Returns:
            ZenossMaintenanceWindow: The first window whose name matches,
                or None if no window has that name
        """
        mw_data = self.list_maintenance_windows(uid)

        for mw in mw_data:
            if mw['name'] == name:
                return ZenossMaintenanceWindow(
                    self.api_url,
                    self.api_headers,
                    self.ssl_verify,
                    mw,
                    parent=self._check_uid(uid),
                )

        return None

    def add_maintenance_window(self, uid, name, start, duration, enabled=False,
                               start_state=300, repeat='Never',
                               occurrence='1st', days='Sunday'):
        """
        Add a new maintenance window for device or device class.

        Arguments:
            uid (str): The UID of the device or device class
            name (str): Name of the new maintenance window
            start (str): Window start time in UNIX epoch timestamp format,
                e.g. "1511290393"
            duration (str): Duration of the window in HH:MM:SS format
            enabled (bool): Whether the window is enabled, default is False
            start_state (int): Production state for the maintenance window,
                default is 300 (Maintenance)
            repeat (str): Maintenance window repeat interval, default is
                'Never'. Other valid choices are: 'Daily', 'Every Weekday',
                'Weekly', 'Monthly: day of month', 'Monthly: day of week'
            occurrence (str): For 'Monthly: day of week' repeats, options are
                '1st', '2nd', '3rd', '4th', '5th', 'Last'
            days (str): For 'Monthly: day of week' repeats, options are
                'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
                'Saturday', 'Sunday'

        Returns:
            ZenossMaintenanceWindow: The window looked up by name after the
                add request, or None if no window with that name is listed

        Raises:
            ZenossAPIClientError: If repeat, occurrence or days is not one
                of the valid choices listed above
        """
        if repeat not in ('Never', 'Daily', 'Every Weekday', 'Weekly',
                          'Monthly: day of month', 'Monthly: day of week'):
            raise ZenossAPIClientError(
                'Invalid maintenance window repetition: {0}'.format(repeat))

        if occurrence not in ('1st', '2nd', '3rd', '4th', '5th', 'Last'):
            raise ZenossAPIClientError(
                'Invalid maintenance window occurrence: {0}'.format(occurrence))

        if days not in ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
                        'Saturday', 'Sunday'):
            raise ZenossAPIClientError(
                'Invalid maintenance window days: {0}'.format(days))

        # Normalize the UID like every other request method in this class
        # does before sending it to the router.
        uid = self._check_uid(uid)

        self._router_request(
            self._make_request_data(
                'addMaintWindow',
                data=dict(
                    params=dict(
                        uid=uid,
                        name=name,
                        start=start,
                        duration=duration,
                        enabled=enabled,
                        startState=start_state,
                        repeat=repeat,
                        occurrence=occurrence,
                        days=days,
                    )
                )
            )
        )

        return self.get_maintenance_window(uid, name)

    def list_user_commands(self, uid):
        """
        Get the list of user commands configured for a device or device class.

        Arguments:
            uid (str): The UID of the device or device class

        Returns:
            list(dict):
        """
        uid = self._check_uid(uid)

        uc_data = self._router_request(
            self._make_request_data(
                'getUserCommands',
                data=dict(
                    uid=uid,
                )
            )
        )

        return uc_data['data']

    def get_user_commands(self, uid):
        """
        Get a list of user commands objects configured for a device
        or device class.

        Arguments:
            uid (str): The UID of the device or device class

        Returns:
            list(ZenossUserCommand):
        """
        uc_data = self.list_user_commands(uid)
        # The parent UID is the same for every command; compute it once.
        parent = self._check_uid(uid)

        return [
            ZenossUserCommand(
                self.api_url,
                self.api_headers,
                self.ssl_verify,
                uc,
                parent=parent
            )
            for uc in uc_data
        ]

    def get_user_command_by_id(self, uid, command_id):
        """
        Get a configured user command by its id

        Arguments:
            uid (str): The UID of the device or device class
            command_id (str): The ID of the user command

        Returns:
            ZenossUserCommand: The first command whose id matches, or None
                if there is no match
        """
        uc_data = self.list_user_commands(uid)
        for uc in uc_data:
            if uc['id'] == command_id:
                return ZenossUserCommand(
                    self.api_url,
                    self.api_headers,
                    self.ssl_verify,
                    uc,
                    parent=self._check_uid(uid)
                )

        return None

    def get_user_command_by_name(self, uid, command_name):
        """
        Get a configured user command by its name

        Arguments:
            uid (str): The UID of the device or device class
            command_name (str): The name of the user command

        Returns:
            ZenossUserCommand: The first command whose name matches, or None
                if there is no match
        """
        uc_data = self.list_user_commands(uid)
        for uc in uc_data:
            if uc['name'] == command_name:
                return ZenossUserCommand(
                    self.api_url,
                    self.api_headers,
                    self.ssl_verify,
                    uc,
                    parent=self._check_uid(uid)
                )

        return None

    def add_user_command(self, uid, name, description, command, password):
        """
        Add a new user command to a device or device class.

        Arguments:
            uid (str): The UID of the device or device class
            name (str): Name for the new command
            description (str): Description of the new command
            command (str): Command line of the new command, can include TALES
                expressions
            password (str): Password of the user adding the command.

        Returns:
            ZenossUserCommand: The command looked up by name after the add
                request, or None if no command with that name is listed
        """
        uid = self._check_uid(uid)
        self._router_request(
            self._make_request_data(
                'addUserCommand',
                data=dict(
                    uid=uid,
                    name=name,
                    description=description,
                    command=command,
                    password=password
                )
            )
        )

        return self.get_user_command_by_name(uid, name)

    def list_users(self, uid):
        """
        List the users available to associate with a device or device class.

        Arguments:
            uid (str): the UID of the device or device class

        Returns:
            list(str):
        """
        uid = self._check_uid(uid)
        user_data = self._router_request(
            self._make_request_data(
                'getUserList',
                data=dict(
                    uid=uid
                )
            )
        )
        return user_data['data']

    def list_available_roles(self, uid):
        """
        List the admin roles available to associate with a device or device class.

        Arguments:
            uid (str): The UID of the device or device class

        Returns:
            list(str):
        """
        uid = self._check_uid(uid)
        role_data = self._router_request(
            self._make_request_data(
                'getRolesList',
                data=dict(
                    uid=uid
                )
            )
        )

        return role_data['data']

    def list_admin_roles(self, uid):
        """
        List the admin roles associated with a device or device class.

        Arguments:
            uid (str): The UID of the device or device class

        Returns:
            list(dict):
        """
        uid = self._check_uid(uid)
        role_data = self._router_request(
            self._make_request_data(
                'getAdminRoles',
                data=dict(
                    uid=uid
                )
            )
        )

        roles = role_data['data']
        for r in roles:
            # Drop the first occurrence of the '/zport/dmd/' prefix so the
            # returned uid is in the short form.
            r['uid'] = r['uid'].replace('/zport/dmd/', '', 1)

        return roles

    def get_admins(self, uid):
        """
        Get ZenossDeviceManagementAdmin objects for the configured admin
        users for a device or device class.

        Arguments:
            uid (str): The UID of the device or device class

        Returns:
            list(ZenossDeviceManagementAdmin):
        """
        return [
            ZenossDeviceManagementAdmin(
                self.api_url,
                self.api_headers,
                self.ssl_verify,
                admin
            )
            for admin in self.list_admin_roles(uid)
        ]

    def list_admins_by_role(self, uid, role):
        """
        List configured admin users for a device or device class by role.

        Arguments:
            uid (str): The UID of the device or device class
            role (str): The role to filter on

        Returns:
            list(dict):
        """
        return [
            admin for admin in self.list_admin_roles(uid)
            if admin['role'] == role
        ]

    def get_admins_by_role(self, uid, role):
        """
        Get ZenossDeviceManagementAdmin objects for the configured admin users
        of a device or device class by role.

        Arguments:
            uid (str): The UID of the device or device class
            role (str): The role to filter on

        Returns:
            list(ZenossDeviceManagementAdmin):
        """
        return [
            ZenossDeviceManagementAdmin(
                self.api_url,
                self.api_headers,
                self.ssl_verify,
                admin
            )
            for admin in self.list_admin_roles(uid)
            if admin['role'] == role
        ]

    def get_admin_by_name(self, uid, name):
        """
        Get an admin user for a device or device class by name.

        Arguments:
            uid (str): The UID of the device or device class
            name (str): The name of the admin user

        Returns:
            ZenossDeviceManagementAdmin: The first admin whose name matches,
                or None if there is no match
        """
        admin_data = self.list_admin_roles(uid)
        for admin in admin_data:
            if admin['name'] == name:
                return ZenossDeviceManagementAdmin(
                    self.api_url,
                    self.api_headers,
                    self.ssl_verify,
                    admin
                )

        return None

    def get_admin_by_id(self, uid, admin_id):
        """
        Get an admin user for a device or device class by id.

        Arguments:
            uid (str): The UID of the device or device class
            admin_id (str): The ID of the admin user

        Returns:
            ZenossDeviceManagementAdmin: The first admin whose id matches,
                or None if there is no match
        """
        admin_data = self.list_admin_roles(uid)
        for admin in admin_data:
            if admin['id'] == admin_id:
                return ZenossDeviceManagementAdmin(
                    self.api_url,
                    self.api_headers,
                    self.ssl_verify,
                    admin
                )

        return None

    def add_admin(self, uid, name, role=None):
        """
        Add an admin user to a device or device class.

        Arguments:
            uid (str): The UID of the device or device class
            name (str): The name of the user to add
            role (str): The role to associate with the user for this
                device or device class

        Returns:
            ZenossDeviceManagementAdmin: The admin looked up by name after
                the add request, or None if no admin with that name is listed
        """
        uid = self._check_uid(uid)
        self._router_request(
            self._make_request_data(
                'addAdminRole',
                data=dict(
                    params=dict(
                        uid=uid,
                        name=name,
                        role=role,
                    )
                )
            )
        )

        return self.get_admin_by_name(uid, name)
class ZenossMaintenanceWindow(DeviceManagementRouter):
    """
    Class for Zenoss maintenance window objects
    """

    def __init__(self, url, headers, ssl_verify, window_data, parent=None):
        """
        Build a maintenance window object from a router response record.

        Arguments:
            url (str): Passed through to the parent router constructor
            headers (dict): Passed through to the parent router constructor
            ssl_verify (bool): Passed through to the parent router constructor
            window_data (dict): Maintenance window record; every key read
                below must be present or a KeyError is raised
            parent (str): UID of the device or device class that owns the
                window; sent as 'uid' in edit and delete requests
        """
        super(ZenossMaintenanceWindow, self).__init__(url, headers, ssl_verify)

        self.uid = window_data['uid'].replace('/zport/dmd/', '', 1)
        self.id = window_data['id']
        self.name = window_data['name']
        self.description = window_data['description']
        self.meta_type = window_data['meta_type']
        self.inspector_type = window_data['inspector_type']
        self.startState = window_data['startState']
        self.startProdState = window_data['startProdState']
        self.enabled = window_data['enabled']
        self.started = window_data['started']
        self.start = window_data['start']
        self.startTime = window_data['startTime']
        self.duration = window_data['duration']
        self.skip = window_data['skip']
        self.repeat = window_data['repeat']
        self.niceRepeat = window_data['niceRepeat']
        self.occurrence = window_data['occurrence']
        self.days = window_data['days']
        self.parent = parent

        # startTime must split into exactly three whitespace-separated
        # fields; the third one is not stored.
        # NOTE(review): presumably "<date> <HH:MM[:SS]> <timezone>" - confirm.
        self.startDate, start_ts, _tz = self.startTime.split()
        self.startHours, self.startMinutes = start_ts.split(':')[:2]

        # The last whitespace-separated field of duration holds the
        # colon-separated hours and minutes; when there is more than one
        # field, the first one is kept as the day count.
        duration_parts = self.duration.split()
        if len(duration_parts) > 1:
            self.durationDays = duration_parts[0]
        else:
            self.durationDays = None
        self.durationHours, self.durationMinutes = duration_parts[-1].split(':')[:2]

    @staticmethod
    def _is_given(value):
        """
        Tell whether an optional update argument was supplied.

        None and the empty string count as "not supplied"; falsy but
        meaningful values such as 0 or False count as supplied.
        """
        return value is not None and value != ''

    def _build_update_params(self, params):
        """
        Build the params dict for updating a maintenance window

        Not implemented: this method has no body and returns None.

        Arguments:
            params (dict): The update parameters as a dict

        Returns:
            dict:
        """

    def _set_enabled(self, enabled):
        """
        Send an editMaintWindow request that changes only the enabled flag,
        re-sending the current schedule, then record the new state locally.
        """
        self._router_request(
            self._make_request_data(
                'editMaintWindow',
                data=dict(
                    params=dict(
                        uid=self.parent,
                        id=self.id,
                        startDate=self.startDate,
                        startHours=self.startHours,
                        startMinutes=self.startMinutes,
                        durationDays=self.durationDays,
                        durationHours=self.durationHours,
                        durationMinutes=self.durationMinutes,
                        startProductionState=self.startProdState,
                        repeat=self.repeat,
                        enabled=enabled,
                        occurrence=self.occurrence,
                        days=self.days,
                    )
                )
            )
        )
        self.enabled = enabled

    def delete(self):
        """
        Delete a maintenance window from a device or device class.

        Returns:
            dict:
        """
        return self._router_request(
            self._make_request_data(
                'deleteMaintWindow',
                data=dict(
                    uid=self.parent,
                    id=self.id,
                )
            )
        )

    def update(self, start_timestamp=None, start_datetime=None, start_date=None,
               start_hours=None, start_minutes=None, duration_days=None,
               duration_time=None, duration_hours=None, duration_minutes=None,
               production_state=None, enabled=None, repeat=None,
               occurrence=None, days=None):
        """
        Update the settings for a maintenance window, with flexible options for
        specifying the start date/time and duration.

        Any argument that is left out keeps the window's current value.
        start_timestamp takes precedence over start_datetime; start_date,
        start_hours and start_minutes override the matching part derived
        from either of them. duration_hours and duration_minutes override
        the matching part of duration_time.

        Arguments:
            start_timestamp (float): Start date and time in UNIX timestamp
                format; converted with datetime.fromtimestamp, i.e. to local
                time
            start_datetime (datetime): Start date and time as a datetime.datetime object
            start_date (str): Start date as a string
            start_hours (str): Start hours as a string
            start_minutes (str): Start minutes as a string
            duration_days (str): Duration days
            duration_time (str): Duration time in "HH:MM" format
            duration_hours (str): Duration hours
            duration_minutes (str): Duration minutes
            production_state (int): Production state for the window
            enabled (bool): Enabled state of the window
            repeat (str): Repeat interval
            occurrence (str): Repeat occurrence
            days (str): Repeat days

        Returns:
            bool:
        """
        given = self._is_given

        new_params = dict()
        # Use explicit "was it supplied" checks, not truthiness, so that
        # legitimate zero values (epoch 0, hour 0, minute 0) are honoured.
        if given(start_timestamp):
            new_start = dt.fromtimestamp(start_timestamp)
            new_params['startDate'] = str(new_start.date())
            new_params['startHours'] = str(new_start.hour)
            new_params['startMinutes'] = str(new_start.minute)
        elif given(start_datetime):
            new_params['startDate'] = str(start_datetime.date())
            new_params['startHours'] = str(start_datetime.hour)
            new_params['startMinutes'] = str(start_datetime.minute)
        if given(start_date):
            new_params['startDate'] = start_date
        if given(start_hours):
            new_params['startHours'] = start_hours
        if given(start_minutes):
            new_params['startMinutes'] = start_minutes

        # Only record durationDays when supplied; otherwise the lookup below
        # must fall back to the window's current value instead of None.
        if given(duration_days):
            new_params['durationDays'] = duration_days
        if given(duration_time):
            new_params['durationHours'], new_params['durationMinutes'] = duration_time.split(':')[:2]
        if given(duration_hours):
            new_params['durationHours'] = duration_hours
        if given(duration_minutes):
            new_params['durationMinutes'] = duration_minutes

        update_resp = self._router_request(
            self._make_request_data(
                'editMaintWindow',
                data=dict(
                    params=dict(
                        uid=self.parent,
                        id=self.id,
                        startDate=new_params.get('startDate', self.startDate),
                        startHours=new_params.get('startHours', self.startHours),
                        startMinutes=new_params.get('startMinutes', self.startMinutes),
                        durationDays=new_params.get('durationDays', self.durationDays),
                        durationHours=new_params.get('durationHours', self.durationHours),
                        durationMinutes=new_params.get('durationMinutes', self.durationMinutes),
                        # 0 is a usable production state and False a usable
                        # enabled value, so compare against None explicitly.
                        startProductionState=(
                            production_state if production_state is not None
                            else self.startProdState),
                        repeat=repeat if repeat else self.repeat,
                        enabled=enabled if enabled is not None else self.enabled,
                        occurrence=occurrence if occurrence else self.occurrence,
                        days=days if days else self.days
                    )
                )
            )
        )

        # Refresh every attribute from the first record in the response.
        self.__init__(self.api_url, self.api_headers, self.ssl_verify, update_resp['data'][0], parent=self.parent)

        return True

    def enable(self):
        """
        Set maintenance window to enabled.

        No request is sent when the window is already enabled.

        Returns:
            bool:
        """
        if not self.enabled:
            self._set_enabled(True)

        return True

    def disable(self):
        """
        Set maintenance window to disabled.

        No request is sent when the window is already disabled.

        Returns:
            bool:
        """
        if self.enabled:
            self._set_enabled(False)

        return True
class ZenossUserCommand(DeviceManagementRouter):
    """
    Wrapper object for a single Zenoss user command.
    """

    def __init__(self, url, headers, ssl_verify, command_data, parent=None):
        """
        Populate the object from a user command record.

        Arguments:
            url (str): Passed through to the parent router constructor
            headers (dict): Passed through to the parent router constructor
            ssl_verify (bool): Passed through to the parent router constructor
            command_data (dict): User command record; every key read below
                must be present
            parent (str): UID of the device or device class owning the command
        """
        super(ZenossUserCommand, self).__init__(url, headers, ssl_verify)

        # Copied verbatim from the record, in this order.
        for field in ('description', 'name', 'meta_type', 'command',
                      'inspector_type', 'id'):
            setattr(self, field, command_data[field])

        # Store the uid without the first '/zport/dmd/' occurrence.
        self.uid = command_data['uid'].replace('/zport/dmd/', '', 1)
        self.parent = parent

    def delete(self):
        """
        Remove this user command from its device or device class.

        Returns:
            dict:
        """
        request = self._make_request_data(
            'deleteUserCommand',
            data=dict(uid=self.parent, id=self.id),
        )
        return self._router_request(request)

    def update(self, description=None, command=None, password=None):
        """
        Change the description and/or command line of this user command,
        then reload the object's attributes from the server's listing.

        Arguments:
            description (str): New description; the current one is kept
                when this is empty or None
            command (str): New command line; the current one is kept when
                this is empty or None
            password (str): Password of the user updating the command.

        Returns:
            bool:
        """
        new_description = self.description
        if description:
            new_description = description

        new_command = self.command
        if command:
            new_command = command

        request = self._make_request_data(
            'updateUserCommand',
            data=dict(
                params=dict(
                    uid=self.parent,
                    id=self.id,
                    description=new_description,
                    command=new_command,
                    password=password,
                )
            ),
        )
        self._router_request(request)

        # Re-initialise from the listing entry that carries our id.
        for entry in self.list_user_commands(self.parent):
            if entry['id'] == self.id:
                self.__init__(self.api_url, self.api_headers, self.ssl_verify, entry, parent=self.parent)

        return True
class ZenossDeviceManagementAdmin(DeviceManagementRouter):
    """
    Wrapper object for an admin user (admin role) configured on a Zenoss
    device or device class.
    """

    def __init__(self, url, headers, ssl_verify, admin_data):
        """
        Populate the object from an admin role record.

        Arguments:
            url (str): Passed through to the parent router constructor
            headers (dict): Passed through to the parent router constructor
            ssl_verify (bool): Passed through to the parent router constructor
            admin_data (dict): Admin role record; every key read below must
                be present
        """
        super(ZenossDeviceManagementAdmin, self).__init__(url, headers, ssl_verify)

        # Store the uid without the first '/zport/dmd/' occurrence.
        self.uid = admin_data['uid'].replace('/zport/dmd/', '', 1)

        # Copied verbatim from the record, in this order.
        for field in ('name', 'id', 'meta_type', 'role',
                      'inspector_type', 'pager', 'email'):
            setattr(self, field, admin_data[field])

    def update(self, role):
        """
        Change the role assigned to this admin user.

        Arguments:
            role (str): New role for the user

        Returns:
            bool:
        """
        request = self._make_request_data(
            'updateAdminRole',
            data=dict(
                params=dict(
                    uid=self.uid,
                    name=self.name,
                    role=role
                )
            ),
        )
        self._router_request(request)

        # Mirror the change locally once the request has gone through.
        self.role = role

        return True

    def delete(self):
        """
        Remove this admin user from its device or device class.

        Returns:
            dict:
        """
        request = self._make_request_data(
            'removeAdmin',
            data=dict(uid=self.uid, id=self.id),
        )
        return self._router_request(request)