ctools 2.1.0.dev
Loading...
Searching...
No Matches
mputils.py
Go to the documentation of this file.
1# ==========================================================================
2# Utility functions for multiprocessing handling
3#
4# Copyright (C) 2018 Luigi Tibaldo
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
18#
19# ==========================================================================
20import gammalib
21
22
23# ===================== #
24# Get number of threads #
25# ===================== #
def nthreads(cls):
    """
    Determines the number of parallel processes to use

    The number is based on the user parameter "nthreads" and the availability
    of the multiprocessing module. A parameter value of 0 selects the number
    of CPUs reported by ``multiprocessing.cpu_count()``; any other value is
    used as is. If anything in the detection step fails with a regular
    exception, a single process is used.

    Parameters
    ----------
    cls : `~ctools.cscript`
        cscript class

    Returns
    -------
    nthreads : int
        Number of threads
    """
    # Log multiprocessing configuration
    cls._log_header1(gammalib.NORMAL, 'Multiprocessing')

    # Try getting multiprocessing support. Only regular exceptions trigger
    # the single-process fallback; KeyboardInterrupt and SystemExit are
    # deliberately allowed to propagate so that the user can still abort.
    try:
        from multiprocessing import cpu_count
        ncpus = cpu_count()
        cls._log_value(gammalib.NORMAL, 'Multiprocessing', 'available')
        cls._log_value(gammalib.EXPLICIT, 'CPUs available', ncpus)

        # Read the user parameter once and reuse the value below
        requested = cls['nthreads'].integer()

        # Set processes to number of CPUs if requested by the user
        if requested == 0:
            nprocesses = ncpus

        # ... otherwise use the number requested
        else:
            cls._log_value(gammalib.EXPLICIT, 'Processes requested',
                           requested)
            nprocesses = requested

    except Exception:
        nprocesses = 1
        cls._log_value(gammalib.NORMAL, 'Multiprocessing', 'not available')

    # Log number of processes
    cls._log_value(gammalib.NORMAL, 'Processes available', nprocesses)

    # Return number of processes to be used
    return nprocesses
72
73
74# ============================ #
75# Execute function in parallel #
76# ============================ #
def process(nthreads, function, args):
    """
    Execute function in parallel

    The function is mapped over ``args`` using a ``multiprocessing.Pool``.
    After all evaluations have finished, the CPU time reported by each
    evaluation is added to the object found in the first slot of the
    corresponding argument tuple.

    Parameters
    ----------
    nthreads : int
        Number of parallel threads
    function : Python function
        Function to be executed in parallel
    args : list of tuples
        Function arguments for each evaluation. The first element of each
        tuple must provide an ``_add_celapse()`` method.

    Returns
    -------
    results : list of tuples
        List of function evaluation results, in the order of ``args``. The
        second element of each result must be a dictionary with a
        ``'celapse'`` entry.
    """
    # Import Pool module from multiprocessing module
    from multiprocessing import Pool

    # Create a pool of processes
    pool = Pool(processes=nthreads)

    # Run function in parallel. Make sure that the worker processes are
    # always cleaned up, also if an evaluation raises an exception.
    try:
        results = pool.map(function, args)
    except BaseException:
        # Stop outstanding work immediately, then re-raise for the caller
        pool.terminate()
        raise
    else:
        # Regular shutdown: no more tasks will be submitted
        pool.close()
    finally:
        # Wait for the worker processes to exit (valid after close() or
        # terminate())
        pool.join()

    # Add CPU times of child processes to CPU time of parent
    for arg, result in zip(args, results):
        arg[0]._add_celapse(result[1]['celapse'])

    # Return results
    return results
116
117
118# ======== #
119# Function #
120# ======== #
def mpfunc(args):
    """
    Multiprocessing function

    Calls the named method of the supplied object with a single argument,
    while capturing the object's log buffer and the elapsed CPU time of
    the call.

    Parameters
    ----------
    args : tuple
        Tuple comprised of class, function name and function argument

    Returns
    -------
    result, info : tuple
        Tuple of function result and information. The information is a
        dictionary with the keys 'celapse' (difference of ``celapse()``
        after and before the call) and 'log' (content of the log buffer).
    """
    # Unpack the work item: object, method name and method argument
    obj, method_name, argument = args[0], args[1], args[2]

    # Reset the logger and set its buffer size
    logger = obj._log
    logger.clear()
    logger.buffer_size(100000)

    # Evaluate the method and measure the CPU time it consumed
    method = getattr(obj, method_name)
    time_before = obj.celapse()
    outcome = method(argument)
    elapsed = obj.celapse() - time_before

    # Retrieve the log buffer, then close the logger
    logged = obj._log.buffer()
    obj._log.close()

    # Return the method result together with the bookkeeping information
    return outcome, {'celapse': elapsed, 'log': logged}
process(nthreads, function, args)
Definition mputils.py:77