ctools  2.0.0
 All Classes Namespaces Files Functions Variables Macros Pages
csworkflow.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 # ==========================================================================
3 # Executes analysis workflow
4 #
5 # Copyright (C) 2016-2022 Juergen Knoedlseder
6 #
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #
20 # ==========================================================================
21 import sys
22 import gammalib
23 import ctools
24 import cscripts
25 
26 
27 # ================ #
28 # csworkflow class #
29 # ================ #
class csworkflow(ctools.cscript):
    """
    Executes an analysis workflow.

    The ``csworkflow`` script executes an analysis workflow defined in an
    XML file.

    The XML file is expected to hold a ``workflow`` element with ``actor``
    child elements. Each actor has a ``name`` and a ``tool`` attribute and
    optional ``input`` and ``output`` elements holding ``parameter``
    elements with ``name``, ``value`` and ``actor`` attributes. An actor is
    executed once all actors referenced by its input parameters have
    finished.
    """

    # Constructor
    def __init__(self, *argv):
        """
        Constructor.

        Parameters
        ----------
        argv : list of str
            List of IRAF command line parameter strings of the form
            ``parameter=3``.

        Raises
        ------
        TypeError
            An invalid number of command line arguments was provided.
        """
        # Initialise application by calling the base class constructor
        self._init_cscript(self.__class__.__name__, ctools.__version__, argv)

        # Set members
        self._workflow = gammalib.GXml()
        self._actors = []

        # Return
        return


    # Private methods
    def _get_parameters(self):
        """
        Query all user parameters and load the workflow XML file.

        The file name is taken from the ``inflow`` parameter.
        """
        # Load workflow XML file
        xmlfile = self['inflow'].filename()
        self._workflow = gammalib.GXml(xmlfile.url())

        # Write input parameters into logger
        self._log_parameters(gammalib.TERSE)

        # Return
        return

    def _parse_parameters(self, actor, tag):
        """
        Extract the parameters found below one child element of an actor.

        Only the first child element named ``tag`` is considered.

        Parameters
        ----------
        actor : XML element
            Actor element of the workflow.
        tag : str
            Name of the child element holding the parameters (``input`` or
            ``output``).

        Returns
        -------
        parameters, actors : tuple of list
            List of ``{'name', 'value', 'actor'}`` dictionaries, one per
            ``parameter`` element, and list of the distinct non-empty actor
            names referenced by these parameters, in order of appearance.
        """
        # Initialise results
        parameters = []
        actors = []

        # Collect parameters if the child element exists
        if actor.elements(tag) > 0:
            container = actor.element(tag)
            for k in range(container.elements('parameter')):
                par = container.element('parameter', k)
                par_name = par.attribute('name')
                par_value = par.attribute('value')
                par_actor = par.attribute('actor')
                parameters.append({'name': par_name,
                                   'value': par_value,
                                   'actor': par_actor})

                # Register each referenced actor only once
                if par_actor != '' and par_actor not in actors:
                    actors.append(par_actor)

        # Return
        return parameters, actors

    def _parse_workflow(self):
        """
        Parse the workflow XML file.

        Rebuilds the list of actors from the ``workflow`` element. Every
        actor is stored as dictionary with the keys ``name``, ``tool``,
        ``input_parameters``, ``input_actors``, ``output_parameters``,
        ``output_actors`` and ``status``. The status is ``ready`` for
        actors without predecessors and ``waiting for input`` otherwise.
        """
        # Get workflow
        workflow = self._workflow.element('workflow')

        # Get number of actors
        num_actors = workflow.elements('actor')

        # Initialise actors
        self._actors = []

        # Loop over all actors
        for i in range(num_actors):

            # Get actor
            actor = workflow.element('actor', i)

            # Get actor attributes
            name = actor.attribute('name')
            tool = actor.attribute('tool')

            # Get actor input and output parameters
            input_parameters, input_actors = \
                self._parse_parameters(actor, 'input')
            output_parameters, output_actors = \
                self._parse_parameters(actor, 'output')

            # Determine number of dependencies
            num_predecessors = len(input_actors)

            # Set actor status
            if num_predecessors > 0:
                status = 'waiting for input'
            else:
                status = 'ready'

            # Create and append actor entry
            self._actors.append({'name': name,
                                 'tool': tool,
                                 'input_parameters': input_parameters,
                                 'input_actors': input_actors,
                                 'output_parameters': output_parameters,
                                 'output_actors': output_actors,
                                 'status': status})

            # Log information about actors
            self._log_value(gammalib.NORMAL, 'Actor "%s"' % name, tool)

            # Compute list of predecessors
            if num_predecessors == 0:
                predecessors = 'none'
            else:
                predecessors = ', '.join(['"'+input_actor+'"'
                                          for input_actor in input_actors])

            # Log predecessors
            self._log_value(gammalib.NORMAL,
                            gammalib.number(' Predecessor', num_predecessors),
                            predecessors)

        # Return
        return

    def _execute_workflow(self):
        """
        Execute the workflow.

        Repeatedly executes all actors that are ready and then promotes
        waiting actors whose predecessors have all finished. The method
        stops once a pass finds no actor that is ready. Actors that could
        never be executed (for example due to cyclic dependencies) are
        written into the logger.

        Raises
        ------
        RuntimeError
            An input actor name does not correspond to any actor.
        """
        # Continue while there are actors that can be executed
        while True:

            # Find actors which are ready. Statuses only change to 'ready'
            # after the execution pass, hence the list can be built upfront.
            ready_actors = [actor for actor in self._actors
                            if actor['status'] == 'ready']

            # Break if no actors can be executed
            if len(ready_actors) < 1:
                break

            # Execute all actors that are ready
            for actor in ready_actors:

                # Log execution start
                self._log_value(gammalib.NORMAL, 'Execute actor',
                                '"%s"' % actor['name'])

                # Execute actor
                self._execute_actor(actor)

                # Log execution finish
                self._log_value(gammalib.NORMAL, 'Finished actor execution',
                                '"%s"' % actor['name'])

                # Set actor status to finished
                actor['status'] = 'finished'

            # Update actor status
            for actor in self._actors:
                if actor['status'] != 'finished':
                    if all(self._get_actor(name)['status'] == 'finished'
                           for name in actor['input_actors']):
                        actor['status'] = 'ready'

        # Report actors that never became ready instead of dropping them
        # silently
        for actor in self._actors:
            if actor['status'] != 'finished':
                self._log_value(gammalib.TERSE, 'Unexecuted actor',
                                '"%s" (%s)' % (actor['name'],
                                               actor['status']))

        # Return
        return

    def _execute_actor(self, actor):
        """
        Execute an actor.

        Logs the input and output parameters of the actor, creates the
        tool or script named by the actor, sets its parameters and executes
        it. Tool names starting with ``ct`` are looked up in the ``ctools``
        module, names starting with ``cs`` in the ``cscripts`` module.
        Actors with any other tool name are not executed.

        Parameters
        ----------
        actor : dict
            Actor entry as created when parsing the workflow.

        Raises
        ------
        AttributeError
            The tool name does not exist in the respective module.
        RuntimeError
            A parameter refers to an unknown actor or output parameter.
        """
        # Log input parameters
        for par in actor['input_parameters']:
            self._log_value(gammalib.NORMAL, ' Input parameter', '%s=%s' %
                            (par['name'], self._get_parameter_value(par)))

        # Log output parameters
        for par in actor['output_parameters']:
            self._log_value(gammalib.NORMAL, ' Output parameter', '%s=%s' %
                            (par['name'], self._get_parameter_value(par)))

        # Select module providing the actor tool. A missing tool name is
        # handled like an unsupported one.
        tool_name = actor.get('tool', '')
        if tool_name[:2] == 'ct':
            module = ctools
        elif tool_name[:2] == 'cs':
            module = cscripts
        else:
            return

        # Create actor tool object. The name comes from the workflow XML
        # file, hence it is resolved by attribute lookup rather than by
        # evaluating a string built from it.
        tool = getattr(module, tool_name)()

        # Set actor input parameters followed by actor output parameters
        for par in actor['input_parameters'] + actor['output_parameters']:
            app_par = tool[par['name']]
            app_par.value(self._get_parameter_value(par))
            tool[par['name']] = app_par

        # Execute actor
        tool.logFileOpen()
        tool.execute()

        # Return
        return

    def _get_parameter_value(self, par):
        """
        Get the parameter value, resolving a reference to another actor.

        Parameters
        ----------
        par : dict
            Parameter dictionary with the keys ``name``, ``value`` and
            ``actor``.

        Returns
        -------
        value : str
            The ``value`` entry if ``actor`` is empty. Otherwise ``value``
            is taken as the name of an output parameter of the referenced
            actor and the value of that output parameter is returned.

        Raises
        ------
        RuntimeError
            The referenced actor does not exist or has no output parameter
            of the requested name.
        """
        # If the actor is empty then simply return the value
        if par['actor'] == '':
            return par['value']

        # ... otherwise get the value from the actor
        actor = self._get_actor(par['actor'])
        for output in actor['output_parameters']:
            if output['name'] == par['value']:
                return output['value']

        # Throw an exception if the output parameter was not found
        msg = 'Parameter "'+par['value']+'" is not an output '+ \
              'parameter of actor "'+par['actor']+'".'
        raise RuntimeError(msg)

    def _get_actor(self, name):
        """
        Get the actor with the specified name.

        Parameters
        ----------
        name : str
            Actor name.

        Returns
        -------
        actor : dict
            First actor entry with the specified name.

        Raises
        ------
        RuntimeError
            No actor with the specified name exists.
        """
        # Return actor with specified name
        for actor in self._actors:
            if actor['name'] == name:
                return actor

        # Throw an exception
        msg = 'Actor "'+name+'" not found.'
        raise RuntimeError(msg)


    # Public methods
    def process(self):
        """
        Process the script.

        Queries the user parameters, parses the workflow XML file and
        executes the workflow.
        """
        # Get parameters
        self._get_parameters()

        # Parse XML file
        if self._logTerse():
            self._log('\n')
            self._log.header1('Parse workflow XML file')
        self._parse_workflow()

        # Execute workflow
        if self._logTerse():
            self._log('\n')
            self._log.header1('Execute workflow')
        self._execute_workflow()

        # Return
        return
347 
348 
349 # ======================== #
350 # Main routine entry point #
351 # ======================== #
if __name__ == '__main__':

    # Build the script from the command line arguments
    application = csworkflow(sys.argv)

    # Run the script
    application.execute()