ctools 2.1.0
Loading...
Searching...
No Matches
csiactcopy.py
Go to the documentation of this file.
1#!/usr/bin/env python
2# ==========================================================================
3# Copies IACT data from remote machine
4#
5# Copyright (C) 2016-2022 Michael Mayer
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19#
20# ==========================================================================
21import os
22import sys
23import json
24import shutil
25import gammalib
26import ctools
27
28# ================ #
29# csiactcopy class #
30# ================ #
class csiactcopy(ctools.cscript):
    """
    Copies IACT data from remote machine

    This script copies IACT data from one location to another. It can
    take a list of observation IDs to allow downloading specific
    observations. Index files get merged and updated accordingly.
    """

    # Constructor
    def __init__(self, *argv):
        """
        Constructor
        """
        # Initialise application by calling the base class constructor
        self._init_cscript(self.__class__.__name__, ctools.__version__, argv)

        # Initialise members
        self._datapath      = os.getenv('VHEFITS', '')  # not read elsewhere in this script
        self._remote_master = gammalib.GFilename()      # remote master index (JSON)
        self._remote_base   = ''                        # directory of remote master
        self._prodname      = ''                        # name of FITS production
        self._outpath       = ''                        # local output directory
        self._runlist       = gammalib.GFilename()      # optional run list file
        self._runs          = []                        # observation IDs to copy

        # Return
        return


    # Private methods
    def _get_parameters(self):
        """
        Get parameters from parfile

        Raises
        ------
        RuntimeError
            Remote master index file not found
        """
        # Get remote master index file and make sure that it exists. The
        # url() call is needed since a GFilename can not be concatenated
        # with a string.
        self._remote_master = self['remote_master'].filename()
        if not self._remote_master.exists():
            raise RuntimeError('*** ERROR: FITS data not available. No '
                               'master index file found in "'+
                               self._remote_master.url()+'". Make sure remote '
                               'file system is properly mounted.')

        # Get remaining parameters
        self._prodname = self['prodname'].string()
        self._outpath  = gammalib.expand_env(self['outpath'].string())
        if self['runlist'].is_valid():
            self._runlist = self['runlist'].filename()

        # Write input parameters into logger
        self._log_parameters(gammalib.TERSE)

        # Return
        return

    def _copy(self, source, clobber):
        """
        Copy file to self._outpath directory

        The destination is obtained by re-rooting ``source`` from the remote
        base directory (self._remote_base) into self._outpath.

        Parameters
        ----------
        source : str
            Path of file to be copied
        clobber : bool
            Overwrite an already existing destination file?

        Returns
        -------
        filesize : float
            Size of the file that was copied (0 if the copy was skipped)
        """
        # Get file destination
        destination = os.path.join(self._outpath,
                                   os.path.relpath(source, self._remote_base))

        # Initialise return value
        filesize = 0.0

        # Flag if file destination is already available
        is_file = os.path.isfile(destination)

        # Logging
        self._log_header3(gammalib.VERBOSE, 'Copy file')
        self._log_value(gammalib.VERBOSE, 'Source', source)
        self._log_value(gammalib.VERBOSE, 'Destination', destination)
        self._log_value(gammalib.VERBOSE, 'Already exists', is_file)
        self._log_value(gammalib.VERBOSE, 'Overwrite', clobber)

        # Skip file if it exists and must not be overwritten ...
        if is_file and not clobber:
            self._log_value(gammalib.VERBOSE, 'Copying', 'Skip (clobber=no)')

        # ... skip file if source and destination are the same file on
        # disk (os.path.samefile compares device and inode numbers) ...
        elif is_file and os.path.samefile(destination, source):
            self._log_value(gammalib.VERBOSE, 'Copying', 'Skip (same file)')

        # ... otherwise copy file
        else:
            # Create directory if it does not exist. An empty directory
            # name means the current directory, which always exists.
            dest_dir = os.path.dirname(destination)
            if dest_dir and not os.path.isdir(dest_dir):
                os.makedirs(dest_dir)

            # If destination file exists then delete it. Make sure that the
            # file is writable for that, since shutil.copy2 copies the
            # (possibly read-only) permissions of the source.
            if is_file:
                os.chmod(destination, 0o644)
                os.remove(destination)

            # Copy file including metadata
            shutil.copy2(source, destination)

            # Get filesize
            filesize = os.stat(destination).st_size

            # Logging
            self._log_value(gammalib.VERBOSE, 'Copying', 'Done!')

        # Return
        return filesize

    @staticmethod
    def _remove_rows_with_obs_id(hdu, obs_ids):
        """
        Remove all rows of a FITS table whose OBS_ID is in a collection

        Parameters
        ----------
        hdu : `~gammalib.GFitsTable`
            FITS table with an OBS_ID column
        obs_ids : iterable
            Observation IDs whose rows should be removed

        Returns
        -------
        removed : int
            Number of removed rows
        """
        # Use a set for constant-time membership tests
        obs_ids = set(obs_ids)

        # Loop backwards so that removing a row does not shift the indices
        # of rows that still need to be inspected
        removed = 0
        for row in range(hdu.nrows() - 1, -1, -1):
            if hdu['OBS_ID'][row] in obs_ids:
                hdu.remove_rows(row, 1)
                removed += 1

        # Return
        return removed

    def _merge(self, localfits, remotefits, hduname, clobber):
        """
        Merge remote and local fits files

        The rows of the remote index file that belong to the observations
        in self._runs are merged into the local index file. If the local
        fits file is not present, a new one is created using the remote
        file as template.

        Parameters
        ----------
        localfits : str
            Path of local index FITS file
        remotefits : str
            Path of remote index FITS file
        hduname : str
            Name of HDU extension to be merged
        clobber : bool
            Flag if remote content should overwrite local content
        """
        # Write input parameters into logger
        self._log_value(gammalib.NORMAL, 'Local file', localfits)
        self._log_value(gammalib.NORMAL, 'Remote file', remotefits)
        self._log_value(gammalib.NORMAL, 'HDU name', hduname)
        preference = 'Remote' if clobber else 'Local'
        self._log_value(gammalib.NORMAL, 'Conflict preference', preference)

        # Open local fits file if available; otherwise load remote fits file
        # as template and delete all its rows
        if os.path.isfile(localfits):
            local     = gammalib.GFits(str(localfits))
            local_hdu = local[hduname]
        else:
            local     = gammalib.GFits(str(remotefits))
            local_hdu = local[hduname]
            local_hdu.remove_rows(0, local_hdu.nrows())

        # Get local observation IDs
        local_obs = [obs_id for obs_id in local_hdu['OBS_ID']]

        # Split the run list into chunks of at most 50 observations. This
        # kluge avoids a too long file name when the selection expression
        # is appended to the remote file name. Only non-empty chunks are
        # built: an empty chunk would carry no selection and would hence
        # pull in all rows of the remote file. Without a run list, a single
        # chunk without selection is used, which selects all rows.
        runs_per_chunk = 50
        runlists = [self._runs[k:k + runs_per_chunk]
                    for k in range(0, len(self._runs), runs_per_chunk)]
        if not runlists:
            runlists = [[]]

        # Loop over run list chunks and accumulate selected remote rows
        remote_hdu = None
        for runlist in runlists:

            # Create selection expression for opening remote fits file
            selection = ''
            if runlist:
                condition = '||'.join(['OBS_ID=='+str(run) for run in runlist])
                selection = '['+hduname+']['+condition+']'

            # Open remote fits file with selection
            remote = gammalib.GFits(str(remotefits + selection))

            # For the first chunk, keep a clone of the selected HDU ...
            if remote_hdu is None:
                remote_hdu = remote[hduname].clone()

            # ... otherwise append the selected rows to the cloned HDU
            else:
                tmp_hdu = remote[hduname]
                size    = remote_hdu.nrows()
                remote_hdu.append_rows(tmp_hdu.nrows())
                for col in tmp_hdu:
                    for row in range(col.nrows()):
                        remote_hdu[col.name()][size + row] = col[row]

        # Get remote observation IDs
        remote_obs = [obs_id for obs_id in remote_hdu['OBS_ID']]

        # Log entries of index files
        self._log_value(gammalib.NORMAL, 'Remote entries', len(remote_obs))
        self._log_value(gammalib.NORMAL, 'Local entries', len(local_obs))

        # Resolve conflicting observation IDs. If clobber=yes, local rows
        # of observations that are also present remotely are removed so that
        # the remote content replaces them. If clobber=no, such remote rows
        # are removed so that the local content is kept unchanged.
        if clobber:
            removed_rows = self._remove_rows_with_obs_id(local_hdu, remote_obs)
            what         = 'Removed local rows'
        else:
            removed_rows = self._remove_rows_with_obs_id(remote_hdu, local_obs)
            what         = 'Skipped remote rows'

        # Append space for remote rows at the end of the local table
        old_local_rows = local_hdu.nrows()
        local_hdu.insert_rows(old_local_rows, remote_hdu.nrows())

        # Log actions
        self._log_value(gammalib.NORMAL, what, removed_rows)

        # Copy remote values into the appended rows. Columns are matched by
        # index; this assumes that local and remote tables share the same
        # column layout (true when the local file was created from the
        # remote template) - NOTE(review): confirm for pre-existing files.
        for icol in range(local_hdu.ncols()):
            local_col  = local_hdu[icol]
            remote_col = remote_hdu[icol]
            for row in range(remote_col.nrows()):
                local_col[row + old_local_rows] = remote_col[row]

        # Make sure that the local directory exists before saving
        local_dir = os.path.dirname(str(localfits))
        if local_dir and not os.path.isdir(local_dir):
            os.makedirs(local_dir)

        # Save local fits file
        local.saveto(str(localfits), True)

        # Return
        return

    def _set_runs(self, filename):
        """
        Set the run list

        Parameters
        ----------
        filename : `~gammalib.GFilename`
            Run list file name

        Returns
        -------
        runs : list of int
            Run list

        Raises
        ------
        RuntimeError
            Run list file name given but file not available
        """
        # Write header into logger
        self._log_header1(gammalib.TERSE, 'Set runlist')

        # Initialise runlist
        runs = []

        # If runlist file exists then extract the run IDs from the first
        # column of each line. Blank lines and comment lines (first
        # non-blank character is '#') are skipped.
        if filename.exists():

            with open(filename.url()) as runfile:
                for line in runfile:
                    fields = line.split()
                    if not fields or fields[0].startswith('#'):
                        continue
                    run = int(fields[0])
                    runs.append(run)
                    self._log_value(gammalib.EXPLICIT, 'Run %d' % len(runs),
                                    run)

            # Logging
            self._log_value(gammalib.NORMAL, 'Number of observations',
                            len(runs))

        # ... otherwise, if the runlist filename is empty then leave the
        # run list empty. This implies copying all available data.
        elif filename.is_empty():
            self._log_string(gammalib.NORMAL, 'Copy all available data')

        # ... otherwise raise an exception
        else:
            msg = '*** ERROR: Runlist file "%s" not available' % filename.url()
            raise RuntimeError(msg)

        # Return run list
        return runs

    def _read_remote_master(self):
        """
        Read the dataset configurations from the remote master index file

        Returns
        -------
        configs : list
            Content of the "datasets" key of the remote master index file

        Raises
        ------
        RuntimeError
            Remote master index file not found or "datasets" key missing
        """
        # Check for availability of remote master file
        if not self._remote_master.exists():
            raise RuntimeError('*** ERROR: Remote master file "'+
                               self._remote_master.url()+'" does not exist.')

        # Retrieve json data from remote master
        with open(self._remote_master.url()) as master:
            data = json.load(master)
        if 'datasets' not in data:
            raise RuntimeError('*** ERROR: Key "datasets" not available '
                               'in remote master index file "'+
                               self._remote_master.url()+'".')

        # Return array of configurations
        return data['datasets']

    def _collect_files(self, configs):
        """
        Collect the remote data files of the requested production

        Parameters
        ----------
        configs : list
            Dataset configurations of the remote master index file

        Returns
        -------
        files : set of str
            Paths of remote data files to be copied
        remote_hdu : str
            Path of remote HDU index file
        remote_obs : str
            Path of remote observation index file

        Raises
        ------
        RuntimeError
            Production self._prodname not found in configs
        """
        # Initialise results
        files      = set()
        remote_hdu = None
        remote_obs = None
        has_prod   = False

        # Set of requested runs for fast membership tests (empty means all)
        runs = set(self._runs)

        # Write header into logger
        self._log_header2(gammalib.TERSE, 'Loop over remote configs')

        # Loop over configs
        for config in configs:

            # Write header into logger. The str() is needed since
            # "config['name']" is <type 'unicode'>, and under Python 2 a
            # string is expected.
            self._log_header2(gammalib.VERBOSE, str(config['name']))

            # Skip configs that do not correspond to the production
            if config['name'] != self._prodname:
                self._log_header3(gammalib.EXPLICIT, 'Skipping config "'+
                                  str(config['name'])+'"')
                continue

            # Indicate that prodname has been found
            has_prod = True

            # Build path of index files
            remote_hdu = str(os.path.join(self._remote_base, config['hduindx']))
            remote_obs = str(os.path.join(self._remote_base, config['obsindx']))

            # Log information
            self._log_header3(gammalib.NORMAL, 'Remote config "'+
                              self._prodname+'"')
            self._log_value(gammalib.NORMAL, 'HDU index', remote_hdu)
            self._log_value(gammalib.NORMAL, 'Observation index', remote_obs)

            # Open remote HDU index file
            fits  = gammalib.GFits(remote_hdu)
            table = fits['HDU_INDEX']

            # Initialise size accounting (only if a SIZE column is present)
            has_size = table.contains('SIZE')
            cp_size  = 0

            # Log runs that are not available remotely
            remote_ids = set([table['OBS_ID'][row]
                              for row in range(table.nrows())])
            for run in self._runs:
                if run not in remote_ids:
                    msg = ('Skip observation "%s": ID not available '
                           'remotely' % str(run))
                    self._log_string(gammalib.NORMAL, msg)

            # Loop over remote HDU index file
            for row in range(table.nrows()):

                # Get observation ID and file information
                obs_id    = table['OBS_ID'][row]
                file_dir  = table['FILE_DIR'][row]
                file_name = table['FILE_NAME'][row]

                # Skip if filename is empty or if observation is not in run
                # list (an empty run list selects all observations)
                if file_name == '':
                    continue
                if runs and obs_id not in runs:
                    continue

                # Add file; its size is only counted once
                fname = os.path.join(os.path.dirname(remote_hdu),
                                     file_dir, file_name)
                if fname not in files:
                    files.add(fname)
                    if has_size:
                        cp_size += table['SIZE'][row]

            # Log file information
            self._log_header2(gammalib.NORMAL, 'File information')
            self._log_value(gammalib.NORMAL, 'Number of files', len(files))
            if has_size:
                size = float(cp_size) * 1.0e-6
                self._log_value(gammalib.NORMAL, 'Size', '%.2f MB' % size)
            self._log_header3(gammalib.VERBOSE, 'File names')
            for filename in sorted(files):
                self._log_string(gammalib.VERBOSE, str(filename)+'\n')

            # Close HDU index file
            fits.close()

        # Raise Exception if prodname was not found
        if not has_prod:
            msg = '*** ERROR: FITS production "'+self._prodname+'" not '
            msg += 'available. Available productions are:\n'
            for config in configs:
                msg += ' - '+str(config['name'])+'\n'
            raise RuntimeError(msg)

        # Return
        return files, remote_hdu, remote_obs

    def _copy_files(self, files):
        """
        Copy data files to the output directory and log progress

        Parameters
        ----------
        files : set of str
            Paths of remote data files to be copied

        Returns
        -------
        n_copied : int
            Number of files actually copied
        total_size : float
            Total size of copied files
        """
        # Write header
        self._log_header1(gammalib.NORMAL, 'Copying files')

        # Select progress step. NOTE(review): if the _log*() predicates test
        # "chatter >= level", the 10% step is always overridden by the 5%
        # step below - confirm the intended mapping.
        fraction_increment = 20.0
        if self._logNormal():
            fraction_increment = 10.0
        if self._logTerse():
            fraction_increment = 5.0
        if self._logExplicit():
            fraction_increment = 2.0

        # Initialise logging properties
        last_fraction = 0.0
        n_copied      = 0
        total_size    = 0.0
        n_files       = len(files)

        # Loop over files (sorted for a reproducible order) and copy
        for k, filename in enumerate(sorted(files)):

            # Log progress
            fraction = float(k) / float(n_files) * 100.0
            while fraction > last_fraction:
                self._log_value(gammalib.NORMAL, 'Status', '%d %%' %
                                int(last_fraction))
                last_fraction += fraction_increment

            # Copy file and count it if something was copied
            filesize = self._copy(filename, self._clobber())
            if filesize > 0.0:
                total_size += filesize
                n_copied   += 1

        # Logging
        self._log_value(gammalib.NORMAL, 'Status', 'Finished')

        # Return
        return n_copied, total_size

    def _update_index_files(self, remote_hdu, remote_obs):
        """
        Merge or copy the index files into the output directory

        Parameters
        ----------
        remote_hdu : str
            Path of remote HDU index file
        remote_obs : str
            Path of remote observation index file

        Returns
        -------
        local_hdu : str
            Path of local HDU index file
        local_obs : str
            Path of local observation index file
        """
        # Logging about index files
        self._log_header1(gammalib.TERSE, 'Updating index files')

        # Build local index file names
        local_hdu = os.path.join(self._outpath,
                                 os.path.relpath(remote_hdu, self._remote_base))
        local_obs = os.path.join(self._outpath,
                                 os.path.relpath(remote_obs, self._remote_base))

        # If we have a runlist then merge index files ...
        if self._runs:
            self._log_header3(gammalib.TERSE, 'HDU index')
            self._merge(local_hdu, remote_hdu, 'HDU_INDEX', self._clobber())
            self._log_header3(gammalib.TERSE, 'OBS index')
            self._merge(local_obs, remote_obs, 'OBS_INDEX', self._clobber())

        # ... otherwise all files were copied, so just copy index files too
        else:
            self._copy(remote_hdu, self._clobber())
            self._copy(remote_obs, self._clobber())

        # Return
        return local_hdu, local_obs

    def _update_master(self, local_hdu, local_obs):
        """
        Update the local master index file

        Configurations whose index files are not available locally are
        removed, and an entry for self._prodname is added if needed.

        Parameters
        ----------
        local_hdu : str
            Path of local HDU index file
        local_obs : str
            Path of local observation index file

        Returns
        -------
        newconfigs : list
            Configurations written into the local master index file
        """
        # Logging
        self._log_header3(gammalib.TERSE, 'Master index file')

        # Local master file name
        localmaster = os.path.join(self._outpath, 'master.json')

        # If no local master is found, copy remote master over first. The
        # copy goes explicitly to "localmaster" since _copy() would keep the
        # remote file name, which is not necessarily "master.json".
        if not os.path.isfile(localmaster):
            shutil.copy2(self._remote_master.url(), localmaster)

        # Load local master
        with open(localmaster) as master:
            data = json.load(master)
        configs = data['datasets']

        # Keep configs whose index files are available locally
        has_config = False
        newconfigs = []
        for config in configs:

            # Get hdu and obs index files
            name = str(config['name'])
            hdu  = os.path.join(self._outpath, config['hduindx'])
            obs  = os.path.join(self._outpath, config['obsindx'])

            # Check if index files are available
            if (gammalib.GFilename(str(hdu)).is_fits() and
                gammalib.GFilename(str(obs)).is_fits()):
                newconfigs.append(config)
                self._log_value(gammalib.NORMAL, 'Keeping "'+name+'"',
                                'Available')

                # Signal that the downloaded config is present. This is
                # only done for kept configs, so that a removed stale entry
                # for the production gets re-created below.
                if config['name'] == self._prodname:
                    has_config = True
            else:
                self._log_value(gammalib.NORMAL, 'Removing "'+name+'"',
                                'Not available')

        # Create new entry if config was not available
        if not has_config:
            newdict = dict.fromkeys(['name', 'hduindx', 'obsindx'])
            newdict['name']    = self._prodname
            newdict['hduindx'] = os.path.relpath(local_hdu, self._outpath)
            newdict['obsindx'] = os.path.relpath(local_obs, self._outpath)
            newconfigs.append(newdict)
            if self._logTerse():
                self._log('Adding "'+str(newdict['name'])+'"')
                self._log('\n')

        # Write new json master file. Make sure that we have write permission
        # before writing the file. This is needed as the original master file
        # may be read-only, and the shutil.copy2 function copies over the
        # access permissions.
        os.chmod(localmaster, 0o644)
        data['datasets'] = newconfigs
        with open(localmaster, 'w') as master:
            json.dump(data, master, indent=2)

        # Return
        return newconfigs


    # Public methods
    def process(self):
        """
        Process the script
        """
        # Get parameters
        self._get_parameters()

        # Make destination directory if not available
        if not os.path.isdir(self._outpath):
            os.makedirs(self._outpath)

        # Set run list
        self._runs = self._set_runs(self._runlist)

        # Read configurations from remote master and set remote base path
        configs           = self._read_remote_master()
        self._remote_base = self._remote_master.path()

        # Collect files of requested production
        files, remote_hdu, remote_obs = self._collect_files(configs)

        # Copy data files
        n_copied, total_size = self._copy_files(files)

        # Update index files and local master index file
        local_hdu, local_obs = self._update_index_files(remote_hdu, remote_obs)
        newconfigs           = self._update_master(local_hdu, local_obs)

        # Compute total size in MB
        size_MB = float(total_size) * 1.0e-6

        # Log summary
        self._log_header1(gammalib.NORMAL, 'Summary')
        self._log_value(gammalib.NORMAL, 'Data files found', len(files))
        self._log_value(gammalib.NORMAL, 'Data files copied', n_copied)
        self._log_value(gammalib.NORMAL, 'Size', '%.2f MB' % size_MB)
        self._log_value(gammalib.NORMAL, 'Local configs', len(newconfigs))
        self._log_header3(gammalib.TERSE, 'Content of master index')
        for config in newconfigs:
            self._log_string(gammalib.TERSE, str(config['name'])+'\n')

        # Return
        return
732
733# ======================== #
734# Main routine entry point #
735# ======================== #
if __name__ == '__main__':

    # Build the application from the command-line arguments and run it
    csiactcopy(sys.argv).execute()
_merge(self, localfits, remotefits, hduname, clobber)
_copy(self, source, clobber)
Definition csiactcopy.py:86