ctools  2.0.0
 All Classes Namespaces Files Functions Variables Macros Pages
mputils.py
Go to the documentation of this file.
1 # ==========================================================================
2 # Utility functions for multiprocessing handling
3 #
4 # Copyright (C) 2018 Luigi Tibaldo
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #
19 # ==========================================================================
20 import gammalib
21 
22 
23 # ===================== #
24 # Get number of threads #
25 # ===================== #
def nthreads(cls):
    """
    Determine the number of parallel processes to use

    The number is based on the user parameter "nthreads" and the availability
    of the multiprocessing module. A parameter value of 0 selects the number
    of CPUs reported by ``multiprocessing.cpu_count()``; any other value is
    used as given. If multiprocessing cannot be imported or the CPU count
    cannot be determined, a single process is used.

    Parameters
    ----------
    cls : `~ctools.cscript`
        cscript class

    Returns
    -------
    nthreads : int
        Number of threads
    """
    # Log multiprocessing configuration
    cls._log_header1(gammalib.NORMAL, 'Multiprocessing')

    # Try getting multiprocessing support. Only a failing import or an
    # undeterminable CPU count means "not available"; errors raised while
    # reading the user parameter or logging (and user interrupts) must not
    # be silently turned into a single-process fallback.
    try:
        from multiprocessing import cpu_count
        ncpus = cpu_count()
    except (ImportError, NotImplementedError):
        nprocs = 1
        cls._log_value(gammalib.NORMAL, 'Multiprocessing', 'not available')
    else:
        cls._log_value(gammalib.NORMAL, 'Multiprocessing', 'available')
        cls._log_value(gammalib.EXPLICIT, 'CPUs available', ncpus)

        # Read the user parameter once
        requested = cls['nthreads'].integer()

        # Set processes to number of CPUs if requested by the user
        if requested == 0:
            nprocs = ncpus

        # ... otherwise use the number requested
        else:
            cls._log_value(gammalib.EXPLICIT, 'Processes requested',
                           requested)
            nprocs = requested

    # Log number of processes
    cls._log_value(gammalib.NORMAL, 'Processes available', nprocs)

    # Return number of processes to be used
    return nprocs
72 
73 
74 # ============================ #
75 # Execute function in parallel #
76 # ============================ #
def process(nthreads, function, args):
    """
    Execute function in parallel

    Parameters
    ----------
    nthreads : int
        Number of parallel processes in the pool
    function : Python function
        Function to be executed in parallel. It is called once per element
        of ``args`` and is expected to return a ``(result, info)`` tuple
        whose ``info`` dictionary has a ``'celapse'`` entry.
    args : list of tuples
        Function arguments for each evaluation. The first element of each
        tuple must provide an ``_add_celapse()`` method.

    Returns
    -------
    results : list of tuples
        List of function evaluation results, one per element of ``args``
    """
    # Import Pool module from multiprocessing module
    from multiprocessing import Pool

    # Create a pool of processes
    pool = Pool(processes=nthreads)

    # Run function in parallel. Make sure the worker processes are released
    # even if an evaluation fails: stop the pool on error, close it on
    # success, and wait for the workers to exit in both cases.
    try:
        results = pool.map(function, args)
        pool.close()
    except BaseException:
        pool.terminate()
        raise
    finally:
        pool.join()

    # Add CPU times of child processes to CPU time of parent
    for arg, result in zip(args, results):
        arg[0]._add_celapse(result[1]['celapse'])

    # Return results
    return results
116 
117 
118 # ======== #
119 # Function #
120 # ======== #
def mpfunc(args):
    """
    Multiprocessing function

    Calls the method named ``args[1]`` of the object ``args[0]`` with the
    single argument ``args[2]`` and returns the method result together with
    the elapsed time and the log buffer collected during the call.

    Parameters
    ----------
    args : tuple
        Tuple comprised of class, function name and function argument

    Returns
    -------
    result, info : tuple
        Tuple of function result and information. ``info`` is a dictionary
        with the keys ``'celapse'`` (difference of ``cls.celapse()`` after
        and before the call) and ``'log'`` (content of the logger buffer).
    """
    # Extract
    cls = args[0]  # Class
    fct = args[1]  # Class method name
    arg = args[2]  # Function argument

    # Initialise thread logger
    # NOTE(review): the buffer size is presumably chosen large enough to keep
    # the log text in the buffer until it is read below -- confirm
    cls._log.clear()
    cls._log.buffer_size(100000)

    # Compute function for argument; the logger is closed even if the
    # method raises, the exception itself is passed on to the caller
    try:
        cstart = cls.celapse()
        result = getattr(cls, fct)(arg)
        celapse = cls.celapse() - cstart
        log = cls._log.buffer()
    finally:
        # Close logger
        cls._log.close()

    # Collect thread information
    info = {'celapse': celapse, 'log': log}

    # Return function result and thread information
    return result, info