Source code for FirnCorr.datasets.fetch_mar

#!/usr/bin/env python
"""
fetch_mar.py
Written by Tyler Sutterley (04/2026)

Syncs MAR regional climate outputs for a given url
    ftp://ftp.climato.be/fettweis
    http://ftp.climato.be/fettweis

PROGRAM DEPENDENCIES:
    utilities.py: download and management utilities for syncing files

UPDATE HISTORY:
    Updated 04/2026: refactored for new FirnCorr library
    Updated 10/2021: using python logging for handling verbose output
        use utilities module for finding and retrieving files
    Updated 05/2020: added years option to reduce list of files
    Updated 11/2019: added multiprocessing option to run in parallel
    Written 07/2019
"""

from __future__ import print_function

import os
import logging
import pathlib
import argparse
import traceback
import multiprocessing
import FirnCorr.utilities

# default data directory for SMB and firn models
_default_directory = FirnCorr.utilities.get_cache_path()


# PURPOSE: sync local MAR files for a given URL
[docs] def fetch_mar( URL, directory: str | pathlib.Path = _default_directory, years: list[int] | None = None, timeout: int | None = None, processes: int = 0, clobber: bool = False, mode: int = 0o775, ): """ Syncs MAR regional climate outputs for a given url Parameters ---------- URL: object URL from :py:class:`FirnCorr.utilities.URL` directory: str or pathlib.Path Working data directory years: list, default None Years to sync timeout: int, default None Timeout in seconds for blocking operations processes: int, default 0 Number of processes to use in file downloads (0 for serial) clobber: bool, default False Overwrite existing data in transfer mode: int, default 0o775 Local permissions mode of directories and files """ # standard output (terminal output) logging.basicConfig(level=logging.INFO) # directory setup directory = pathlib.Path(directory).expanduser().absolute() # check if local directory exists and recursively create if not local_directory = directory.joinpath("MAR") local_directory.mkdir(exist_ok=True, parents=True, mode=mode) # regular expression for finding years in file names R1 = r"\d+" if not years else r"|".join(map(str, years)) # find files and reduce to years of interest if specified if URL.scheme.startswith("ftp"): # find files on ftp server remote_files, remote_times = FirnCorr.utilities.ftp_list( [URL.netloc, URL.path], timeout=timeout, basename=True, pattern=R1, sort=True, ) elif URL.scheme.startswith("http"): # find files on http server remote_files, remote_times = FirnCorr.utilities.mar_list( URL.geturl(), timeout=timeout, pattern=R1, sort=True, ) # sync each data file (in parallel if processes > 0) if processes > 0: # sync each data file out = [] # set multiprocessing start method ctx = multiprocessing.get_context("fork") # sync in parallel with multiprocessing Pool pool = ctx.Pool(processes=processes) # keyword arguments for multiprocessing download function kwds = dict(timeout=timeout, clobber=clobber, mode=mode) # download remote MAR files to local directory for colname, collastmod in zip(remote_files, remote_times): remote_path = URL.joinpath(colname) local_file = local_directory.joinpath(colname) out.append( pool.apply_async( _multiprocess, args=(remote_path, collastmod, local_file), kwds=kwds, ) ) # start multiprocessing jobs # close the pool # prevents more tasks from being submitted to the pool pool.close() # exit the completed processes pool.join() # print the output string for output in out: temp = output.get() logging.info(temp) if temp else None else: # sync each data file in series kwds = dict(timeout=timeout, clobber=clobber, mode=mode) for colname, collastmod in zip(remote_files, remote_times): remote_path = URL.joinpath(colname) local_file = local_directory.joinpath(colname) output = _download(remote_path, collastmod, local_file, **kwds) logging.info(output) if output else None
# PURPOSE: wrapper for running the sync program in multiprocessing mode def _multiprocess(*args, **kwds): """ Wrapper for running the sync program in ``multiprocessing`` mode """ try: output = _download(*args, **kwds) except Exception as exc: # if there has been an error exception # print the type, value, and stack trace of the # current exception being handled logging.critical(f"process id {os.getpid():d} failed") logging.error(traceback.format_exc()) else: return output # PURPOSE: pull file from a remote host checking if file exists locally # and if the remote file is newer than the local file def _download( URL, mtime: int | float, local: str | pathlib.Path, **kwargs, ): """ Pull file from a remote host Parameters ---------- URL: object URL from :py:class:`FirnCorr.utilities.URL` mtime: float Last modification time of the remote file in seconds since the epoch local: str or pathlib.Path Path to local file to be synced kwargs: dict Additional keyword arguments for syncing files """ # check if local version of file exists if kwargs["clobber"]: why = "overwrite" elif not local.exists(): why = "new" elif local.exists() and _newer(mtime, local.stat().st_mtime): return else: why = "old" # if file does not exist locally, is to be overwritten, or clobber is set # output string for printing files transferred output = f"\n\tremote={URL} -->\n\tlocal={local}\n\treason={why}" # copy remote file contents to local file URL.get( timeout=kwargs["timeout"], local=local, hash=FirnCorr.utilities.get_hash(local), label=output, mode=kwargs["mode"], ) # keep remote modification time of file and local access time os.utime(local, (local.stat().st_atime, mtime)) # return the output string return output # PURPOSE: compare the modification time of two files def _newer(t1: int, t2: int) -> bool: """ Compare the modification time of two files Parameters ---------- t1: int Modification time of first file t2: int Modification time of second file """ return FirnCorr.utilities.even(t1) <= FirnCorr.utilities.even(t2) # PURPOSE: create arguments parser def arguments(): parser = argparse.ArgumentParser( description="""Syncs MAR regional climate outputs """ ) parser.add_argument("url", type=str, help="MAR url") # working data directory parser.add_argument( "--directory", "-D", type=pathlib.Path, default=_default_directory, help="Working data directory", ) # years of data to sync parser.add_argument( "--year", "-Y", type=int, nargs="+", help="Years to sync" ) # run sync in series if processes is 0 parser.add_argument( "--np", "-P", metavar="PROCESSES", type=int, default=1, help="Number of processes to use in file downloads", ) # connection timeout parser.add_argument( "--timeout", "-T", type=int, default=120, help="Timeout in seconds for blocking operations", ) # clobber will overwrite the existing data parser.add_argument( "--clobber", "-C", default=False, action="store_true", help="Overwrite existing data", ) # permissions mode of the local directories and files (number in octal) parser.add_argument( "--mode", "-M", type=lambda x: int(x, base=8), default=0o775, help="Permission mode of directories and files downloaded", ) # return the parser return parser # This is the main part of the program that calls the individual modules def main(): # Read the system arguments listed after the program parser = arguments() args, _ = parser.parse_known_args() # create and parse URL object URL = FirnCorr.utilities.URL(args.url) # parameters for connection check if URL.scheme.startswith("ftp"): check_connection = FirnCorr.utilities.check_ftp_connection netloc = URL.netloc elif URL.scheme.startswith("http"): check_connection = FirnCorr.utilities.check_connection netloc = URL.parents[-1].geturl() # check internet connection if check_connection(netloc): # run program for URL fetch_mar( URL, directory=args.directory, years=args.year, timeout=args.timeout, processes=args.np, clobber=args.clobber, mode=args.mode, ) # run main program if __name__ == "__main__": main()