ctools  2.0.0
 All Classes Namespaces Files Functions Variables Macros Pages
csiactcopy.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # ==========================================================================
3 # Copies IACT data from remote machine
4 #
5 # Copyright (C) 2016-2022 Michael Mayer
6 #
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #
20 # ==========================================================================
21 import os
22 import sys
23 import json
24 import shutil
25 import gammalib
26 import ctools
27 
# ================ #
# csiactcopy class #
# ================ #
31 class csiactcopy(ctools.cscript):
32  """
33  Copies IACT data from remote machine
34 
35  This script copies IACT data from one location to another. It can
 take a list of observation IDs to allow the download of specific
37  observations. Index files get merged and updated accordingly.
38  """
39 
40  # Constructor
41  def __init__(self, *argv):
42  """
43  Constructor
44  """
45  # Initialise application by calling the base class constructor
46  self._init_cscript(self.__class__.__name__, ctools.__version__, argv)
47 
48  # Set name
49  self._datapath = os.getenv('VHEFITS','')
50  self._remote_master = gammalib.GFilename()
51  self._remote_base = ''
52  self._prodname = ''
53  self._outpath = ''
54  self._runlist = gammalib.GFilename()
55  self._runs = []
56 
57  # Return
58  return
59 
60 
61  # Private methods
62  def _get_parameters(self):
63  """
64  Get parameters from parfile and setup the observation
65  """
66  # Get Parameters
67  self._remote_master = self['remote_master'].filename()
68  if not self._remote_master.exists():
69  raise RuntimeError('*** ERROR: FITS data not available. No '
70  'master index file found in "'+
71  self._remote_master+'". Make sure remote '
72  'file system is properly mounted.')
73 
74  # Get parameters
75  self._prodname = self['prodname'].string()
76  self._outpath = gammalib.expand_env(self['outpath'].string())
77  if self['runlist'].is_valid():
78  self._runlist = self['runlist'].filename()
79 
80  # Write input parameters into logger
81  self._log_parameters(gammalib.TERSE)
82 
83  # Return
84  return
85 
86  def _copy(self, source, clobber):
87  """
88  Copy file to self._outpath directory
89 
90  Parameters
91  ----------
92  source : str
93  Path of file to be copied
94 
95  Returns
96  -------
97  filesize : float
98  Size of the file that was copied
99  """
100  # Get file destination
101  destination = os.path.join(self._outpath,
102  os.path.relpath(source, self._remote_base))
103 
104  # Initialise return value
105  filesize = 0.0
106 
107  # Flag if file destination is already available
108  is_file = os.path.isfile(destination)
109 
110  # Logging
111  self._log_header3(gammalib.VERBOSE, 'Copy file')
112  self._log_value(gammalib.VERBOSE, 'Source', source)
113  self._log_value(gammalib.VERBOSE, 'Destination', destination)
114  self._log_value(gammalib.VERBOSE, 'Already exists', is_file)
115  self._log_value(gammalib.VERBOSE, 'Overwrite', clobber)
116 
117  # Check if file could be skipped because clobber=no
118  if is_file and clobber == False:
119  self._log_value(gammalib.VERBOSE, 'Copying', 'Skip (clobber=no)')
120 
121  # ... check if file could be skipped because it is the same file as
122  # inferred from the file path and name (the file is in fact not
123  # checked) ...
124  elif is_file and os.path.samefile(destination, source):
125  self._log_value(gammalib.VERBOSE, 'Copying', 'Skip (same file)')
126 
127  # ... otherwise copy file
128  else:
129  # Create directory if it does not exist
130  dest_dir = os.path.dirname(destination)
131  if not os.path.isdir(dest_dir):
132  os.makedirs(dest_dir)
133 
134  # If destination file exists then delete it. Make sure that file
135  # is writable for that
136  if os.path.isfile(destination):
137  os.chmod(destination, 420)
138  os.remove(destination)
139 
140  # Copy file
141  shutil.copy2(source, destination)
142 
143  # Get filesize
144  filesize = os.stat(destination).st_size
145 
146  # Logging
147  self._log_value(gammalib.VERBOSE, 'Copying', 'Done!')
148 
149  # Return
150  return filesize
151 
152  def _merge(self, localfits, remotefits, hduname, clobber):
153  """
154  Merge remote and local fits files
155 
156  If the local fits file is not present, a new one is created.
157 
158  Parameters
159  ----------
160  localfits : str
161  Path of local index FITS file
162  remotefits : str
163  Path of remote index FITS file
164  hduname : str
165  Name of HDU extension to be merged
166  clobber : bool
167  Flag if remote content should overwrite local content
168  """
169  # Write input parameters into logger
170  self._log_value(gammalib.NORMAL, 'Local file', localfits)
171  self._log_value(gammalib.NORMAL, 'Remote file', remotefits)
172  self._log_value(gammalib.NORMAL, 'HDU name', hduname)
173  if clobber:
174  preference = 'Remote'
175  else:
176  preference = 'Local'
177  self._log_value(gammalib.NORMAL, 'Conflict preference', preference)
178 
179  # Check if local fits file is available
180  if os.path.isfile(localfits):
181  local = gammalib.GFits(str(localfits))
182  local_hdu = local[hduname]
183  else:
184  # Otherwise load remote fits file and delete rows
185  local = gammalib.GFits(str(remotefits))
186  local_hdu = local[hduname]
187  local_hdu.remove_rows(0,local_hdu.nrows())
188 
189  # find local obs_id
190  local_obs = []
191  lobs_col = local_hdu['OBS_ID']
192  for obs_id in lobs_col:
193  local_obs.append(obs_id)
194 
195  # Initialise run list. A kluge has been implemented to circumvent a
196  # too long file name: the run list is split into chunks of 50
197  # observations maximum. Note also the "//" division operator that
198  # makes sure that "i" is an integer on Python 3.
199  runlists = []
200  runs_per_chunk = 50
201  for i in range(len(self._runs) // runs_per_chunk + 1):
202  chunk = self._runs[i * runs_per_chunk : (i + 1) * runs_per_chunk]
203  runlists.append(chunk)
204 
205  # Loop over runlist chunks
206  for i, runlist in enumerate(runlists):
207 
208  # Create selection expression for opening remote fits file
209  selection = ''
210  if len(runlist):
211  selection += '['+hduname+']['
212  for run in runlist:
213  selection += 'OBS_ID=='+str(run)
214  selection += '||'
215  selection = selection[:-2]
216  selection += ']'
217 
218  # Create file name
219  remotefile = remotefits + selection
220 
221  # Open remote fits file
222  remote = gammalib.GFits(str(remotefile))
223 
224  # If first iteration, create clone of FITS column for later use
225  if i == 0:
226  remote_hdu = remote[hduname].clone()
227  else:
228 
229  # get temporary hdu to be added to current HDU
230  tmp_hdu = remote[hduname]
231 
232  # Get old size of remote HDU
233  size = remote_hdu.nrows()
234 
235  # Extend remote HDU
236  remote_hdu.append_rows(tmp_hdu.nrows())
237 
238  # Loop over rows and columns to copy over values exteding remote HDU
239  for col in tmp_hdu:
240  for row in range(col.nrows()):
241  remote_hdu[col.name()][size + row] = col[row]
242 
243  # Get remote obs_id
244  remote_obs = []
245  robs_col = remote_hdu['OBS_ID']
246  for obs_id in robs_col:
247  remote_obs.append(obs_id)
248 
249  # Log entries of index files
250  self._log_value(gammalib.NORMAL, 'Remote entries', len(remote_obs))
251  self._log_value(gammalib.NORMAL, 'Local entries', len(local_obs))
252 
253  # initialise counter for logging
254  removed_rows = 0
255 
256  # If clobber=yes, we use the remote content to overwrite
257  # corresponding local content
258  if clobber:
259 
260  # Loop over remote obs_ids
261  for remote_obs_id in remote_obs:
262 
263  # Check if remote obs_id is present locally
264  if remote_obs_id in local_obs:
265 
266  # Remove local obs_id entries
267  table_has_obsid = True
268 
269  # Loop over obs ids and overwrite with remote content
270  while table_has_obsid:
271 
272  # First remove old rows
273  for i in range(local_hdu.nrows()):
274  if remote_obs_id == local_hdu['OBS_ID'][i]:
275  local_hdu.remove_rows(i,1)
276  removed_rows += 1
277  break
278 
279  # Replace with remote content
280  table_has_obsid = False
281  for i in range(local_hdu.nrows()):
282  if remote_obs_id == local_hdu['OBS_ID'][i]:
283  table_has_obsid = True
284  break
285 
286  # If clobber=false, we keep the local content unchanged and just
287  # expand by remote content
288  else:
289  # Loop over remote obs_ids
290  for local_obs_id in local_obs:
291 
292  # Check if local obs_id is present remotely
293  if local_obs_id in remote_obs:
294 
295  # Remove remote obs_id entries
296  table_has_obsid = True
297  while table_has_obsid:
298  for i in range(remote_hdu.nrows()):
299  if local_obs_id == remote_hdu['OBS_ID'][i]:
300  remote_hdu.remove_rows(i,1)
301  removed_rows += 1
302  break
303  table_has_obsid = False
304  for i in range(remote_hdu.nrows()):
305  if local_obs_id == remote_hdu['OBS_ID'][i]:
306  table_has_obsid = True
307  break
308 
309  # Store number of local rows
310  old_local_rows = local_hdu.nrows()
311 
312  # Add remote HDUs
313  local_hdu.insert_rows(old_local_rows, remote_hdu.nrows())
314 
315  # Log actions
316  if clobber:
317  what = 'Removed local rows'
318  else:
319  what = 'Skipped remote rows'
320  self._log_value(gammalib.NORMAL, what, removed_rows)
321 
322  # Loop over columns
323  for i in range(local_hdu.ncols()):
324 
325  # Get local and remote columns
326  local_col = local_hdu[i]
327  remote_col = remote_hdu[i]
328 
329  # Loop over entries and merge
330  for j in range(remote_col.nrows()):
331  local_col[j+old_local_rows] = remote_col[j]
332 
333  # Save local fits file
334  local.saveto(str(localfits), True)
335 
336  # Return
337  return
338 
339  def _set_runs(self, filename):
340  """
341  Set the run list
342 
343  Parameters
344  ----------
345  filename : `~gammalib.GFilename`
346  Run list file name
347 
348  Returns
349  -------
350  runs : list of int
351  Run list
352  """
353  # Write header into logger
354  self._log_header1(gammalib.TERSE, 'Set runlist')
355 
356  # Initialise runlist
357  runs = []
358 
359  # If runlist file exists then open file and extract the run
360  # information into the run list
361  if filename.exists():
362 
363  # Open the runlist file
364  runfile = open(filename.url())
365 
366  # Read runlist file and append all run ID as integers to the
367  # run list. Skip any blank or comment lines
368  for line in runfile.readlines():
369  if len(line) == 0:
370  continue
371  if line[0] == '#':
372  continue
373  if len(line.split()) > 0:
374  run = int(line.split()[0])
375  runs.append(run)
376  self._log_value(gammalib.EXPLICIT, 'Run %d' % len(runs), run)
377 
378  # Close runlist file
379  runfile.close()
380 
381  # Logging
382  self._log_value(gammalib.NORMAL, 'Number of observations',
383  len(runs))
384 
385  # ... otherwise, if the runlist filename is empty then leave the
386  # run list empty. This implies copying all available data.
387  elif filename.is_empty():
388  self._log_string(gammalib.NORMAL, 'Copy all available data')
389 
390  # ... otherwise raise an exception
391  else:
392  msg = '*** ERROR: Runlist file "%s" not available' % filename.url()
393  raise RuntimeError(msg)
394 
395  # Return run list
396  return runs
397 
398 
399  # Public methods
400  def process(self):
401  """
402  Process the script
403  """
404  # Get parameters
405  self._get_parameters()
406 
407  # Make destination directory if not available
408  if not os.path.isdir(self._outpath):
409  os.makedirs(self._outpath)
410 
411  # Set run list
412  self._runs = self._set_runs(self._runlist)
413 
414  # Check for availability of remote master file
415  if not self._remote_master.exists():
416  raise RuntimeError('*** ERROR: Remote master file "'+
417  self._remote_master+'" does not exist.')
418 
419  # Retrieve json data from remote master
420  json_data = open(self._remote_master.url()).read()
421  data = json.loads(json_data)
422  if not 'datasets' in data:
423  raise RuntimeError('*** ERROR: Key "datasets" not available '
424  'in remote master index file "'+
425  self._remote_master+'".')
426 
427  # Get array of configurations
428  configs = data['datasets']
429 
430  # Get remote paths
431  self._remote_base = self._remote_master.path()
432 
433  # Initialise flag if prod has been found
434  has_prod = False
435 
436  # Initialise file names to be copied
437  files = set()
438 
439  # Write header into logger
440  self._log_header2(gammalib.TERSE, 'Loop over remote configs')
441 
442  # Loop over configs
443  for config in configs:
444 
445  # Write header into logger. The str() is needed since
446  # "config['name']" is <type 'unicode'>, and under Python 2 a
447  # string is expected.
448  self._log_header2(gammalib.VERBOSE, str(config['name']))
449 
450  # Check if prodname was found
451  if config['name'] == self._prodname:
452 
453  # Inidicate that prodname has been found
454  has_prod = True
455 
456  # Build path of index files
457  remote_hdu = str(os.path.join(self._remote_base,
458  config['hduindx']))
459  remote_obs = str(os.path.join(self._remote_base,
460  config['obsindx']))
461 
462  # Log information
463  self._log_header3(gammalib.NORMAL, 'Remote config "'+
464  self._prodname+'"')
465  self._log_value(gammalib.NORMAL, 'HDU index', remote_hdu)
466  self._log_value(gammalib.NORMAL, 'Observation index', remote_obs)
467 
468  # Open remote HDU index file
469  fits = gammalib.GFits(str(remote_hdu))
470  table = fits['HDU_INDEX']
471 
472  # Initialise flag if SIZE column is present
473  has_size = table.contains('SIZE')
474 
475  # Initialise file size
476  if has_size:
477  cp_size = 0
478 
479  # Initialise remote observation IDs
480  remote_ids = set()
481  for row in range(table.nrows()):
482  remote_ids.add(table['OBS_ID'][row])
483 
484  # Log runs that are not available remotely
485  for run in self._runs:
486 
487  # Check for run not in remote data store
488  if not run in remote_ids:
489  msg = ('Skip observation "%s": ID not available '
490  'remotely' % str(run))
491  self._log_string(msg)
492 
493  # Loop over remote HDU index file
494  for row in range(table.nrows()):
495 
496  # Get observation ID
497  obs_id = table['OBS_ID'][row]
498  file_dir = table['FILE_DIR'][row]
499  file_name = table['FILE_NAME'][row]
500 
501  # Skip if filename is empty
502  if file_name == '':
503  continue
504 
505  # Check if we need to consider an input runlist
506  if len(self._runs):
507 
508  # Check if obs id is in runlist
509  if obs_id in self._runs:
510 
511  # Get filename
512  fname = os.path.join(os.path.dirname(remote_hdu),
513  file_dir, file_name)
514 
515  # Add file
516  oldlen = len(files)
517  files.add(fname)
518  newlen = len(files)
519 
520  # Add file size
521  if has_size and newlen > oldlen:
522  cp_size += table['SIZE'][row]
523 
524  # Otherwise add every file
525  else:
526  # Get filename
527  fname = os.path.join(os.path.dirname(remote_hdu),
528  file_dir, file_name)
529 
530  # Add file
531  oldlen = len(files)
532  files.add(fname)
533  newlen = len(files)
534 
535  # Add file size
536  if has_size and newlen > oldlen:
537  cp_size += table['SIZE'][row]
538 
539  # Log file information
540  self._log_header2(gammalib.NORMAL, 'File information')
541  self._log_value(gammalib.NORMAL, 'Number of files', len(files))
542  if has_size:
543  size = float(cp_size) * 1.0e-6
544  self._log_value(gammalib.NORMAL, 'Size', '%.2f MB' % size)
545  self._log_header3(gammalib.VERBOSE, 'File names')
546  for filename in files:
547  self._log_string(gammalib.VERBOSE, str(filename)+'\n')
548 
549  # Close HDU index file
550  fits.close()
551 
552  # If prodname is not found just log that we skip the config
553  else:
554  self._log_header3(gammalib.EXPLICIT, 'Skipping config "'+
555  str(config['name'])+'"')
556 
557  # Raise Exception if prodname was not found
558  if not has_prod:
559  msg = '*** ERROR: FITS production "'+self._prodname+'" not '
560  msg += 'available. Available productions are:\n'
561  for config in configs:
562  msg += ' - '+config['name']+'\n'
563  raise RuntimeError(msg)
564 
565  # Write header
566  self._log_header1(gammalib.NORMAL, 'Copying files')
567 
568  # Intialise counter
569  k = 0
570 
571  # Initialise values for logging
572  last_fraction = 0.0
573  fraction_increment = 20.0
574 
575  # Use 10% step increase
576  if self._logNormal():
577  fraction_increment = 10.0
578 
579  # Use 5% step increase
580  if self._logTerse():
581  fraction_increment = 5.0
582 
583  # Use 2% step increase
584  if self._logExplicit():
585  fraction_increment = 2.0
586 
587  # Initialise logging properties
588  n_copied = 0
589  total_size = 0.0
590 
591  # Loop over files and copy
592  for filename in files:
593 
594  # Log progress
595  fraction = float(k) / float(len(files)) * 100.0
596  while fraction > last_fraction:
597 
598  # Print status of copying procedure
599  self._log_value(gammalib.NORMAL, 'Status', '%d %%' %
600  int(last_fraction))
601  last_fraction += fraction_increment
602 
603  # Copy file
604  filesize = self._copy(filename, self._clobber())
605 
606  # If the filesize is positive then increment the number of copied
607  # files and add the size to the total copied filesize
608  if filesize > 0.0:
609  total_size += filesize
610  n_copied += 1
611 
612  # Increment file counter
613  k += 1
614 
615  # Logging
616  self._log_value(gammalib.NORMAL, 'Status', 'Finished')
617 
618  # Logging about index files
619  self._log_header1(gammalib.TERSE, 'Updating index files')
620 
621  # Build local hdu index file name
622  local_hdu = os.path.join(self._outpath,
623  os.path.relpath(remote_hdu,
624  self._remote_base))
625  # Build local obs index file name
626  local_obs = os.path.join(self._outpath,
627  os.path.relpath(remote_obs,
628  self._remote_base))
629 
630  # If we have a runlist then merge index file
631  if len(self._runs):
632 
633  # Logging
634  self._log_header3(gammalib.TERSE, 'HDU index')
635 
636  # Merge remote index files with local files
637  self._merge(local_hdu, remote_hdu, 'HDU_INDEX', self._clobber())
638 
639  # Logging
640  self._log_header3(gammalib.TERSE, 'OBS index')
641 
642  # Merge remote index files with local files
643  self._merge(local_obs, remote_obs, 'OBS_INDEX', self._clobber())
644 
645  else:
646  # If all files were copied, just copy index files too
647  self._copy(remote_hdu, self._clobber())
648  self._copy(remote_obs, self._clobber())
649 
650  # Logging
651  self._log_header3(gammalib.TERSE, 'Master index file')
652 
653  # Adding prodname to local master
654  localmaster = os.path.join(self._outpath, 'master.json')
655 
656  # If no local master is found, copy master over first
657  if not os.path.isfile(localmaster):
658  self._copy(self._remote_master.url(), self._clobber())
659 
660  # Load local master
661  json_data = open(localmaster).read()
662  data = json.loads(json_data)
663  configs = data['datasets']
664 
665  # Initialise flag indicating if we already have prodname in master
666  # file
667  has_config = False
668 
669  # Initialise new configs array
670  newconfigs = []
671 
672  # Loop over configs in master index file
673  for config in configs:
674 
675  # Get hdu and obs index files
676  hdu = os.path.join(self._outpath, config['hduindx'])
677  obs = os.path.join(self._outpath, config['obsindx'])
678 
679  # Check if index files are available
680  if not (gammalib.GFilename(str(hdu)).is_fits() and
681  gammalib.GFilename(str(obs)).is_fits()):
682  self._log_value(gammalib.NORMAL,
683  'Removing "'+str(config['name']),
684  'Not available')
685  else:
686  # Append config if valid
687  newconfigs.append(config)
688  self._log_value(gammalib.NORMAL,
689  'Keeping "'+str(config['name']),
690  'Available')
691 
692  # Signals that downloaded config is available
693  if config['name'] == self._prodname:
694  has_config = True
695 
696  # Create new entry if config was not available
697  if not has_config:
698  newdict = dict.fromkeys(['name','hduindx','obsindx'])
699  newdict['name'] = self._prodname
700  newdict['hduindx'] = os.path.relpath(local_hdu, self._outpath)
701  newdict['obsindx'] = os.path.relpath(local_obs, self._outpath)
702  newconfigs.append(newdict)
703  if self._logTerse():
704  self._log('Adding "'+str(newdict['name'])+'"')
705  self._log('\n')
706 
707  # Write new json master file. Make sure that we have write permission
708  # before writing the file. This is needed as the original master file
709  # may be read-only, and the shutil.copy2 function copies over the
710  # access permissions.
711  os.chmod(localmaster, 420)
712  f = open(localmaster, 'w')
713  data['datasets'] = newconfigs
714  json.dump(data, f, indent=2)
715  f.close()
716 
717  # Compute total size in MB
718  size_MB = float(total_size) * 1.0e-6
719 
720  # Log summary
721  self._log_header1(gammalib.NORMAL, 'Summary')
722  self._log_value(gammalib.NORMAL, 'Data files found', len(files))
723  self._log_value(gammalib.NORMAL, 'Data files copied', n_copied)
724  self._log_value(gammalib.NORMAL, 'Size', '%.2f MB' % size_MB)
725  self._log_value(gammalib.NORMAL, 'Local configs', len(newconfigs))
726  self._log_header3(gammalib.TERSE, 'Content of master index')
727  for config in newconfigs:
728  self._log_string(gammalib.TERSE, str(config['name'])+'\n')
729 
730  # Return
731  return
732 
733 # ======================== #
734 # Main routine entry point #
735 # ======================== #
736 if __name__ == '__main__':
737 
738  # Create instance of application
739  app = csiactcopy(sys.argv)
740 
741  # Execute application
742  app.execute()