Coverage for src/zenossapi/apiclient.py: 72%
72 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-25 05:47 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-25 05:47 +0000
1# -*- coding: utf-8 -*-
3"""
4Zenoss API Client Class
5"""
7import inspect
8import os
9import urllib3
class ZenossAPIClientError(Exception):
    """Error raised by the Zenoss API client.

    Within this module it is raised by ``strtobool`` when a value cannot be
    interpreted as a boolean.
    """
class ZenossAPIClientAuthenticationError(Exception):
    """Error type for Zenoss API authentication failures.

    NOTE(review): nothing in this module raises it; presumably it is raised
    by code elsewhere in the package -- confirm against the callers.
    """
def strtobool(_string):
    """Convert a string to a boolean.

    Replaces ``distutils.util.strtobool`` (deprecated per PEP 632). String
    input is matched case-insensitively, so ``'True'``, ``'YES'`` and
    ``'Off'`` are accepted alongside their lower-case spellings.

    Arguments:
        _string (str): One of ``y``, ``yes``, ``t``, ``true``, ``on``, ``1``
            (true) or ``n``, ``no``, ``f``, ``false``, ``off``, ``0``
            (false). The integers ``1`` and ``0`` are accepted as well.

    Returns:
        bool: The boolean the value stands for.

    Raises:
        ZenossAPIClientError: If the value is not one of the accepted
            spellings.
    """
    # Normalise case for strings only; non-string input (e.g. the ints 1/0)
    # is compared as-is.
    value = _string.lower() if isinstance(_string, str) else _string

    # Tuples rather than sets: membership then works for unhashable input
    # too, which falls through to the error below instead of a TypeError.
    if value in ('y', 'yes', 't', 'true', 'on', '1', 1):
        return True
    if value in ('n', 'no', 'f', 'false', 'off', '0', 0):
        return False
    raise ZenossAPIClientError(f"Invalid boolean value: {_string}")
class Client(object):
    """Client class to access the Zenoss JSON API.

    On construction the client resolves its connection settings (from
    arguments or environment variables), imports every router module found
    in the ``routers`` directory next to this file, and builds the HTTP
    headers that are handed to each router it instantiates.
    """

    def __init__(self, host=None, user=None, password=None, ssl_verify=None,
                 collection_zone=None, api_key=None):
        """
        Create the client object to communicate with Zenoss. The authentication
        and ssl parameters can be pulled from the environment so that they
        don't have to be passed in from code or command line args.

        Each falsy argument falls back to its environment variable:
        ZENOSS_HOST, ZENOSS_USER, ZENOSS_PASSWD, ZENOSS_COLLECTION_ZONE,
        ZENOSS_API_KEY. ``ssl_verify`` falls back to ZENOSS_SSL_VERIFY only
        when it is None, and defaults to True when that is unset too.

        NOTE(review): if no host is given and ZENOSS_HOST is unset, ``host``
        stays None and ``api_url`` becomes the string 'None/zport/dmd'.

        Arguments:
            host (str): FQDN used to access the Zenoss server
            user (str): Zenoss username
            password (str): Zenoss user's password
            ssl_verify (bool): Whether to verify the SSL certificate or not.
                Set to false when using servers with self-signed certs.
            collection_zone (str): Zenoss Cloud collection zone
            api_key (str): Zenoss Cloud api key
        """
        host = self._from_env(host, 'ZENOSS_HOST')
        user = self._from_env(user, 'ZENOSS_USER')
        password = self._from_env(password, 'ZENOSS_PASSWD')
        collection_zone = self._from_env(
            collection_zone, 'ZENOSS_COLLECTION_ZONE')
        api_key = self._from_env(api_key, 'ZENOSS_API_KEY')

        # Compared against None, not truthiness: an explicit False must not
        # be overridden by the environment.
        if ssl_verify is None:
            ssl_verify = os.environ.get('ZENOSS_SSL_VERIFY', True)

        # Values read from the environment (or passed as text) are strings.
        if isinstance(ssl_verify, str):
            ssl_verify = bool(strtobool(ssl_verify))

        if collection_zone:
            host = '{0}/{1}'.format(host, collection_zone)

        self.api_url = '{0}/zport/dmd'.format(host)
        self.ssl_verify = ssl_verify
        self.api_headers = {"Content-Type": "application/json"}
        self.routers = {}

        for router in self.get_routers():
            # A non-empty fromlist makes __import__ return the submodule
            # itself rather than the top-level package.
            self.routers[router] = __import__(
                'zenossapi.routers.{0}'.format(router),
                fromlist=[router])

        if user and password:
            self.api_headers.update(urllib3.make_headers(
                basic_auth='{0}:{1}'.format(user, password)))

        if api_key:
            self.api_headers['z-api-key'] = api_key

    @staticmethod
    def _from_env(value, variable):
        """Return value, or the named environment variable if value is falsy.

        When the variable is not set either, the original value is returned
        unchanged.
        """
        if value:
            return value
        return os.environ.get(variable, value)

    @staticmethod
    def _router_names(filenames):
        """Pick the router module names out of a directory listing.

        Keeps entries whose extension is exactly ``.py``, except
        ``__init__.py`` and entries whose stem still contains a dot (hidden
        files, ``a.b.py``); the latter could not be imported as a direct
        submodule of the routers package.

        Arguments:
            filenames (list): Entry names as returned by ``os.listdir``

        Returns:
            list: Module names, in the order the entries were given
        """
        names = []
        for filename in filenames:
            name, ext = os.path.splitext(filename)
            if ext != '.py' or name == '__init__':
                continue
            if '.' in name:
                continue
            names.append(name)
        return names

    def get_routers(self):
        """
        Gets the list of available Zenoss API routers

        The routers are the ``.py`` modules (other than ``__init__``) in the
        ``routers`` directory located next to this file.

        Returns:
            list: Router module names
        """
        routers_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), 'routers')
        return self._router_names(os.listdir(routers_path))

    def _get_router_class(self, router):
        """Look up the router class object inside a loaded router module.

        The class name is the module's ``__router__`` attribute when present,
        otherwise the capitalized router name followed by ``Router``.
        Raises KeyError if the router was not loaded.
        """
        module = self.routers[router]
        class_name = getattr(
            module,
            '__router__',
            '{0}Router'.format(router.capitalize())
        )
        return getattr(module, class_name)

    def get_router(self, router):
        """
        Instantiates and returns a Zenoss router object

        The router is constructed with this client's API URL, headers and
        ssl_verify setting.

        Arguments:
            router (str): The API router to use
        """
        router_class = self._get_router_class(router)
        return router_class(
            self.api_url,
            self.api_headers,
            self.ssl_verify,
        )

    def get_router_methods(self, router):
        """
        List all available methods for an API router

        Names starting with a double underscore are left out.

        Arguments:
            router (str): The router to get methods from

        Returns:
            list: Method names
        """
        members = inspect.getmembers(
            self._get_router_class(router),
            predicate=inspect.isroutine
        )
        return [name for name, _ in members if not name.startswith('__')]