33 Copies IACT data from remote machine
35 This script copies IACT data from one location to another. It can
36 take a list of observation IDs to allow the download of specific
37 observations. Index files get merged and updated accordingly.
46 self._init_cscript(self.__class__.__name__, ctools.__version__, argv)
64 Get parameters from parfile and set up the observation
68 if not self._remote_master.exists():
69 raise RuntimeError(
'*** ERROR: FITS data not available. No '
70 'master index file found in "'+
72 'file system is properly mounted.')
75 self.
_prodname = self[
'prodname'].string()
76 self.
_outpath = gammalib.expand_env(self[
'outpath'].string())
77 if self[
'runlist'].is_valid():
78 self.
_runlist = self[
'runlist'].filename()
81 self._log_parameters(gammalib.TERSE)
86 def _copy(self, source, clobber):
88 Copy file to self._outpath directory
93 Path of file to be copied
98 Size of the file that was copied
101 destination = os.path.join(self.
_outpath,
108 is_file = os.path.isfile(destination)
111 self._log_header3(gammalib.VERBOSE,
'Copy file')
112 self._log_value(gammalib.VERBOSE,
'Source', source)
113 self._log_value(gammalib.VERBOSE,
'Destination', destination)
114 self._log_value(gammalib.VERBOSE,
'Already exists', is_file)
115 self._log_value(gammalib.VERBOSE,
'Overwrite', clobber)
118 if is_file
and clobber ==
False:
119 self._log_value(gammalib.VERBOSE,
'Copying',
'Skip (clobber=no)')
124 elif is_file
and os.path.samefile(destination, source):
125 self._log_value(gammalib.VERBOSE,
'Copying',
'Skip (same file)')
130 dest_dir = os.path.dirname(destination)
131 if not os.path.isdir(dest_dir):
132 os.makedirs(dest_dir)
136 if os.path.isfile(destination):
137 os.chmod(destination, 420)
138 os.remove(destination)
141 shutil.copy2(source, destination)
144 filesize = os.stat(destination).st_size
147 self._log_value(gammalib.VERBOSE,
'Copying',
'Done!')
152 def _merge(self, localfits, remotefits, hduname, clobber):
154 Merge remote and local FITS files
156 If the local FITS file is not present, a new one is created.
161 Path of local index FITS file
163 Path of remote index FITS file
165 Name of HDU extension to be merged
167 Flag if remote content should overwrite local content
170 self._log_value(gammalib.NORMAL,
'Local file', localfits)
171 self._log_value(gammalib.NORMAL,
'Remote file', remotefits)
172 self._log_value(gammalib.NORMAL,
'HDU name', hduname)
174 preference =
'Remote'
177 self._log_value(gammalib.NORMAL,
'Conflict preference', preference)
180 if os.path.isfile(localfits):
181 local = gammalib.GFits(str(localfits))
182 local_hdu = local[hduname]
185 local = gammalib.GFits(str(remotefits))
186 local_hdu = local[hduname]
187 local_hdu.remove_rows(0,local_hdu.nrows())
191 lobs_col = local_hdu[
'OBS_ID']
192 for obs_id
in lobs_col:
193 local_obs.append(obs_id)
201 for i
in range(len(self.
_runs) // runs_per_chunk + 1):
202 chunk = self.
_runs[i * runs_per_chunk : (i + 1) * runs_per_chunk]
203 runlists.append(chunk)
206 for i, runlist
in enumerate(runlists):
211 selection +=
'['+hduname+
']['
213 selection +=
'OBS_ID=='+str(run)
215 selection = selection[:-2]
219 remotefile = remotefits + selection
222 remote = gammalib.GFits(str(remotefile))
226 remote_hdu = remote[hduname].clone()
230 tmp_hdu = remote[hduname]
233 size = remote_hdu.nrows()
236 remote_hdu.append_rows(tmp_hdu.nrows())
240 for row
in range(col.nrows()):
241 remote_hdu[col.name()][size + row] = col[row]
245 robs_col = remote_hdu[
'OBS_ID']
246 for obs_id
in robs_col:
247 remote_obs.append(obs_id)
250 self._log_value(gammalib.NORMAL,
'Remote entries', len(remote_obs))
251 self._log_value(gammalib.NORMAL,
'Local entries', len(local_obs))
261 for remote_obs_id
in remote_obs:
264 if remote_obs_id
in local_obs:
267 table_has_obsid =
True
270 while table_has_obsid:
273 for i
in range(local_hdu.nrows()):
274 if remote_obs_id == local_hdu[
'OBS_ID'][i]:
275 local_hdu.remove_rows(i,1)
280 table_has_obsid =
False
281 for i
in range(local_hdu.nrows()):
282 if remote_obs_id == local_hdu[
'OBS_ID'][i]:
283 table_has_obsid =
True
290 for local_obs_id
in local_obs:
293 if local_obs_id
in remote_obs:
296 table_has_obsid =
True
297 while table_has_obsid:
298 for i
in range(remote_hdu.nrows()):
299 if local_obs_id == remote_hdu[
'OBS_ID'][i]:
300 remote_hdu.remove_rows(i,1)
303 table_has_obsid =
False
304 for i
in range(remote_hdu.nrows()):
305 if local_obs_id == remote_hdu[
'OBS_ID'][i]:
306 table_has_obsid =
True
310 old_local_rows = local_hdu.nrows()
313 local_hdu.insert_rows(old_local_rows, remote_hdu.nrows())
317 what =
'Removed local rows'
319 what =
'Skipped remote rows'
320 self._log_value(gammalib.NORMAL, what, removed_rows)
323 for i
in range(local_hdu.ncols()):
326 local_col = local_hdu[i]
327 remote_col = remote_hdu[i]
330 for j
in range(remote_col.nrows()):
331 local_col[j+old_local_rows] = remote_col[j]
334 local.saveto(str(localfits),
True)
345 filename : `~gammalib.GFilename`
354 self._log_header1(gammalib.TERSE,
'Set runlist')
361 if filename.exists():
364 runfile = open(filename.url())
368 for line
in runfile.readlines():
373 if len(line.split()) > 0:
374 run = int(line.split()[0])
376 self._log_value(gammalib.EXPLICIT,
'Run %d' % len(runs), run)
382 self._log_value(gammalib.NORMAL,
'Number of observations',
387 elif filename.is_empty():
388 self._log_string(gammalib.NORMAL,
'Copy all available data')
392 msg =
'*** ERROR: Runlist file "%s" not available' % filename.url()
393 raise RuntimeError(msg)
408 if not os.path.isdir(self.
_outpath):
415 if not self._remote_master.exists():
416 raise RuntimeError(
'*** ERROR: Remote master file "'+
420 json_data = open(self._remote_master.url()).read()
421 data = json.loads(json_data)
422 if not 'datasets' in data:
423 raise RuntimeError(
'*** ERROR: Key "datasets" not available '
424 'in remote master index file "'+
428 configs = data[
'datasets']
440 self._log_header2(gammalib.TERSE,
'Loop over remote configs')
443 for config
in configs:
448 self._log_header2(gammalib.VERBOSE, str(config[
'name']))
463 self._log_header3(gammalib.NORMAL,
'Remote config "'+
465 self._log_value(gammalib.NORMAL,
'HDU index', remote_hdu)
466 self._log_value(gammalib.NORMAL,
'Observation index', remote_obs)
469 fits = gammalib.GFits(str(remote_hdu))
470 table = fits[
'HDU_INDEX']
473 has_size = table.contains(
'SIZE')
481 for row
in range(table.nrows()):
482 remote_ids.add(table[
'OBS_ID'][row])
485 for run
in self.
_runs:
488 if not run
in remote_ids:
489 msg = (
'Skip observation "%s": ID not available '
490 'remotely' % str(run))
491 self._log_string(msg)
494 for row
in range(table.nrows()):
497 obs_id = table[
'OBS_ID'][row]
498 file_dir = table[
'FILE_DIR'][row]
499 file_name = table[
'FILE_NAME'][row]
509 if obs_id
in self.
_runs:
512 fname = os.path.join(os.path.dirname(remote_hdu),
521 if has_size
and newlen > oldlen:
522 cp_size += table[
'SIZE'][row]
527 fname = os.path.join(os.path.dirname(remote_hdu),
536 if has_size
and newlen > oldlen:
537 cp_size += table[
'SIZE'][row]
540 self._log_header2(gammalib.NORMAL,
'File information')
541 self._log_value(gammalib.NORMAL,
'Number of files', len(files))
543 size = float(cp_size) * 1.0e-6
544 self._log_value(gammalib.NORMAL,
'Size',
'%.2f MB' % size)
545 self._log_header3(gammalib.VERBOSE,
'File names')
546 for filename
in files:
547 self._log_string(gammalib.VERBOSE, str(filename)+
'\n')
554 self._log_header3(gammalib.EXPLICIT,
'Skipping config "'+
555 str(config[
'name'])+
'"')
559 msg =
'*** ERROR: FITS production "'+self.
_prodname+
'" not '
560 msg +=
'available. Available productions are:\n'
561 for config
in configs:
562 msg +=
' - '+config[
'name']+
'\n'
563 raise RuntimeError(msg)
566 self._log_header1(gammalib.NORMAL,
'Copying files')
573 fraction_increment = 20.0
576 if self._logNormal():
577 fraction_increment = 10.0
581 fraction_increment = 5.0
584 if self._logExplicit():
585 fraction_increment = 2.0
592 for filename
in files:
595 fraction = float(k) / float(len(files)) * 100.0
596 while fraction > last_fraction:
599 self._log_value(gammalib.NORMAL,
'Status',
'%d %%' %
601 last_fraction += fraction_increment
604 filesize = self.
_copy(filename, self._clobber())
609 total_size += filesize
616 self._log_value(gammalib.NORMAL,
'Status',
'Finished')
619 self._log_header1(gammalib.TERSE,
'Updating index files')
622 local_hdu = os.path.join(self.
_outpath,
623 os.path.relpath(remote_hdu,
626 local_obs = os.path.join(self.
_outpath,
627 os.path.relpath(remote_obs,
634 self._log_header3(gammalib.TERSE,
'HDU index')
637 self.
_merge(local_hdu, remote_hdu,
'HDU_INDEX', self._clobber())
640 self._log_header3(gammalib.TERSE,
'OBS index')
643 self.
_merge(local_obs, remote_obs,
'OBS_INDEX', self._clobber())
647 self.
_copy(remote_hdu, self._clobber())
648 self.
_copy(remote_obs, self._clobber())
651 self._log_header3(gammalib.TERSE,
'Master index file')
654 localmaster = os.path.join(self.
_outpath,
'master.json')
657 if not os.path.isfile(localmaster):
658 self.
_copy(self._remote_master.url(), self._clobber())
661 json_data = open(localmaster).read()
662 data = json.loads(json_data)
663 configs = data[
'datasets']
673 for config
in configs:
676 hdu = os.path.join(self.
_outpath, config[
'hduindx'])
677 obs = os.path.join(self.
_outpath, config[
'obsindx'])
680 if not (gammalib.GFilename(str(hdu)).is_fits()
and
681 gammalib.GFilename(str(obs)).is_fits()):
682 self._log_value(gammalib.NORMAL,
683 'Removing "'+str(config[
'name']),
687 newconfigs.append(config)
688 self._log_value(gammalib.NORMAL,
689 'Keeping "'+str(config[
'name']),
698 newdict = dict.fromkeys([
'name',
'hduindx',
'obsindx'])
700 newdict[
'hduindx'] = os.path.relpath(local_hdu, self.
_outpath)
701 newdict[
'obsindx'] = os.path.relpath(local_obs, self.
_outpath)
702 newconfigs.append(newdict)
704 self._log(
'Adding "'+str(newdict[
'name'])+
'"')
711 os.chmod(localmaster, 420)
712 f = open(localmaster,
'w')
713 data[
'datasets'] = newconfigs
714 json.dump(data, f, indent=2)
718 size_MB = float(total_size) * 1.0e-6
721 self._log_header1(gammalib.NORMAL,
'Summary')
722 self._log_value(gammalib.NORMAL,
'Data files found', len(files))
723 self._log_value(gammalib.NORMAL,
'Data files copied', n_copied)
724 self._log_value(gammalib.NORMAL,
'Size',
'%.2f MB' % size_MB)
725 self._log_value(gammalib.NORMAL,
'Local configs', len(newconfigs))
726 self._log_header3(gammalib.TERSE,
'Content of master index')
727 for config
in newconfigs:
728 self._log_string(gammalib.TERSE, str(config[
'name'])+
'\n')
736 if __name__ ==
'__main__':