ctools 2.1.0
Loading...
Searching...
No Matches
csworkflow.py
Go to the documentation of this file.
1#! /usr/bin/env python
2# ==========================================================================
3# Executes analysis workflow
4#
5# Copyright (C) 2016-2022 Juergen Knoedlseder
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19#
20# ==========================================================================
21import sys
22import gammalib
23import ctools
24import cscripts
25
26
27# ================ #
28# csworkflow class #
29# ================ #
class csworkflow(ctools.cscript):
    """
    Executes an analysis workflow.

    The ``csworkflow`` script executes an analysis workflow defined in an
    XML file. The XML file contains a ``workflow`` element holding a list
    of ``actor`` elements. Each actor names a tool and specifies ``input``
    and ``output`` parameters; an input parameter may refer to the output
    parameter of another actor, in which case that other actor is executed
    first.
    """

    # Constructor
    def __init__(self, *argv):
        """
        Constructor.

        Parameters
        ----------
        argv : list of str
            List of IRAF command line parameter strings of the form
            ``parameter=3``.

        Raises
        ------
        TypeError
            An invalid number of command line arguments was provided.
        """
        # Initialise application by calling the base class constructor
        self._init_cscript(self.__class__.__name__, ctools.__version__, argv)

        # Set members
        self._workflow = gammalib.GXml()
        self._actors = []

        # Return
        return


    # Private methods
    #
    # Query all user parameters
    def _get_parameters(self):
        """
        Query the user parameters and load the workflow XML file.

        The file name is taken from the ``inflow`` parameter and the XML
        document is stored in ``self._workflow``.
        """
        # Load workflow XML file
        xmlfile = self['inflow'].filename()
        self._workflow = gammalib.GXml(xmlfile.url())

        # Write input parameters into logger
        self._log_parameters(gammalib.TERSE)

        # Return
        return

    # Parse one parameter section of an actor
    def _parse_actor_parameters(self, actor, tag):
        """
        Parse the parameters found in one section of an actor element.

        Parameters
        ----------
        actor : XML element
            Actor XML element.
        tag : str
            Name of the child element holding the parameters (``input``
            or ``output``).

        Returns
        -------
        parameters, actors : tuple of list
            List of parameter dictionaries with keys ``name``, ``value``
            and ``actor``, and list of the distinct non-empty actor names
            referenced by these parameters, in order of first appearance.
        """
        # Initialise result
        parameters = []
        actors = []

        # Only the first element with the specified tag is considered
        if actor.elements(tag) > 0:
            section = actor.element(tag)
            for k in range(section.elements('parameter')):
                par = section.element('parameter', k)
                parameter = {'name': par.attribute('name'),
                             'value': par.attribute('value'),
                             'actor': par.attribute('actor')}
                parameters.append(parameter)

                # Collect each referenced actor only once
                ref = parameter['actor']
                if ref != '' and ref not in actors:
                    actors.append(ref)

        # Return
        return parameters, actors

    # Parse the workflow XML file
    def _parse_workflow(self):
        """
        Parse the workflow XML document into the list of actors.

        For each ``actor`` element of the ``workflow`` element a dictionary
        with the keys ``name``, ``tool``, ``input_parameters``,
        ``input_actors``, ``output_parameters``, ``output_actors`` and
        ``status`` is appended to ``self._actors``. The status is ``ready``
        for actors that do not depend on any other actor and
        ``waiting for input`` otherwise.
        """
        # Get workflow
        workflow = self._workflow.element('workflow')

        # Get number of actors
        num_actors = workflow.elements('actor')

        # Initialise actors
        self._actors = []

        # Loop over all actors
        for i in range(num_actors):

            # Get actor
            actor = workflow.element('actor', i)

            # Get actor attributes
            name = actor.attribute('name')
            tool = actor.attribute('tool')

            # Get actor input and output parameters
            input_parameters, input_actors = \
                self._parse_actor_parameters(actor, 'input')
            output_parameters, output_actors = \
                self._parse_actor_parameters(actor, 'output')

            # Determine number of dependencies
            num_inputs = len(input_actors)

            # Set actor status
            if num_inputs > 0:
                status = 'waiting for input'
            else:
                status = 'ready'

            # Create actor entry
            entry = {'name': name,
                     'tool': tool,
                     'input_parameters': input_parameters,
                     'input_actors': input_actors,
                     'output_parameters': output_parameters,
                     'output_actors': output_actors,
                     'status': status}

            # Append entry
            self._actors.append(entry)

            # Log information about actors
            self._log_value(gammalib.NORMAL, 'Actor "%s"' % name, tool)

            # Compute list of predecessors
            if num_inputs == 0:
                predecessors = 'none'
            else:
                predecessors = ', '.join(['"'+ref+'"' for ref in input_actors])

            # Log predecessors
            self._log_value(gammalib.NORMAL,
                            gammalib.number(' Predecessor', num_inputs),
                            predecessors)

        # Return
        return

    # Execute a workflow
    def _execute_workflow(self):
        """
        Execute all actors in an order that respects their dependencies.

        The method repeatedly executes every actor with status ``ready``
        and then promotes to ``ready`` every unfinished actor whose input
        actors have all finished. It stops as soon as a pass executes no
        actor, which is the case once all actors have finished, but also
        if the remaining actors can never become ready; such actors are
        left unexecuted.

        Raises
        ------
        RuntimeError
            An actor refers to an input actor that does not exist.
        """
        # Actors are never removed from the list, hence the loop is left
        # through the break below
        while len(self._actors) > 0:

            # Find actors which are ready
            num_executed = 0
            for actor in self._actors:

                # If actor is ready then execute it
                if actor['status'] == 'ready':

                    # Log execution start
                    self._log_value(gammalib.NORMAL, 'Execute actor',
                                    '"%s"' % actor['name'])

                    # Execute actor
                    self._execute_actor(actor)

                    # Log execution finish
                    self._log_value(gammalib.NORMAL,
                                    'Finished actor execution',
                                    '"%s"' % actor['name'])

                    # Set actor status to finished
                    actor['status'] = 'finished'

                    # Increment number of executed actors
                    num_executed += 1

            # Break if no actors have been executed
            if num_executed < 1:
                break

            # Update actor status
            for actor in self._actors:
                if actor['status'] != 'finished':
                    ready = True
                    for input_actor in actor['input_actors']:
                        if self._get_actor(input_actor)['status'] != 'finished':
                            ready = False
                            break
                    if ready:
                        actor['status'] = 'ready'

        # Return
        return

    # Execute an actor
    def _execute_actor(self, actor):
        """
        Execute the tool of a single actor.

        The tool class is looked up in the ``ctools`` module if the tool
        name starts with ``ct`` and in the ``cscripts`` module if it starts
        with ``cs``. For any other tool name the method returns after
        logging the parameters without executing anything.

        Parameters
        ----------
        actor : dict
            Actor entry as created by ``_parse_workflow()``.

        Raises
        ------
        RuntimeError
            A parameter refers to an unknown actor or to a parameter that
            is not an output parameter of the referenced actor.
        AttributeError
            The tool name does not exist in the selected module.
        """
        # Log input parameters
        for par in actor['input_parameters']:
            self._log_value(gammalib.NORMAL, ' Input parameter', '%s=%s' %
                            (par['name'], self._get_parameter_value(par)))

        # Log output parameters
        for par in actor['output_parameters']:
            self._log_value(gammalib.NORMAL, ' Output parameter', '%s=%s' %
                            (par['name'], self._get_parameter_value(par)))

        # Select the module providing the tool; skip unknown tool kinds
        tool_name = actor.get('tool', '')
        if tool_name[:2] == 'ct':
            module = ctools
        elif tool_name[:2] == 'cs':
            module = cscripts
        else:
            return

        # Create actor tool object. The tool name comes from the workflow
        # XML file, hence the class is looked up by attribute name instead
        # of passing a string built from the file content to eval(), which
        # would evaluate any expression embedded in the tool attribute.
        tool = getattr(module, tool_name)()

        # Set actor input parameters, followed by actor output parameters
        for par in actor['input_parameters'] + actor['output_parameters']:
            app_par = tool[par['name']]
            app_par.value(self._get_parameter_value(par))
            tool[par['name']] = app_par

        # Execute actor
        tool.logFileOpen()
        tool.execute()

        # Return
        return

    # Get the parameter value from the respective actor
    def _get_parameter_value(self, par):
        """
        Return the value of a parameter.

        If the ``actor`` entry of the parameter is empty the ``value``
        entry is returned as is. Otherwise the ``value`` entry is taken as
        the name of an output parameter of the referenced actor, and the
        value of that output parameter is returned.

        Parameters
        ----------
        par : dict
            Parameter dictionary with keys ``name``, ``value`` and
            ``actor``.

        Returns
        -------
        value : str
            Parameter value.

        Raises
        ------
        RuntimeError
            The referenced actor does not exist or has no output parameter
            with the requested name.
        """
        # If the actor is empty then simply return the value
        if par['actor'] == '':
            return par['value']

        # ... otherwise get the value from the actor
        actor = self._get_actor(par['actor'])
        for output in actor['output_parameters']:
            if output['name'] == par['value']:
                return output['value']

        # Signal that the referenced output parameter does not exist
        msg = 'Parameter "'+par['value']+'" is not an output '+ \
              'parameter of actor "'+par['actor']+'".'
        raise RuntimeError(msg)

    # Get the actor with the specified name
    def _get_actor(self, name):
        """
        Return the actor with the specified name.

        Parameters
        ----------
        name : str
            Actor name.

        Returns
        -------
        actor : dict
            First entry of ``self._actors`` with the specified name.

        Raises
        ------
        RuntimeError
            No actor with the specified name exists.
        """
        # Return actor with specified name
        for actor in self._actors:
            if actor['name'] == name:
                return actor

        # Throw an exception
        msg = 'Actor "'+name+'" not found.'
        raise RuntimeError(msg)


    # Public methods
    def process(self):
        """
        Process the script

        Queries the user parameters, parses the workflow XML file and
        executes the workflow.
        """
        # Get parameters
        self._get_parameters()

        # Parse XML file
        if self._logTerse():
            self._log('\n')
            self._log.header1('Parse workflow XML file')
        self._parse_workflow()

        # Execute workflow
        if self._logTerse():
            self._log('\n')
            self._log.header1('Execute workflow')
        self._execute_workflow()

        # Return
        return
347
348
349# ======================== #
350# Main routine entry point #
351# ======================== #
if __name__ == '__main__':

    # Build the application from the command line arguments and run it
    # right away; the instance is not needed once execution has finished
    csworkflow(sys.argv).execute()