Coverage for src/zenossapi/routers/jobs.py: 92%

49 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-25 05:47 +0000

1# -*- coding: utf-8 -*- 

2 

3""" 

4Zenoss jobs_router 

5""" 

6 

7from zenossapi.routers import ZenossRouter 

8 

9 

class JobsRouter(ZenossRouter):
    """
    Class for interacting with the Zenoss jobs router
    """

    def __init__(self, url, headers, ssl_verify):
        super(JobsRouter, self).__init__(
            url, headers, ssl_verify, 'jobs_router', 'JobsRouter'
        )
        # No job is associated with a bare router; __repr__ falls back to
        # the object's id() while this is None.
        self.uuid = None

    def __repr__(self):
        # Identify the object by its job uuid when one is set, otherwise by
        # the hex form of its id().
        identifier = self.uuid if self.uuid else hex(id(self))

        return '<{0} object at {1}>'.format(type(self).__name__, identifier)

    def abort_jobs_by_uuid(self, jobs):
        """
        Aborts the jobs specified in the uuid list.

        Arguments:
            jobs (list): List of job uuids

        Returns:
            bool: Always True once the request has returned; the router's
            response is not inspected.
        """
        # The response carries nothing this method uses, so it is not kept.
        self._router_request(
            self._make_request_data(
                'abort',
                dict(jobids=jobs),
            )
        )

        return True

    def delete_jobs_by_uuid(self, jobs):
        """
        Deletes the jobs specified in the uuid list.

        Arguments:
            jobs (list): List of job uuids

        Returns:
            list: List of uuids deleted, or an empty list when the router
            returns no response data
        """
        deletes_response = self._router_request(
            self._make_request_data(
                'deleteJobs',
                dict(jobids=jobs),
            )
        )

        # A None response is treated as "nothing was deleted".
        if deletes_response is None:
            return []

        return deletes_response['deletedJobs']

    def list_jobs(self, start=0, limit=50, sort='scheduled', dir='DESC'):
        """
        List all Job Manager jobs, supports pagination.

        Arguments:
            start (int): Offset to start the job list from, default 0
            limit (int): The number of results to return, default 50
            sort (str): Sort key for the list, default is 'scheduled'. Other
                sort keys are 'started', 'finished', 'status', 'type' and
                'user'
            dir (str): Sort order, either 'ASC' or 'DESC', default is 'DESC'

        Returns:
            dict(int, list(dict)): ::

                {
                    'total': (int) Total number of jobs,
                    'jobs': [
                        {
                            'description': (str) Job description,
                            'finished': (int) Time the job finished in timestamp format,
                            'scheduled': (int) Time the job was scheduled in timestamp format,
                            'started': (int) Time the job started in timestamp format,
                            'status': (str) Status of the job,
                            'type': (str) Job type,
                            'uid': (str) JobManager UID - /zport/dmd/JobManager,
                            'user': (str) User who scheduled the job,
                            'uuid': (str) UUID of the job,
                        },
                    ]
                }

        """
        # NOTE(review): page is always sent as 0 while the offset is carried
        # by start/limit - confirm the router does not need a real page.
        jobs_data = self._router_request(
            self._make_request_data(
                'getJobs',
                dict(
                    start=start,
                    limit=limit,
                    sort=sort,
                    dir=dir,
                    page=0,
                )
            )
        )

        return dict(
            total=jobs_data['totalCount'],
            jobs=jobs_data['jobs'],
        )

    def get_jobs(self, start=0, limit=50, sort='scheduled', dir='ASC'):
        """
        Get ZenossJob objects for Job Manager jobs. Supports pagination.

        Arguments:
            start (int): Offset to start the job list from, default 0
            limit (int): The number of results to return, default 50
            sort (str): Sort key for the list, default is 'scheduled'. Other
                sort keys are 'started', 'finished', 'status', 'type' and
                'user'
            dir (str): Sort order, either 'ASC' or 'DESC', default is 'ASC'

        Returns:
            list(ZenossJob):
        """
        jobs_data = self.list_jobs(start=start, limit=limit, sort=sort, dir=dir)

        # Each listed job is fetched again individually through get_job, so
        # this issues one extra router request per job in the page.
        return [self.get_job(job['uuid']) for job in jobs_data['jobs']]

    def get_job(self, job):
        """
        Get a ZenossJob object by the job's uuid

        Arguments:
            job (str): uuid of the job

        Returns:
            ZenossJob:
        """
        job_data = self._router_request(
            self._make_request_data(
                'getInfo',
                dict(jobid=job),
            )
        )

        # The job properties live under the 'data' key of the response.
        return ZenossJob(
            self.api_url,
            self.api_headers,
            self.ssl_verify,
            job_data['data']
        )

165 

166 

class ZenossJob(JobsRouter):
    """
    A single Zenoss job, built from the job data returned by the jobs router
    """

    def __init__(self, url, headers, ssl_verify, job_data):
        super(ZenossJob, self).__init__(url, headers, ssl_verify)

        # Copy every job property onto the instance under the same name.
        # A key missing from job_data raises KeyError.
        job_fields = (
            'description',
            'duration',
            'finished',
            'logfile',
            'scheduled',
            'started',
            'status',
            'type',
            'user',
            'uuid',
        )
        for field in job_fields:
            setattr(self, field, job_data[field])

    def abort(self):
        """
        Abort this job.

        Returns:
            bool:
        """
        own_uuid = [self.uuid]
        return self.abort_jobs_by_uuid(own_uuid)

    def delete(self):
        """
        Delete this job.

        Returns:
            list: Job ID
        """
        own_uuid = [self.uuid]
        return self.delete_jobs_by_uuid(own_uuid)

    def get_log(self):
        """
        Fetch the log for this job.

        Returns:
            dict(str, bool, list): ::

                {
                    'logfile': Filesystem path of the log file,
                    'maxLimit': True or False,
                    'content': Log file lines
                }

        """
        # Ask the router's 'detail' method for this job's log.
        request_data = self._make_request_data('detail', dict(jobid=self.uuid))
        return self._router_request(request_data)

226