Coverage for src/zenossapi/apiclient.py: 72%

72 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-25 05:47 +0000

1# -*- coding: utf-8 -*- 

2 

3""" 

4Zenoss API Client Class 

5""" 

6 

7import inspect 

8import os 

9import urllib3 

10 

11class ZenossAPIClientError(Exception): 

12 pass 

13 

14 

15class ZenossAPIClientAuthenticationError(Exception): 

16 pass 

17 

18 

19def strtobool(_string): 

20 """Converts a string to a boolean, replaces distutils.utils.strtobool (deprecated per PEP 632)""" 

21 if _string in ['y', 'yes', 't', 'true', 'on', '1', 1]: 

22 return True 

23 elif _string in ['n', 'no', 'f', 'false', 'off', '0', 0]: 

24 return False 

25 else: 

26 raise ZenossAPIClientError(f"Invalid boolean value: {_string}") 

27 

28 

29class Client(object): 

30 """Client class to access the Zenoss JSON API""" 

31 

32 def __init__(self, host=None, user=None, password=None, ssl_verify=None, collection_zone=None, api_key=None): 

33 """ 

34 Create the client object to communicate with Zenoss. The authentication 

35 and ssl parameters can be pulled from the environment so that they 

36 don't have to be passed in from code or command line args. 

37 

38 Arguments: 

39 host (str): FQDN used to access the Zenoss server 

40 user (str): Zenoss username 

41 password (str): Zenoss user's password 

42 ssl_verify (bool): Whether to verify the SSL certificate or not. 

43 Set to false when using servers with self-signed certs. 

44 collection_zone (str): Zenoss Cloud collection zone 

45 api_key (str): Zenoss Cloud api key 

46 """ 

47 if not host: 

48 if 'ZENOSS_HOST' in os.environ: 

49 host = os.environ['ZENOSS_HOST'] 

50 

51 if not user: 

52 if 'ZENOSS_USER' in os.environ: 

53 user = os.environ['ZENOSS_USER'] 

54 

55 if not password: 

56 if 'ZENOSS_PASSWD' in os.environ: 

57 password = os.environ['ZENOSS_PASSWD'] 

58 

59 if not collection_zone: 

60 if 'ZENOSS_COLLECTION_ZONE' in os.environ: 

61 collection_zone = os.environ['ZENOSS_COLLECTION_ZONE'] 

62 

63 if not api_key: 

64 if 'ZENOSS_API_KEY' in os.environ: 

65 api_key = os.environ['ZENOSS_API_KEY'] 

66 

67 if ssl_verify is None: 

68 if 'ZENOSS_SSL_VERIFY' in os.environ: 

69 ssl_verify = os.environ['ZENOSS_SSL_VERIFY'] 

70 else: 

71 ssl_verify = True 

72 

73 if isinstance(ssl_verify, str): 

74 ssl_verify = bool(strtobool(ssl_verify)) 

75 

76 if collection_zone: 

77 host = '{0}/{1}'.format(host, collection_zone) 

78 

79 self.api_url = '{0}/zport/dmd'.format(host) 

80 self.ssl_verify = ssl_verify 

81 self.api_headers = {"Content-Type": "application/json"} 

82 self.routers = dict() 

83 

84 for router in self.get_routers(): 

85 self.routers[router] = __import__( 

86 'zenossapi.routers.{0}'.format(router), 

87 fromlist=[router]) 

88 

89 if user and password: 

90 self.api_headers.update(urllib3.make_headers( 

91 basic_auth='{0}:{1}'.format(user, password))) 

92 

93 if api_key: 

94 self.api_headers['z-api-key'] = api_key 

95 

96 def get_routers(self): 

97 """ 

98 Gets the list of availble Zenoss API routers 

99 

100 Returns: 

101 list: 

102 """ 

103 router_list = [] 

104 routers_path = os.path.join( 

105 os.path.dirname(os.path.abspath(__file__)), 'routers') 

106 file_list = os.listdir(routers_path) 

107 file_list = [filename for filename in file_list if '.py' in filename] 

108 for fname in file_list: 

109 name, ext = fname.split('.') 

110 if name == "__init__": 

111 continue 

112 if ext == "py": 

113 router_list.append(name) 

114 

115 return router_list 

116 

117 def get_router(self, router): 

118 """ 

119 Instantiates and returns a Zenoss router object 

120 

121 Arguments: 

122 router (str): The API router to use 

123 """ 

124 router_class = getattr( 

125 self.routers[router], 

126 '__router__', 

127 '{0}Router'.format(router.capitalize()) 

128 ) 

129 

130 api_router = getattr( 

131 self.routers[router], 

132 router_class, 

133 )( 

134 self.api_url, 

135 self.api_headers, 

136 self.ssl_verify, 

137 ) 

138 

139 return api_router 

140 

141 def get_router_methods(self, router): 

142 """ 

143 List all available methods for an API router 

144 

145 Arguments: 

146 router (str): The router to get methods from 

147 

148 Returns: 

149 list: 

150 """ 

151 router_methods = [] 

152 router_class = getattr( 

153 self.routers[router], 

154 '__router__', 

155 '{0}Router'.format(router.capitalize()) 

156 ) 

157 

158 for method in inspect.getmembers( 

159 getattr(self.routers[router], 

160 router_class), 

161 predicate=inspect.isroutine 

162 ): 

163 if method[0].startswith('__'): 

164 continue 

165 router_methods.append(method[0]) 

166 

167 return router_methods