Source code for FirnCorr.utilities

#!/usr/bin/env python
"""
utilities.py
Written by Tyler Sutterley (04/2026)
Download and management utilities for syncing time and auxiliary files

PYTHON DEPENDENCIES:
    lxml: processing XML and HTML in Python
        https://pypi.python.org/pypi/lxml
    platformdirs: Python module for determining platform-specific directories
        https://pypi.org/project/platformdirs/

UPDATE HISTORY:
    Updated 04/2026: added string check to determine if is a valid URL
        added function to check if a dependency is available
        added detection functions for checking if files are compressed
        allow additional keyword arguments to http functions
        added get_cache_path function for application cache directories
        add MAR list function for parsing their http server directories
        add NASA Earthdata credential utilities and AWS s3 functions
        added include_algorithm option to get_hash function
    Updated 09/2024: add wrapper to importlib for optional dependencies
    Updated 06/2022: add NASA Common Metadata Repository (CMR) queries
        added function to build GES DISC subsetting API requests
    Updated 04/2022: updated docstrings to numpy documentation format
    Updated 10/2021: build python logging instance for handling verbose output
    Updated 09/2021: added generic list from Apache http server
    Updated 07/2021: add parser for converting file files to arguments
    Updated 03/2021: added sha1 option for retrieving file hashes
    Updated 01/2021: added username and password to ftp functions
        added ftp connection check
    Updated 12/2020: added file object keyword for downloads if verbose
        add url split function for creating url location lists
    Updated 11/2020: normalize source and destination paths in copy
        make context an optional keyword argument in from_http
    Updated 09/2020: copy from http and https to bytesIO object in chunks
    Written 08/2020
"""

from __future__ import print_function, division, annotations

import sys
import os
import re
import io
import ssl
import json
import netrc
import ftplib
import shutil
import base64
import socket
import getpass
import inspect
import hashlib
import logging
import pathlib
import builtins
import warnings
import importlib
import posixpath
import subprocess
import lxml.etree
import platformdirs
import calendar, time
import dateutil.parser

if sys.version_info[0] == 2:
    from urllib import urlencode, quote_plus
    from cookielib import CookieJar
    from urlparse import urlparse
    import urllib2
else:
    from urllib.parse import urlencode, quote_plus, urlparse
    from http.cookiejar import CookieJar
    import urllib.request as urllib2


# public names exported by a star-import of this module
# NOTE(review): ``ceil`` is defined in this module but is not listed here;
# confirm whether that omission is intentional
__all__ = [
    "reify",
    "get_data_path",
    "get_cache_path",
    "import_dependency",
    "dependency_available",
    "is_valid_url",
    "Path",
    "URL",
    "detect_compression",
    "compressuser",
    "get_hash",
    "url_split",
    "convert_arg_line_to_args",
    "get_unix_time",
    "isoformat",
    "even",
    "copy",
    "symlink",
    "check_ftp_connection",
    "ftp_list",
    "from_ftp",
    "_create_default_ssl_context",
    "_create_ssl_context_no_verify",
    "_set_ssl_context_options",
    "check_connection",
    "http_list",
    "from_http",
    "from_json",
    "mar_list",
    "_s3_buckets",
    "_s3_endpoints",
    "_s3_providers",
    "attempt_login",
    "build_opener",
    "get_token",
    "list_tokens",
    "revoke_token",
    "s3_region",
    "s3_client",
    "s3_bucket",
    "s3_key",
    "check_credentials",
    "gesdisc_list",
    "cmr_filter_json",
    "cmr",
    "build_request",
]


class reify(object):
    """
    Decorator that stores the result of the method it wraps on the instance

    The wrapped method is evaluated when the attribute is accessed through
    an instance, and the returned value is then set on that instance under
    the method's name

    Parameters
    ----------
    wrapped: callable
        Method taking the instance as its only argument
    """

    def __init__(self, wrapped):
        # keep the wrapped method and mirror its name and docstring
        self.wrapped = wrapped
        self.__name__ = wrapped.__name__
        self.__doc__ = wrapped.__doc__

    def __get__(self, inst, objtype=None):
        # accessed through an instance: evaluate and store the result
        if inst is not None:
            result = self.wrapped(inst)
            setattr(inst, self.wrapped.__name__, result)
            return result
        # accessed through the class: return the descriptor itself
        return self
# PURPOSE: get absolute path within a package from a relative path
[docs] def get_data_path(relpath: list | str | pathlib.Path): """ Get the absolute path within a package from a relative path Parameters ---------- relpath: list, str or pathlib.Path Relative path """ # current file path filename = inspect.getframeinfo(inspect.currentframe()).filename filepath = pathlib.Path(filename).absolute().parent if isinstance(relpath, list): # use *splat operator to extract from list return filepath.joinpath(*relpath) elif isinstance(relpath, (str, pathlib.Path)): return filepath.joinpath(relpath)
# PURPOSE: get the path to the user cache directory
def get_cache_path(
    relpath: list | str | pathlib.Path | None = None,
    appname="firncorr",
):
    """
    Get the path to the user cache directory for an application

    Parameters
    ----------
    relpath: list, str, pathlib.Path or None
        Relative path
    appname: str, default 'firncorr'
        Application name

    Returns
    -------
    filepath: pathlib.Path
        Cache directory, with ``relpath`` appended when it is given
    """
    # platform-specific cache directory (requested with ensure_exists=True)
    cache_dir = platformdirs.user_cache_path(appname=appname, ensure_exists=True)
    if isinstance(relpath, (str, pathlib.Path)):
        # single component appended directly
        cache_dir = cache_dir.joinpath(relpath)
    elif isinstance(relpath, list):
        # list of components unpacked with the splat operator
        cache_dir = cache_dir.joinpath(*relpath)
    return pathlib.Path(cache_dir)
def import_dependency(
    name: str,
    extra: str = "",
    raise_exception: bool = False,
):
    """
    Import an optional dependency

    Adapted from ``pandas.compat._optional::import_optional_dependency``

    Parameters
    ----------
    name: str
        Module name
    extra: str, default ""
        Additional text to include in the ``ImportError`` message
    raise_exception: bool, default False
        Raise an ``ImportError`` if the module is not found

    Returns
    -------
    module: obj
        Imported module, or an empty placeholder class named ``module``
        if the import failed and ``raise_exception`` is ``False``
    """
    # the module name has to be a string
    invalid = f"Invalid module name: '{name}'; must be a string"
    assert isinstance(name, str), invalid
    # message used if the module cannot be imported
    missing = f"Missing optional dependency '{name}'. {extra}"
    try:
        return importlib.import_module(name)
    except (ImportError, ModuleNotFoundError) as exc:
        if raise_exception:
            raise ImportError(missing) from exc
        logging.debug(missing)
    # import failed quietly: hand back an empty stand-in
    return type("module", (), {})
[docs] def dependency_available( name: str, minversion: str | None = None, ): """ Checks whether a module is installed without importing it Adapted from ``xarray.namedarray.utils.module_available`` Parameters ---------- name: str Module name minversion : str, optional Minimum version of the module Returns ------- available : bool Whether the module is installed """ # check if module is available if importlib.util.find_spec(name) is None: return False # check if the version is greater than the minimum required if minversion is not None: version = importlib.metadata.version(name) return version >= minversion # return if both checks are passed return True
def is_valid_url(url: str) -> bool:
    """
    Checks if a string is a valid URL

    Parameters
    ----------
    url: str
        URL to check

    Returns
    -------
    valid: bool
        ``True`` if the parsed string has both a scheme
        and a network location
    """
    try:
        parsed = urlparse(str(url))
    except AttributeError:
        return False
    # both the scheme and the network location have to be non-empty
    return bool(parsed.scheme) and bool(parsed.netloc)
def Path(filename: str | pathlib.Path, *args, **kwargs):
    """
    Create a ``URL`` or ``pathlib.Path`` object

    Parameters
    ----------
    filename: str or pathlib.Path
        File path or URL
    *args, **kwargs
        Additional arguments passed to the ``URL`` or
        ``pathlib.Path`` constructor

    Returns
    -------
    path: URL or pathlib.Path
        ``URL`` object if ``filename`` is a valid URL, otherwise
        a ``pathlib.Path`` with the user directory expanded
    """
    # local paths: build a pathlib object and expand any leading tilde
    if not is_valid_url(filename):
        return pathlib.Path(filename, *args, **kwargs).expanduser()
    # remote paths: wrap in a URL object
    return URL(filename, *args, **kwargs)
class URL:
    """
    Handles URLs similar to ``pathlib.Path`` objects

    Parameters
    ----------
    urlname: str or pathlib.Path
        URL string
    """

    def __init__(self, urlname: str | pathlib.Path, *args, **kwargs):
        """Initialize a ``URL`` object"""
        self.urlname = str(urlname)
        # URL split into its base and path components
        self._raw_paths = list(url_split(self.urlname))
        # response headers (lower-case keys) filled in by the request methods
        self._headers = {}

    @classmethod
    def from_parts(cls, parts: str | list | tuple):
        """
        Return a ``URL`` object from components

        Parameters
        ----------
        parts: str, list or tuple
            URL components
        """
        # a single string is already a complete URL
        if isinstance(parts, str):
            return cls(parts)
        # coerce each component to a string before joining
        return cls("/".join(str(part) for part in parts))

    def joinpath(self, *pathsegments: str):
        """Append URL components to existing

        Parameters
        ----------
        pathsegments: str
            URL components to append
        """
        # coerce segments to strings so that path-like objects can be joined
        segments = [str(segment) for segment in pathsegments]
        return URL("/".join([*self._raw_paths, *segments]))

    def resolve(self):
        """Resolve the URL"""
        return URL("/".join([*self._raw_paths]))

    def is_file(self):
        """Boolean flag if path is a local file"""
        return False

    def is_dir(self):
        """Boolean flag if path is a local directory"""
        return False

    def geturl(self):
        """String representation of the ``URL`` object"""
        return self._components.geturl()

    def get(self, *args, **kwargs):
        """Get contents from URL"""
        if self.scheme.startswith("ftp"):
            # ftp functions take the host and path as a list of components
            host = [self.netloc, *url_split(self.path)]
            return from_ftp(host, *args, **kwargs)
        elif self.scheme.startswith("http"):
            return from_http(
                self.urlname, *args, headers=self._headers, **kwargs
            )
        else:
            raise NotImplementedError(f"Unsupported scheme: {self.scheme}")

    def headers(self, *args, **kwargs):
        """Get headers from URL"""
        # open the URL only to collect its headers, then release it
        with self.urlopen(*args, **kwargs):
            pass
        return self._headers

    def load(self, *args, **kwargs):
        """Load ``JSON`` response from URL"""
        return from_json(self.urlname, headers=self._headers, *args, **kwargs)

    def ping(self, *args, **kwargs) -> bool:
        """Ping URL to check connection"""
        return check_connection(self.urlname, *args, **kwargs)

    def query(self, *args, **kwargs):
        """List contents from URL"""
        return http_list(self.urlname, headers=self._headers, *args, **kwargs)

    def read(self, *args, **kwargs):
        """Open URL and read response"""
        # close the response once its contents have been read
        with self.urlopen(*args, **kwargs) as response:
            return response.read()

    def request(self, *args, **kwargs):
        """Make URL request

        Additional arguments are accepted but are not used
        """
        return urllib2.Request(self.urlname)

    def urlopen(self, *args, **kwargs):
        """Open URL and return response"""
        request = urllib2.Request(self.urlname)
        response = urllib2.urlopen(request, *args, **kwargs)
        # keep a copy of the response headers with lower-case keys
        self._headers.update(
            {k.lower(): v for k, v in response.headers.items()}
        )
        return response

    @property
    def name(self):
        """URL basename"""
        return pathlib.PurePosixPath(self.urlname).name

    @property
    def netloc(self):
        """URL network location"""
        return self._components.netloc

    @property
    def parent(self):
        """URL parent path as a ``URL`` object"""
        paths = url_split(self.urlname)[:-1]
        return URL.from_parts(paths)

    @property
    def parents(self):
        """URL parents as a list of ``URL`` objects"""
        paths = url_split(self.urlname)
        # from the immediate parent up to the first component
        return [URL.from_parts(paths[:i]) for i in range(len(paths) - 1, 0, -1)]

    @property
    def parts(self):
        """URL parts as a tuple"""
        paths = url_split(self._components.path)
        return (self.scheme, self.netloc, *paths)

    @property
    def path(self):
        """URL path"""
        return self._components.path

    @property
    def s3bucket(self):
        """AWS s3 bucket name (``None`` if not an s3 URL)"""
        if self.scheme.startswith("s3"):
            return s3_bucket(self.geturl())
        return None

    @property
    def s3key(self):
        """AWS s3 key (``None`` if not an s3 URL)"""
        if self.scheme.startswith("s3"):
            return s3_key(self.geturl())
        return None

    @property
    def scheme(self):
        """URL scheme followed by ``://``"""
        return self._components.scheme + "://"

    @property
    def stem(self):
        """URL stem"""
        return pathlib.PurePosixPath(self.urlname).stem

    @property
    def _components(self):
        """
        URL parsed into six components using ``urlparse``
        """
        return urlparse(self.urlname)

    def __repr__(self):
        """Representation of the ``URL`` object"""
        return str(self.urlname)

    def __str__(self):
        """String representation of the ``URL`` object"""
        return str(self.urlname)

    def __add__(self, other):
        """Concatenate URL components using the addition operator"""
        return URL(self.urlname + str(other))

    def __div__(self, other):
        """Join URL components using the division operator"""
        return self.joinpath(other)

    def __truediv__(self, other):
        """Join URL components using the division operator"""
        return self.joinpath(other)
def detect_compression(filename: str | pathlib.Path) -> bool:
    """
    Detect if file is compressed based on file extension

    Parameters
    ----------
    filename: str or pathlib.Path
        Model file

    Returns
    -------
    compressed: bool
        Input file is ``gzip`` compressed
    """
    resolved = Path(filename).resolve()
    # case-insensitive check for a trailing ".gz" in the basename
    match = re.search(r"\.gz$", resolved.name, re.IGNORECASE)
    return match is not None
[docs] def compressuser(filename: str | pathlib.Path): """ Tilde-compress a file to be relative to the home directory Parameters ---------- filename: str or pathlib.Path Input filename to tilde-compress """ # attempt to compress filename relative to home directory filename = pathlib.Path(filename).expanduser().absolute() try: relative_to = filename.relative_to(pathlib.Path().home()) except (ValueError, AttributeError) as exc: return filename else: return pathlib.Path("~").joinpath(relative_to)
# PURPOSE: get the hash value of a file
[docs] def get_hash( local: str | io.IOBase | pathlib.Path, algorithm: str = "md5", include_algorithm: bool = False, ): """ Get the hash value from a local file or ``BytesIO`` object Parameters ---------- local: obj, str or pathlib.Path ``BytesIO`` object or path to file algorithm: str, default 'md5' Hashing algorithm for checksum validation include_algorithm: bool, default False Include the algorithm name in the returned hash """ # check if open file object or if local file exists if isinstance(local, io.IOBase): # generate checksum hash for a given type if algorithm in hashlib.algorithms_available: value = hashlib.new(algorithm, local.getvalue()).hexdigest() return f"{algorithm}:{value}" if include_algorithm else value else: raise ValueError(f"Invalid hashing algorithm: {algorithm}") elif isinstance(local, (str, pathlib.Path)): # generate checksum hash for local file local = pathlib.Path(local).expanduser() # if file currently doesn't exist, return empty string if not local.exists(): return "" # open the local_file in binary read mode with local.open(mode="rb") as local_buffer: # generate checksum hash for a given type if algorithm in hashlib.algorithms_available: value = hashlib.new(algorithm, local_buffer.read()).hexdigest() return f"{algorithm}:{value}" if include_algorithm else value else: raise ValueError(f"Invalid hashing algorithm: {algorithm}") else: return ""
# PURPOSE: recursively split a url path
def url_split(s: str):
    """
    Split a URL path into a tuple of components

    Parameters
    ----------
    s: str
        URL string

    Returns
    -------
    parts: tuple
        URL components. For ``http``, ``https``, ``ftp`` and ``s3`` URLs
        the first component is the scheme together with the host.
    """
    schemes = ("http:", "https:", "ftp:", "s3:")
    # components are collected from the end of the path backwards
    parts = []
    current = s
    while True:
        head, tail = posixpath.split(str(current))
        # reached the scheme: keep the scheme and host together
        if head in schemes:
            parts.append(current)
            break
        parts.append(tail)
        # stop once the head is empty or only separators
        # (also covers heads such as "//" that would never shrink)
        if not head.strip(posixpath.sep):
            break
        current = head
    return tuple(reversed(parts))
# PURPOSE: convert file lines to arguments
def convert_arg_line_to_args(arg_line):
    """
    Convert file lines to arguments

    Parameters
    ----------
    arg_line: str
        Line string containing a single argument and/or comments

    Yields
    ------
    arg: str
        Whitespace-separated argument from the line
    """
    # remove commented lines and after argument comments
    uncommented = re.sub(r"\#(.*?)$", r"", arg_line)
    # splitting on whitespace never produces empty strings
    yield from uncommented.split()
# PURPOSE: returns the Unix timestamp value for a formatted date string
def get_unix_time(
    time_string: str,
    format: str = "%Y-%m-%d %H:%M:%S",
):
    """
    Get the Unix timestamp value for a formatted date string

    Parameters
    ----------
    time_string: str
        Formatted time string to parse
    format: str, default '%Y-%m-%d %H:%M:%S'
        Format for input time string

    Returns
    -------
    timestamp: int, float or None
        Seconds since the Unix epoch, or ``None`` if the string could
        not be parsed. Strings without a time zone are taken as UTC.
    """
    # inputs without string methods (such as None) cannot be parsed
    try:
        stripped = time_string.rstrip()
    except AttributeError:
        return None
    # try parsing with the given format
    try:
        parsed_time = time.strptime(stripped, format)
    except (TypeError, ValueError):
        pass
    else:
        return calendar.timegm(parsed_time)
    # try parsing with dateutil
    try:
        parsed_time = dateutil.parser.parse(stripped)
    except (TypeError, ValueError, OverflowError):
        return None
    # utctimetuple converts aware datetimes to UTC and leaves naive ones
    # unchanged, so naive strings are read as UTC like the strptime branch
    # (datetime.timestamp would read naive values as local time instead)
    seconds = calendar.timegm(parsed_time.utctimetuple())
    return seconds + parsed_time.microsecond / 1e6
# PURPOSE: output a time string in isoformat
def isoformat(time_string: str):
    """
    Reformat a date string to ISO formatting

    Parameters
    ----------
    time_string: str
        formatted time string to parse

    Returns
    -------
    iso_string: str or None
        Date in ISO format, or ``None`` if the string
        could not be parsed
    """
    # try parsing with dateutil
    try:
        parsed = dateutil.parser.parse(time_string.rstrip())
    except (TypeError, ValueError):
        return None
    return parsed.isoformat()
# PURPOSE: rounds a number to an even number less than or equal to original
def even(value: float):
    """
    Rounds a number to an even number less than or equal to original

    Parameters
    ----------
    value: float
        Number to be rounded

    Returns
    -------
    rounded: int
        Largest even integer that does not exceed ``value``
    """
    # floor-divide by two, then scale back up
    half = value // 2
    return int(half) * 2
# PURPOSE: rounds a number upward to its nearest integer
def ceil(value: float):
    """
    Rounds a number upward to its nearest integer

    Parameters
    ----------
    value: float
        number to be rounded upward

    Returns
    -------
    rounded: int
        Smallest integer that is not less than ``value``
    """
    # flooring the negated value and negating again rounds upward
    floored_negative = (-value) // 1
    return -int(floored_negative)
# PURPOSE: make a copy of a file with all system information
[docs] def copy( source: str | pathlib.Path, destination: str | pathlib.Path, move: bool = False, **kwargs, ): """ Copy or move a file with all system information Parameters ---------- source: str or pathlib.Path Source file destination: str or pathlib.Path Copied destination file move: bool, default False Remove the source file """ source = pathlib.Path(source).expanduser().absolute() destination = pathlib.Path(destination).expanduser().absolute() # log source and destination logging.info(f"{str(source)} -->\n\t{str(destination)}") shutil.copyfile(source, destination) shutil.copystat(source, destination) # remove the original file if moving if move: source.unlink()
# PURPOSE: make a symbolic link to a file # PURPOSE: check ftp connection
def check_ftp_connection(
    HOST: str,
    username: str | None = None,
    password: str | None = None,
):
    """
    Check internet connection with ``ftp`` host

    Parameters
    ----------
    HOST: str
        Remote ftp host
    username: str or NoneType
        ``ftp`` username
    password: str or NoneType
        ``ftp`` password

    Returns
    -------
    connected: bool
        ``True`` if the host could be reached and logged into

    Raises
    ------
    RuntimeError
        If the host cannot be reached or the login is rejected
    """
    # attempt to connect to ftp host
    try:
        f = ftplib.FTP(HOST)
        # release the connection whether or not the login succeeds
        try:
            f.login(username, password)
            f.voidcmd("NOOP")
        finally:
            f.close()
    except IOError as exc:
        raise RuntimeError("Check internet connection") from exc
    except ftplib.error_perm as exc:
        raise RuntimeError("Check login credentials") from exc
    return True
# PURPOSE: list a directory on a ftp host
def ftp_list(
    HOST: str | list,
    username: str | None = None,
    password: str | None = None,
    timeout: int | None = None,
    basename: bool = False,
    pattern: str | None = None,
    sort: bool = False,
):
    """
    List a directory on a ``ftp`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``ftp`` host path split as list
    username: str or NoneType
        ``ftp`` username
    password: str or NoneType
        ``ftp`` password
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    basename: bool, default False
        Return the file or directory basename instead of the full path
    pattern: str or NoneType, default None
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list

    Returns
    -------
    output: list
        Items in a directory
    mtimes: list
        Last modification times for items in the directory

    Raises
    ------
    RuntimeError
        If the connection to the host cannot be made
    """
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError(f"Unable to connect to {HOST[0]}") from exc
    # close the ftp connection even if a command fails
    try:
        ftp.login(username, password)
        # list remote path (the login directory if only a host was given)
        if len(HOST) > 1:
            output = ftp.nlst(posixpath.join(*HOST[1:]))
        else:
            output = ftp.nlst()
        # get last modified date of ftp files and convert into unix time
        mtimes = []
        for f in output:
            try:
                # try sending modification time command
                mdtm = ftp.sendcmd(f"MDTM {f}")
            except ftplib.error_perm:
                # directories will return with an error
                mtimes.append(None)
            else:
                # convert the modification time into unix time
                mtimes.append(get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S"))
    finally:
        ftp.close()
    # reduce to basenames
    if basename:
        output = [posixpath.basename(i) for i in output]
    # keep each item paired with its last modified time
    items = list(zip(output, mtimes))
    # reduce using regular expression pattern
    if pattern:
        items = [item for item in items if re.search(pattern, item[0])]
    # sort by item name (stable for equal names)
    if sort:
        items.sort(key=lambda item: item[0])
    # return the list of items and last modified times
    output = [item[0] for item in items]
    mtimes = [item[1] for item in items]
    return (output, mtimes)
# PURPOSE: download a file from a ftp host
def from_ftp(
    HOST: str | list,
    username: str | None = None,
    password: str | None = None,
    timeout: int | None = None,
    local: str | pathlib.Path | None = None,
    hash: str = "",
    chunk: int = 8192,
    verbose: bool = False,
    fid: object = sys.stdout,
    label: str | None = None,
    mode: oct = 0o775,
    **kwargs,
):
    """
    Download a file from a ``ftp`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``ftp`` host path
    username: str or NoneType
        ``ftp`` username
    password: str or NoneType
        ``ftp`` password
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    local: str, pathlib.Path or NoneType, default None
        Path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 8192
        Chunk size for transfer encoding
    verbose: bool, default False
        Print file transfer information
    fid: object, default sys.stdout
        Open file object for logging file transfers if verbose
    label: str, default None
        Label for logging file transfer information if verbose
    mode: oct, default 0o775
        Permissions mode of output local file
    **kwargs
        Additional keyword arguments (accepted but not used)

    Returns
    -------
    remote_buffer: obj
        ``BytesIO`` representation of file

    Raises
    ------
    RuntimeError
        If the connection to the host cannot be made
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # set default label for logging
    if label is None:
        label = f"{posixpath.join(*HOST)} -->\n\t{local}"
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError(f"Unable to connect to {HOST[0]}") from exc
    # close the ftp connection even if the transfer fails
    try:
        ftp.login(username, password)
        # remote path
        ftp_remote_path = posixpath.join(*HOST[1:])
        # copy remote file contents to bytesIO object
        remote_buffer = io.BytesIO()
        ftp.retrbinary(
            f"RETR {ftp_remote_path}", remote_buffer.write, blocksize=chunk
        )
        # get last modified date of remote file
        mdtm = ftp.sendcmd(f"MDTM {ftp_remote_path}")
    finally:
        ftp.close()
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # convert the modification time into unix time
    remote_mtime = get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S")
    # compare checksums
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        logging.info(label)
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode="wb") as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode)
        # keep remote modification time of file and local access time
        # (skipped if the modification time could not be parsed)
        if remote_mtime is not None:
            os.utime(local, (local.stat().st_atime, remote_mtime))
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
def _create_default_ssl_context() -> ssl.SSLContext:
    """
    Creates the default ``SSL`` context

    Returns
    -------
    context: ssl.SSLContext
        Client-side TLS context with the module's default options
        applied and TLS compression disabled
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # disable TLS compression
    ctx.options |= ssl.OP_NO_COMPRESSION
    # apply the minimum protocol version settings
    _set_ssl_context_options(ctx)
    return ctx
def _create_ssl_context_no_verify() -> ssl.SSLContext:
    """
    Creates an ``SSL`` context for unverified connections

    Returns
    -------
    context: ssl.SSLContext
        Default context with hostname checking and
        certificate verification turned off
    """
    unverified = _create_default_ssl_context()
    # hostname checking is turned off before certificate verification
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
[docs] def _set_ssl_context_options(context: ssl.SSLContext) -> None: """Sets the default options for the ``SSL`` context""" if sys.version_info >= (3, 10) or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7): context.minimum_version = ssl.TLSVersion.TLSv1_2 else: context.options |= ssl.OP_NO_SSLv2 context.options |= ssl.OP_NO_SSLv3 context.options |= ssl.OP_NO_TLSv1 context.options |= ssl.OP_NO_TLSv1_1
# default ssl context _default_ssl_context = _create_ssl_context_no_verify() # PURPOSE: check connection with http host
def check_connection(
    HOST: str,
    context: ssl.SSLContext = _default_ssl_context,
    timeout: int = 20,
):
    """
    Check internet connection with ``http`` host

    Parameters
    ----------
    HOST: str
        Remote ``http`` host
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    timeout: int, default 20
        Timeout in seconds for blocking operations

    Returns
    -------
    connected: bool
        ``True`` if the host could be opened

    Raises
    ------
    urllib.error.HTTPError
        If the host responds with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # attempt to connect to http host
    try:
        response = urllib2.urlopen(HOST, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # only reachability matters: release the connection right away
    response.close()
    return True
# PURPOSE: list a directory on an Apache http Server
def http_list(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    parser=lxml.etree.HTMLParser(),
    format: str = "%Y-%m-%d %H:%M",
    pattern: str = "",
    sort: bool = False,
    **kwargs,
):
    """
    List a directory on an Apache ``http`` Server

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        ``HTML`` parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        Format for input time string
    pattern: str, default ''
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list
    **kwargs
        Additional keyword arguments passed to ``urllib.request.Request``

    Returns
    -------
    colnames: list
        Column names in a directory
    collastmod: list
        Last modification times for items in the directory

    Raises
    ------
    urllib.error.HTTPError
        If the host responds with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try listing from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST), **kwargs)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # read and parse request for files, then release the connection
    with response:
        tree = lxml.etree.parse(response, parser)
    # column names and modified times
    colnames = tree.xpath("//tr/td[not(@*)]//a/@href")
    # get the Unix timestamp value for a modification time
    collastmod = [
        get_unix_time(i, format=format)
        for i in tree.xpath('//tr/td[@align="right"][1]/text()')
    ]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, f in enumerate(colnames) if re.search(pattern, f)]
        # reduce list of column names and last modified times
        colnames = [colnames[i] for i in keep]
        collastmod = [collastmod[i] for i in keep]
    # sort the list
    if sort:
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        # sort list of column names and last modified times
        colnames = [colnames[i] for i in order]
        collastmod = [collastmod[i] for i in order]
    # return the list of column names and last modified times
    return (colnames, collastmod)
# PURPOSE: download a file from a http host
def from_http(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    local: str | pathlib.Path | None = None,
    hash: str = "",
    chunk: int = 16384,
    headers: dict | None = None,
    verbose: bool = False,
    fid: object = sys.stdout,
    label: str | None = None,
    mode: oct = 0o775,
    **kwargs,
):
    """
    Download a file from a ``http`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path split as list
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    local: str, pathlib.Path or NoneType, default None
        Path to local file
    hash: str, default ''
        ``MD5`` hash of local file
    chunk: int, default 16384
        Chunk size for transfer encoding
    headers: dict or NoneType, default None
        Dictionary to update in place with the response headers
        (lower-case keys)
    verbose: bool, default False
        Print file transfer information
    fid: object, default sys.stdout
        Open file object for logging file transfers if verbose
    label: str or None, default None
        Label for logging file transfer information if verbose
    mode: oct, default 0o775
        Permissions mode of output local file
    **kwargs
        Additional keyword arguments passed to ``urllib.request.Request``

    Returns
    -------
    remote_buffer: obj
        ``BytesIO`` representation of file

    Raises
    ------
    urllib.error.HTTPError
        If the host responds with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # use a fresh dictionary for each call if none is supplied
    # (a shared default dictionary would carry headers between calls)
    if headers is None:
        headers = {}
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # set default label for logging
    if label is None:
        label = f"{posixpath.join(*HOST)} -->\n\t{local}"
    # try downloading from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST), **kwargs)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # release the connection once the contents have been copied
    with response:
        # copy remote file contents to bytesIO object
        remote_buffer = io.BytesIO()
        shutil.copyfileobj(response, remote_buffer, chunk)
        # copy headers from response
        headers.update({k.lower(): v for k, v in response.headers.items()})
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        logging.info(label)
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode="wb") as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode)
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
# PURPOSE: load a JSON response from a http host
def from_json(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    headers: dict | None = None,
) -> dict:
    """
    Load a ``JSON`` response from a ``http`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path split as list
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    headers: dict or NoneType, default None
        Dictionary to update in place with the response headers
        (lower-case keys)

    Returns
    -------
    json_response: dict
        ``JSON`` response

    Raises
    ------
    urllib.error.HTTPError
        If the host responds with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # use a fresh dictionary for each call if none is supplied
    # (a shared default dictionary would carry headers between calls)
    if headers is None:
        headers = {}
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try loading JSON from http
    try:
        # Create and submit request for JSON response
        request = urllib2.Request(posixpath.join(*HOST))
        request.add_header("Accept", "application/json")
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # release the connection once the response has been read
    with response:
        # copy headers from response
        headers.update({k.lower(): v for k, v in response.headers.items()})
        # load JSON response
        json_response = json.loads(response.read())
    return json_response
# PURPOSE: list a directory on the MAR server
def mar_list(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    parser=lxml.etree.HTMLParser(),
    pattern: str = "",
    sort: bool = False,
):
    """
    List a directory from the MAR server at the Université de Liège

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        ``HTML`` parser for ``lxml``
    pattern: str, default ''
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list

    Returns
    -------
    colnames: list
        Column names in a directory
    collastmod: list
        Last modification times for items in the directory

    Raises
    ------
    urllib.error.HTTPError
        If the host responds with an error status
    urllib.error.URLError
        If the host cannot be reached
    ValueError
        If the number of links and modification times differ
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try listing from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST))
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # read and parse request for files, then release the connection
    with response:
        tree = lxml.etree.parse(response, parser)
    colnames = tree.xpath("//a/@href")
    # get the Unix timestamp value for a modification time
    collastmod = []
    for s in tree.xpath("//a/following-sibling::text()[1]"):
        # parse the modification time from the text following the link
        i = re.sub(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}).*", r"\1", s.strip())
        # convert the modification time into unix time
        collastmod.append(get_unix_time(i, format="%Y-%m-%d %H:%M"))
    # validate that columns match modification times
    if len(colnames) != len(collastmod):
        raise ValueError("Mismatch between names and modification times")
    # keep each column name paired with its modification time
    items = list(zip(colnames, collastmod))
    # reduce using regular expression pattern
    if pattern:
        items = [item for item in items if re.search(pattern, item[0])]
    # sort by column name (stable for equal names)
    if sort:
        items.sort(key=lambda item: item[0])
    # return the list of column names and modification times
    colnames = [item[0] for item in items]
    collastmod = [item[1] for item in items]
    return colnames, collastmod
# NASA Cumulus AWS S3 buckets
_s3_buckets = {
    "gesdisc": "gesdisc-cumulus-prod-protected",
    "ghrcdaac": "ghrc-cumulus-dev",
    "lpdaac": "lp-prod-protected",
    "nsidc": "nsidc-cumulus-prod-protected",
    "ornldaac": "ornl-cumulus-prod-protected",
    "podaac": "podaac-ops-cumulus-protected",
    "podaac-doc": "podaac-ops-cumulus-docs",
}

# NASA Cumulus AWS S3 credential endpoints
_s3_endpoints = {
    "gesdisc": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
    "ghrcdaac": "https://data.ghrc.earthdata.nasa.gov/s3credentials",
    "lpdaac": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
    "nsidc": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
    "ornldaac": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
    "podaac": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
}

# NASA Cumulus AWS providers
_s3_providers = {
    "gesdisc": "GES_DISC",
    "ghrcdaac": "GHRC_DAAC",
    "lpdaac": "LPCLOUD",
    "nsidc": "NSIDC_CPRD",
    "ornldaac": "ORNL_CLOUD",
    "podaac": "POCLOUD",
}


# PURPOSE: attempt to build an opener with netrc
def attempt_login(
    urs: str,
    context: ssl.SSLContext = _default_ssl_context,
    password_manager: bool = True,
    get_ca_certs: bool = False,
    redirect: bool = False,
    authorization_header: bool = True,
    **kwargs,
):
    """
    Attempt to build a ``urllib`` opener for NASA Earthdata

    Credentials are looked up in the netrc file first, then in the
    ``username`` and ``password`` keyword arguments, then in the
    ``EARTHDATA_USERNAME`` and ``EARTHDATA_PASSWORD`` environmental
    variables, and are finally requested interactively

    Parameters
    ----------
    urs: str
        Earthdata login URS 3 host
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    password_manager: bool, default True
        Create password manager context using default realm
    get_ca_certs: bool, default False
        Get list of loaded “certification authority” certificates
    redirect: bool, default False
        Create redirect handler object
    authorization_header: bool, default True
        Add base64 encoded authorization header to opener
    username: str, default from environmental variable
        NASA Earthdata username
    password: str, default from environmental variable
        NASA Earthdata password
    endpoint: str, default from _s3_endpoints
        NASA Cumulus AWS S3 credential endpoint for a provider
    retries: int, default 5
        number of retry attempts
    netrc: str, default ~/.netrc
        path to .netrc file for authentication

    Returns
    -------
    opener: obj
        OpenerDirector instance

    Raises
    ------
    RuntimeError
        If no attempt produced valid credentials
    """
    # set default keyword arguments
    # callers may pass username=None or password=None explicitly, so test
    # the value (not only the presence of the key) before falling back to
    # the environmental variables
    if kwargs.get("username") is None:
        kwargs["username"] = os.environ.get("EARTHDATA_USERNAME")
    if kwargs.get("password") is None:
        kwargs["password"] = os.environ.get("EARTHDATA_PASSWORD")
    kwargs.setdefault("endpoint", _s3_endpoints["gesdisc"])
    kwargs.setdefault("retries", 5)
    kwargs.setdefault("netrc", pathlib.Path.home().joinpath(".netrc"))
    try:
        # verify permissions level of netrc file
        # only necessary on jupyterhub
        nc = pathlib.Path(kwargs["netrc"]).expanduser().absolute()
        nc.chmod(mode=0o600)
        # try retrieving credentials from netrc
        username, _, password = netrc.netrc(nc).authenticators(urs)
    except Exception as exc:
        # best effort: a missing or unreadable netrc file, or a file
        # without an entry for the host, is not an error
        # only the exception type is logged to avoid leaking file contents
        logging.debug(f"netrc credentials unavailable: {type(exc).__name__}")
        # try retrieving credentials from keywords or environment
        username, password = (kwargs["username"], kwargs["password"])
    # manual input for username if not available
    if not username:
        username = builtins.input(f"Username for {urs}: ")
    # manual input for password if not available
    if not password:
        password = getpass.getpass(prompt=f"Password for {username}@{urs}: ")
    # host for endpoint
    HOST = kwargs.get("endpoint")
    retries = kwargs["retries"]
    # for each retry
    for attempt in range(retries):
        # build an opener for urs with credentials
        opener = build_opener(
            username,
            password,
            context=context,
            password_manager=password_manager,
            get_ca_certs=get_ca_certs,
            redirect=redirect,
            authorization_header=authorization_header,
            urs=urs,
        )
        # try logging in by check credentials
        try:
            check_credentials(HOST)
        except Exception as exc:
            logging.debug(f"Login attempt {attempt + 1} failed: {exc}")
        else:
            return opener
        # reattempt login only if another attempt will be made
        if attempt + 1 < retries:
            username = builtins.input(f"Username for {urs}: ")
            # rebuild the prompt so that it shows the new username
            password = getpass.getpass(
                prompt=f"Password for {username}@{urs}: "
            )
    # reached end of available retries
    raise RuntimeError("End of Retries: Check NASA Earthdata credentials")


# PURPOSE: "login" to NASA Earthdata with supplied credentials
def build_opener(
    username: str,
    password: str,
    context: ssl.SSLContext = _default_ssl_context,
    password_manager: bool = False,
    get_ca_certs: bool = False,
    redirect: bool = False,
    authorization_header: bool = True,
    urs: str = "https://urs.earthdata.nasa.gov",
):
    """
    Build ``urllib`` opener for NASA Earthdata with supplied credentials

    The opener is also installed globally, so later calls to
    ``urllib2.urlopen`` go through it

    Parameters
    ----------
    username: str
        NASA Earthdata username
    password: str
        NASA Earthdata password
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    password_manager: bool, default False
        Create password manager context using default realm
    get_ca_certs: bool, default False
        Get list of loaded “certification authority” certificates
    redirect: bool, default False
        Create redirect handler object
    authorization_header: bool, default True
        Add base64 encoded authorization header to opener
    urs: str, default 'https://urs.earthdata.nasa.gov'
        Earthdata login URS 3 host

    Returns
    -------
    opener: object
        ``OpenerDirector`` instance
    """
    # https://docs.python.org/3/howto/urllib2.html#id5
    handlers = []
    # optional basic authentication using a default-realm password manager
    if password_manager:
        manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
        # register the NASA Earthdata Login credentials for the URS host
        manager.add_password(None, urs, username, password)
        handlers.append(urllib2.HTTPBasicAuthHandler(manager))
    # Cookie jar for keeping the session cookie handed out by the data
    # server (without it, requests keep being sent back to Earthdata
    # Login to authenticate).
    handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))
    # SSL context handler
    if get_ca_certs:
        context.get_ca_certs()
    handlers.append(urllib2.HTTPSHandler(context=context))
    # redirect handler
    if redirect:
        handlers.append(urllib2.HTTPRedirectHandler())
    # create "opener" (OpenerDirector instance)
    opener = urllib2.build_opener(*handlers)
    # add a base64 encoded username/password Authorization header
    if authorization_header:
        credentials = f"{username}:{password}".encode()
        encoded = base64.b64encode(credentials).decode()
        opener.addheaders = [("Authorization", f"Basic {encoded}")]
    # from here on, all calls to urllib2.urlopen use this opener
    urllib2.install_opener(opener)
    return opener
# PURPOSE: generate a NASA Earthdata user token
def get_token(
    HOST: str = "https://urs.earthdata.nasa.gov/api/users/token",
    username: str | None = None,
    password: str | None = None,
    build: bool = True,
    context: ssl.SSLContext = _default_ssl_context,
    urs: str = "urs.earthdata.nasa.gov",
):
    """
    Generate a NASA Earthdata User Token

    Parameters
    ----------
    HOST: str
        NASA Earthdata token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Returns
    -------
    token: dict
        JSON response with NASA Earthdata User Token

    Raises
    ------
    RuntimeError
        If the request fails
    """
    # attempt to build urllib2 opener and check credentials
    if build:
        attempt_login(
            urs,
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True,
        )
    # create post response with Earthdata token API
    try:
        request = urllib2.Request(HOST, method="POST")
        response = urllib2.urlopen(request)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError("Check internet connection") from exc
    # read and return JSON response
    return json.loads(response.read())


# PURPOSE: list the NASA Earthdata user tokens
def list_tokens(
    HOST: str = "https://urs.earthdata.nasa.gov/api/users/tokens",
    username: str | None = None,
    password: str | None = None,
    build: bool = True,
    context: ssl.SSLContext = _default_ssl_context,
    urs: str = "urs.earthdata.nasa.gov",
):
    """
    List the current associated NASA Earthdata User Tokens

    Parameters
    ----------
    HOST: str
        NASA Earthdata list token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Returns
    -------
    tokens: list
        JSON response with NASA Earthdata User Tokens

    Raises
    ------
    RuntimeError
        If the request fails
    """
    # attempt to build urllib2 opener and check credentials
    if build:
        attempt_login(
            urs,
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True,
        )
    # create get response with Earthdata list tokens API
    try:
        request = urllib2.Request(HOST)
        response = urllib2.urlopen(request)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError("Check internet connection") from exc
    # read and return JSON response
    return json.loads(response.read())


# PURPOSE: revoke a NASA Earthdata user token
def revoke_token(
    token: str,
    HOST: str = "https://urs.earthdata.nasa.gov/api/users/revoke_token",
    username: str | None = None,
    password: str | None = None,
    build: bool = True,
    context: ssl.SSLContext = _default_ssl_context,
    urs: str = "urs.earthdata.nasa.gov",
):
    """
    Revoke a NASA Earthdata User Token

    Parameters
    ----------
    token: str
        NASA Earthdata token to be revoked
    HOST: str
        NASA Earthdata revoke token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Raises
    ------
    RuntimeError
        If the request fails
    """
    # attempt to build urllib2 opener and check credentials
    if build:
        attempt_login(
            urs,
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True,
        )
    # full path for NASA Earthdata revoke token API
    url = f"{HOST}?token={token}"
    # create post response with Earthdata revoke tokens API
    try:
        request = urllib2.Request(url, method="POST")
        response = urllib2.urlopen(request)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError("Check internet connection") from exc
    # the response body is not used: release the connection
    response.close()
    # verbose response
    logging.debug(f"Token Revoked: {token}")


def s3_region():
    """
    Get AWS s3 region for EC2 instance

    Returns
    -------
    region_name: str
        AWS region name
    """
    boto3 = import_dependency("boto3")
    region_name = boto3.session.Session().region_name
    return region_name


# PURPOSE: get AWS s3 client for GES DISC
def s3_client(
    HOST: str = _s3_endpoints["gesdisc"],
    timeout: int | None = None,
    region_name: str = "us-west-2",
):
    """
    Get AWS s3 client for NASA Earthdata

    Parameters
    ----------
    HOST: str
        S3 credential host
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    region_name: str, default 'us-west-2'
        AWS region name

    Returns
    -------
    client: obj
        AWS s3 client
    """
    # request temporary credentials from the credential endpoint
    request = urllib2.Request(HOST)
    response = urllib2.urlopen(request, timeout=timeout)
    cumulus = json.loads(response.read())
    # get AWS client object
    boto3 = import_dependency("boto3")
    client = boto3.client(
        "s3",
        aws_access_key_id=cumulus["accessKeyId"],
        aws_secret_access_key=cumulus["secretAccessKey"],
        aws_session_token=cumulus["sessionToken"],
        region_name=region_name,
    )
    # return the AWS client for region
    return client


# PURPOSE: get a s3 bucket name from a presigned url
def s3_bucket(presigned_url: str) -> str:
    """
    Get a s3 bucket name from a presigned url

    Parameters
    ----------
    presigned_url: str
        s3 presigned url

    Returns
    -------
    bucket: str
        s3 bucket name
    """
    host = url_split(presigned_url)
    # flags must be passed by keyword: the fourth positional argument
    # of re.sub is the replacement count
    bucket = re.sub(r"s3:\/\/", r"", host[0], flags=re.IGNORECASE)
    return bucket


# PURPOSE: get a s3 bucket key from a presigned url
def s3_key(presigned_url: str) -> str:
    """
    Get a s3 bucket key from a presigned url

    Parameters
    ----------
    presigned_url: str
        s3 presigned url

    Returns
    -------
    key: str
        s3 bucket key for object
    """
    host = url_split(presigned_url)
    # everything after the first url component is the object key
    key = posixpath.join(*host[1:])
    return key


# PURPOSE: check that entered NASA Earthdata credentials are valid
def check_credentials(HOST: str = _s3_endpoints["gesdisc"]):
    """
    Check that entered NASA Earthdata credentials are valid

    Parameters
    ----------
    HOST: str
        full url to protected credential website

    Returns
    -------
    valid: bool
        ``True`` if the protected url could be opened

    Raises
    ------
    RuntimeError
        If the request is rejected or the server cannot be reached
    """
    try:
        request = urllib2.Request(HOST)
        response = urllib2.urlopen(request, timeout=20)
    except urllib2.HTTPError as exc:
        raise RuntimeError("Check your NASA Earthdata credentials") from exc
    except urllib2.URLError as exc:
        raise RuntimeError("Check internet connection") from exc
    else:
        # only the success of the request matters: release the connection
        response.close()
        return True


# PURPOSE: list a directory on NASA GES DISC https server
def gesdisc_list(
    HOST: str | list,
    username: str | None = None,
    password: str | None = None,
    build: bool = False,
    timeout: int | None = None,
    urs: str = "urs.earthdata.nasa.gov",
    parser=lxml.etree.HTMLParser(),
    format: str = r"%Y-%m-%d %H:%M",
    pattern: str = "",
    sort: bool = False,
):
    """
    List a directory on NASA GES DISC servers

    Parameters
    ----------
    HOST: str or list
        Remote ``https`` host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default False
        Build opener with NASA Earthdata credentials
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host used to look up netrc credentials
    parser: obj, default lxml.etree.HTMLParser()
        ``HTML`` parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        Format for input time string
    pattern: str, default ''
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list

    Returns
    -------
    colnames: list
        column names in a directory
    collastmod: list
        last modification times for items in the directory

    Raises
    ------
    urllib.error.HTTPError
        Re-raised if the server responds with an error status
    urllib.error.URLError
        Re-raised if the request could not be completed
    """
    # use netrc credentials
    if build and not (username or password):
        username, _, password = netrc.netrc().authenticators(urs)
    # build urllib2 opener with credentials
    if build:
        build_opener(
            username,
            password,
            password_manager=True,
            authorization_header=False,
        )
    # verify inputs for remote https host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try listing from https
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST))
        response = urllib2.urlopen(request, timeout=timeout)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # read and parse request for files (column names and modified times)
    # the response is closed as soon as the document has been parsed
    with response:
        tree = lxml.etree.parse(response, parser)
    colnames = tree.xpath("//tr/td[not(@*)]//a/@href")
    # get the Unix timestamp value for a modification time
    lastmod = [
        get_unix_time(text, format=format)
        for text in tree.xpath('//tr/td[@align="right"][1]/text()')
    ]
    # reduce using regular expression pattern
    if pattern:
        keep = [
            index
            for index, name in enumerate(colnames)
            if re.search(pattern, name)
        ]
        # reduce list of column names and last modified times
        colnames = [colnames[index] for index in keep]
        lastmod = [lastmod[index] for index in keep]
    # sort the list
    if sort:
        order = [
            index
            for index, _ in sorted(
                enumerate(colnames), key=lambda item: item[1]
            )
        ]
        # sort list of column names and last modified times
        colnames = [colnames[index] for index in order]
        lastmod = [lastmod[index] for index in order]
    # return the list of column names and last modified times
    return (colnames, lastmod)
# PURPOSE: filter the CMR json response for desired data files
def cmr_filter_json(
    search_results: dict,
    endpoint: str = "data",
    request_type: str = "application/x-netcdf",
):
    """
    Filter the CMR json response for desired data files

    Entries without a link matching the endpoint or the data type are
    skipped so that the three output lists stay aligned

    Parameters
    ----------
    search_results: dict
        json response from CMR query
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    request_type: str, default 'application/x-netcdf'
        data type for reducing CMR query

    Returns
    -------
    granule_names: list
        Model granule names
    granule_urls: list
        Model granule urls
    granule_mtimes: list
        Model granule modification times
    """
    # output list of granule ids, urls and modified times
    granule_names = []
    granule_urls = []
    granule_mtimes = []
    # check that there are urls for request
    if ("feed" not in search_results) or (
        "entry" not in search_results["feed"]
    ):
        return (granule_names, granule_urls, granule_mtimes)
    # descriptor links for each endpoint
    rel = {}
    rel["data"] = "http://esipfed.org/ns/fedsearch/1.1/data#"
    rel["opendap"] = "http://esipfed.org/ns/fedsearch/1.1/service#"
    rel["s3"] = "http://esipfed.org/ns/fedsearch/1.1/s3#"
    # iterate over references and get cmr location
    for entry in search_results["feed"]["entry"]:
        # find the first link for the selected endpoint or data type
        granule_url = None
        for link in entry["links"]:
            # skip inherited granules
            if "inherited" in link:
                continue
            # select if link is for the selected endpoint
            if link["rel"] == rel[endpoint]:
                granule_url = link["href"]
                break
            # alternatively select if link is the selected data type
            if "type" in link and link["type"] == request_type:
                granule_url = link["href"]
                break
        # skip entries without a url: appending only the name and time
        # would leave the output lists with different lengths
        if granule_url is None:
            logging.debug(
                f"No url found for {entry.get('producer_granule_id')}"
            )
            continue
        granule_names.append(entry["producer_granule_id"])
        granule_urls.append(granule_url)
        granule_mtimes.append(
            get_unix_time(entry["updated"], format="%Y-%m-%dT%H:%M:%S.%f%z")
        )
    # return the list of urls, granule ids and modified times
    return (granule_names, granule_urls, granule_mtimes)
# PURPOSE: query the NASA Common Metadata Repository (CMR)
def cmr(
    short_name: str,
    version: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    provider: str = "GES_DISC",
    endpoint: str = "data",
    request_type: str = r"application/x-netcdf",
    verbose: bool = False,
    fid: object = sys.stdout,
):
    """
    Query the NASA Common Metadata Repository (CMR) for model data

    Parameters
    ----------
    short_name: str
        Model shortname in the CMR system
    version: str or NoneType, default None
        Model version
    start_date: str or NoneType, default None
        starting date for CMR product query
    end_date: str or NoneType, default None
        ending date for CMR product query
    provider: str, default 'GES_DISC'
        CMR data provider

        - ``'GES_DISC'``: GESDISC
        - ``'GESDISCCLD'``: GESDISC Cumulus
        - ``'PODAAC'``: PO.DAAC Drive
        - ``'POCLOUD'``: PO.DAAC Cumulus
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    request_type: str, default 'application/x-netcdf'
        data type for reducing CMR query
    verbose: bool, default False
        print CMR query information
    fid: object, default sys.stdout
        Open file object for logging CMR URL if verbose

    Returns
    -------
    granule_names: list
        Model granule names
    granule_urls: list
        Model granule urls
    granule_mtimes: list
        Model granule modification times
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # build urllib2 opener with SSL context
    # https://docs.python.org/3/howto/urllib2.html#id5
    handler = []
    # Create cookie jar for storing cookies
    cookie_jar = CookieJar()
    handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
    # NOTE(review): a directly constructed ssl.SSLContext() does not verify
    # certificates by default; confirm this is intended
    handler.append(urllib2.HTTPSHandler(context=ssl.SSLContext()))
    # create "opener" (OpenerDirector instance)
    opener = urllib2.build_opener(*handler)
    # build CMR query
    cmr_format = "json"
    cmr_page_size = 2000
    CMR_HOST = [
        "https://cmr.earthdata.nasa.gov",
        "search",
        f"granules.{cmr_format}",
    ]
    # build list of CMR query parameters
    CMR_KEYS = []
    CMR_KEYS.append(f"?provider={provider}")
    CMR_KEYS.append("&sort_key[]=start_date")
    CMR_KEYS.append("&sort_key[]=producer_granule_id")
    CMR_KEYS.append(f"&page_size={cmr_page_size}")
    # dictionary of product shortnames and version
    CMR_KEYS.append(f"&short_name={short_name}")
    if version:
        CMR_KEYS.append(f"&version={version}")
    # append keys for start and end time
    # verify that start and end times are in ISO format
    start_date = isoformat(start_date) if start_date else ""
    end_date = isoformat(end_date) if end_date else ""
    CMR_KEYS.append(f"&temporal={start_date},{end_date}")
    # full CMR query url
    cmr_query_url = URL.from_parts(CMR_HOST) + "".join(CMR_KEYS)
    logging.info(f"CMR request={cmr_query_url}")
    # output list of granule names and urls
    granule_names = []
    granule_urls = []
    granule_mtimes = []
    cmr_search_after = None
    while True:
        # make CMR query request
        request = cmr_query_url.request()
        # add CMR search after header
        if cmr_search_after:
            request.add_header("CMR-Search-After", cmr_search_after)
            logging.debug(f"CMR-Search-After: {cmr_search_after}")
        # submit request and get response
        # the response is closed once the page has been read
        with opener.open(request) as response:
            # get search after index for next iteration
            headers = {
                k.lower(): v for k, v in dict(response.info()).items()
            }
            cmr_search_after = headers.get("cmr-search-after")
            # read the CMR search as JSON
            search_page = json.loads(response.read().decode("utf8"))
        ids, urls, mtimes = cmr_filter_json(
            search_page,
            endpoint=endpoint,
            request_type=request_type,
        )
        # an empty page marks the end of the results
        if not urls:
            break
        # extend lists
        # done before testing the header so that a final page with
        # results but without a search-after index is not discarded
        granule_names.extend(ids)
        granule_urls.extend(urls)
        granule_mtimes.extend(mtimes)
        # no search-after index: there are no further pages
        if cmr_search_after is None:
            break
    # return the list of granule ids, urls and modification times
    return (granule_names, granule_urls, granule_mtimes)
# PURPOSE: build requests for the GES DISC subsetting API
def build_request(
    short_name: str,
    dataset_version: str,
    url: str,
    host: str | None = None,
    variables: list = [],
    format: str = "bmM0Lw",
    service: str = "L34RS_MERRA2",
    version: str = "1.02",
    bbox: list[int] | list[float] = [-90, -180, 90, 180],
    **kwargs,
):
    """
    Build requests for the GES DISC subsetting API

    Parameters
    ----------
    short_name: str
        Model shortname in the CMR system
    dataset_version: str
        Model version
    url: str
        url for granule returned by the CMR system
    host: str or NoneType, default None
        Override host provider for GES DISC subsetting

        Default is host provider given by CMR request
    variables: list, default []
        Variables for product to subset
    format: str, default 'bmM0Lw'
        Coded output format for GES DISC subsetting API
    service: str, default 'L34RS_MERRA2'
        GES DISC subsetting API service
    version: str, default '1.02'
        GES DISC subsetting API service version
    bbox: list, default [-90,-180,90,180]
        Bounding box to spatially subset
    kwargs: dict, default {}
        Additional parameters for GES DISC subsetting API

    Returns
    -------
    request_url: str
        Formatted url for GES DISC subsetting API
    """
    # separate the granule url into its host and remaining path parts
    granule_host, *granule_path = url_split(url)
    # use the host given by the CMR unless one was supplied
    api_root = granule_host if host is None else host
    # base URL for GES DISC on-the-fly subsetting API
    api_host = URL.from_parts(
        [api_root, "daac-bin", "OTF", "HTTP_services.cgi?"]
    )
    # parameters to be encoded
    # these replace any additional parameters with the same name
    parameters = {
        "FILENAME": posixpath.join(posixpath.sep, *granule_path),
        "FORMAT": format,
        "SERVICE": service,
        "VERSION": version,
        "BBOX": ",".join(map(str, bbox)),
        "SHORTNAME": short_name,
        "DATASET_VERSION": dataset_version,
    }
    if variables is not None:
        parameters["VARIABLES"] = ",".join(variables)
    kwargs.update(parameters)
    # return the formatted request url
    return api_host + urlencode(kwargs)