#!/usr/bin/env python
"""
utilities.py
Written by Tyler Sutterley (04/2026)
Download and management utilities for syncing time and auxiliary files
PYTHON DEPENDENCIES:
lxml: processing XML and HTML in Python
https://pypi.python.org/pypi/lxml
platformdirs: Python module for determining platform-specific directories
https://pypi.org/project/platformdirs/
UPDATE HISTORY:
Updated 04/2026: added string check to determine if it is a valid URL
added function to check if a dependency is available
added detection functions for checking if files are compressed
allow additional keyword arguments to http functions
added get_cache_path function for application cache directories
add MAR list function for parsing their http server directories
add NASA Earthdata credential utilities and AWS s3 functions
added include_algorithm option to get_hash function
Updated 09/2024: add wrapper to importlib for optional dependencies
Updated 06/2022: add NASA Common Metadata Repository (CMR) queries
added function to build GES DISC subsetting API requests
Updated 04/2022: updated docstrings to numpy documentation format
Updated 10/2021: build python logging instance for handling verbose output
Updated 09/2021: added generic list from Apache http server
Updated 07/2021: add parser for converting file lines to arguments
Updated 03/2021: added sha1 option for retrieving file hashes
Updated 01/2021: added username and password to ftp functions
added ftp connection check
Updated 12/2020: added file object keyword for downloads if verbose
add url split function for creating url location lists
Updated 11/2020: normalize source and destination paths in copy
make context an optional keyword argument in from_http
Updated 09/2020: copy from http and https to bytesIO object in chunks
Written 08/2020
"""
from __future__ import print_function, division, annotations
import sys
import os
import re
import io
import ssl
import json
import netrc
import ftplib
import shutil
import base64
import socket
import getpass
import inspect
import hashlib
import logging
import pathlib
import builtins
import warnings
import importlib
import posixpath
import subprocess
import lxml.etree
import platformdirs
import calendar, time
import dateutil.parser
if sys.version_info[0] == 2:
from urllib import urlencode, quote_plus
from cookielib import CookieJar
from urlparse import urlparse
import urllib2
else:
from urllib.parse import urlencode, quote_plus, urlparse
from http.cookiejar import CookieJar
import urllib.request as urllib2
# names exported by a star-import of this module
# NOTE(review): "isoformat" is listed but no definition is visible in this
# part of the file, and ``ceil`` is defined below but not listed -- confirm
__all__ = [
"reify",
"get_data_path",
"get_cache_path",
"import_dependency",
"dependency_available",
"is_valid_url",
"Path",
"URL",
"detect_compression",
"compressuser",
"get_hash",
"url_split",
"convert_arg_line_to_args",
"get_unix_time",
"isoformat",
"even",
"copy",
"symlink",
"check_ftp_connection",
"ftp_list",
"from_ftp",
"_create_default_ssl_context",
"_create_ssl_context_no_verify",
"_set_ssl_context_options",
"check_connection",
"http_list",
"from_http",
"from_json",
"mar_list",
"_s3_buckets",
"_s3_endpoints",
"_s3_providers",
"attempt_login",
"build_opener",
"get_token",
"list_tokens",
"revoke_token",
"s3_region",
"s3_client",
"s3_bucket",
"s3_key",
"check_credentials",
"gesdisc_list",
"cmr_filter_json",
"cmr",
"build_request",
]
[docs]
class reify(object):
    """
    Descriptor used as a method decorator: on instance access it calls
    the decorated method and stores the result on the instance under
    the method's name
    """

    def __init__(self, wrapped):
        # keep the decorated function and mirror its name and docstring
        self.wrapped = wrapped
        self.__name__ = wrapped.__name__
        self.__doc__ = wrapped.__doc__

    def __get__(self, inst, objtype=None):
        # instance access: evaluate and store the value on the instance
        if inst is not None:
            result = self.wrapped(inst)
            setattr(inst, self.wrapped.__name__, result)
            return result
        # class-level access hands back the descriptor itself
        return self
# PURPOSE: get absolute path within a package from a relative path
[docs]
def get_data_path(relpath: list | str | pathlib.Path):
    """
    Build an absolute path relative to the directory containing this file

    Parameters
    ----------
    relpath: list, str or pathlib.Path
        Relative path; a list is joined component by component

    Returns
    -------
    path: pathlib.Path or None
        Absolute path, or ``None`` for any other input type
    """
    # directory of the file in which this function is defined
    frame_info = inspect.getframeinfo(inspect.currentframe())
    package_dir = pathlib.Path(frame_info.filename).absolute().parent
    # a single component is appended directly
    if isinstance(relpath, (str, pathlib.Path)):
        return package_dir.joinpath(relpath)
    # list components are unpacked into separate path segments
    if isinstance(relpath, list):
        return package_dir.joinpath(*relpath)
    # unsupported input types yield no path
    return None
# PURPOSE: get the path to the user cache directory
[docs]
def get_cache_path(
    relpath: list | str | pathlib.Path | None = None,
    appname="firncorr",
):
    """
    Locate the user cache directory of an application

    Parameters
    ----------
    relpath: list, str, pathlib.Path or None
        Relative path appended to the cache directory
    appname: str, default 'firncorr'
        Application name

    Returns
    -------
    path: pathlib.Path
        Cache directory, optionally extended by ``relpath``
    """
    # platform-specific cache directory (requested with ensure_exists=True)
    cache_dir = platformdirs.user_cache_path(
        appname=appname, ensure_exists=True
    )
    if isinstance(relpath, (str, pathlib.Path)):
        # single component
        cache_dir = cache_dir.joinpath(relpath)
    elif isinstance(relpath, list):
        # list components are unpacked into separate path segments
        cache_dir = cache_dir.joinpath(*relpath)
    return pathlib.Path(cache_dir)
[docs]
def import_dependency(
    name: str,
    extra: str = "",
    raise_exception: bool = False,
):
    """
    Import an optional dependency

    Adapted from ``pandas.compat._optional::import_optional_dependency``

    Parameters
    ----------
    name: str
        Module name
    extra: str, default ""
        Additional text to include in the ``ImportError`` message
    raise_exception: bool, default False
        Raise an ``ImportError`` if the module is not found

    Returns
    -------
    module: obj
        Imported module, or an empty placeholder class named ``module``
        when the import fails and ``raise_exception`` is ``False``
    """
    # the module name has to be a string
    assert isinstance(
        name, str
    ), f"Invalid module name: '{name}'; must be a string"
    try:
        return importlib.import_module(name)
    except (ImportError, ModuleNotFoundError) as exc:
        # message used both for the exception and for the debug log
        err = f"Missing optional dependency '{name}'. {extra}"
        if raise_exception:
            raise ImportError(err) from exc
        logging.debug(err)
    # placeholder returned when the module could not be imported
    return type("module", (), {})
[docs]
def _release_tuple(version: str):
    """Leading numeric release components of a version string, or None"""
    match = re.match(r"\s*v?(\d+(?:\.\d+)*)", str(version))
    if match is None:
        return None
    return tuple(int(part) for part in match.group(1).split("."))


def dependency_available(
    name: str,
    minversion: str | None = None,
):
    """
    Checks whether a module is installed without importing it

    Adapted from ``xarray.namedarray.utils.module_available``

    Parameters
    ----------
    name: str
        Module name
    minversion : str, optional
        Minimum version of the module

    Returns
    -------
    available : bool
        Whether the module is installed (and recent enough)

    Raises
    ------
    importlib.metadata.PackageNotFoundError
        If ``minversion`` is given and no distribution metadata is
        found under ``name``

    Notes
    -----
    Versions are compared on their numeric release components only
    (``1.10.0`` is newer than ``1.9.0``); pre-release suffixes such as
    ``rc1`` are ignored
    """
    # function-scope imports: ``import importlib`` alone does not guarantee
    # that the ``util`` and ``metadata`` submodules are loaded
    import importlib.metadata
    import importlib.util

    # check if module is available
    try:
        spec = importlib.util.find_spec(name)
    except (ModuleNotFoundError, ValueError):
        # missing parent package of a dotted name, or an invalid name
        return False
    if spec is None:
        return False
    # nothing more to check without a minimum version
    if minversion is None:
        return True
    # compare the installed version against the minimum numerically
    version = importlib.metadata.version(name)
    installed = _release_tuple(version)
    required = _release_tuple(minversion)
    if installed is None or required is None:
        # fall back to plain comparison for non-numeric version strings
        return version >= minversion
    # pad to a common length so that "1.2" compares equal to "1.2.0"
    width = max(len(installed), len(required))
    installed += (0,) * (width - len(installed))
    required += (0,) * (width - len(required))
    return installed >= required
[docs]
def is_valid_url(url: str) -> bool:
    """
    Checks if a string is a valid URL

    A value counts as a URL when it parses with both a scheme and a
    network location

    Parameters
    ----------
    url: str
        URL to check

    Returns
    -------
    valid: bool
        ``True`` if ``url`` has a scheme and a network location
    """
    try:
        result = urlparse(str(url))
    except (AttributeError, ValueError):
        # urlparse raises ValueError for malformed input
        # (for example an unbalanced IPv6 bracket)
        return False
    return bool(result.scheme and result.netloc)
[docs]
def Path(filename: str | pathlib.Path, *args, **kwargs):
    """
    Build a ``URL`` for URL-like input and a ``pathlib.Path`` otherwise

    Parameters
    ----------
    filename: str or pathlib.Path
        File path or URL
    *args, **kwargs
        Passed on to the ``URL`` or ``pathlib.Path`` constructor

    Returns
    -------
    path: URL or pathlib.Path
        ``URL`` object, or a user-expanded ``pathlib.Path``
    """
    # local paths get their leading tilde expanded
    if not is_valid_url(filename):
        return pathlib.Path(filename, *args, **kwargs).expanduser()
    return URL(filename, *args, **kwargs)
[docs]
class URL:
    """
    Handles URLs similar to ``pathlib.Path`` objects

    Parameters
    ----------
    urlname: str or pathlib.Path
        URL; stored as a string
    *args, **kwargs
        Accepted but not used by the constructor
    """

    def __init__(self, urlname: str | pathlib.Path, *args, **kwargs):
        """Initialize a ``URL`` object"""
        self.urlname = str(urlname)
        # URL split into its components by ``url_split``
        self._raw_paths = list(url_split(self.urlname))
        # headers dictionary handed to the http helpers; ``urlopen``
        # adds the lowercased headers of each response to it
        self._headers = {}

    @classmethod
    def from_parts(cls, parts: str | list | tuple):
        """
        Return a ``URL`` object from components

        Parameters
        ----------
        parts: str, list or tuple
            URL components
        """
        # a single string is already a complete URL
        if isinstance(parts, str):
            return cls(parts)
        return cls("/".join([*parts]))

    def joinpath(self, *pathsegments: str):
        """Append URL components to existing

        Parameters
        ----------
        pathsegments: str
            URL components to append; each is converted with ``str``
        """
        # convert so that non-string segments can be joined as well
        segments = [str(segment) for segment in pathsegments]
        return URL("/".join([*self._raw_paths, *segments]))

    def resolve(self):
        """Return a new ``URL`` rebuilt from the split components"""
        return URL("/".join([*self._raw_paths]))

    def is_file(self):
        """Boolean flag if path is a local file (always ``False``)"""
        return False

    def is_dir(self):
        """Boolean flag if path is a local directory (always ``False``)"""
        return False

    def geturl(self):
        """String representation of the ``URL`` object"""
        return self._components.geturl()

    def get(self, *args, **kwargs):
        """
        Get contents from URL

        Dispatches to ``from_ftp`` or ``from_http`` based on the scheme

        Raises
        ------
        NotImplementedError
            If the scheme is neither ``ftp`` nor ``http(s)``
        """
        if self.scheme.startswith("ftp"):
            # ftp helper expects the host followed by the path components
            host = [self.netloc, *url_split(self.path)]
            return from_ftp(host, *args, **kwargs)
        if self.scheme.startswith("http"):
            return from_http(
                self.urlname, *args, headers=self._headers, **kwargs
            )
        raise NotImplementedError(f"Unsupported scheme: {self.scheme}")

    def load(self, *args, **kwargs):
        """Load ``JSON`` response from URL"""
        return from_json(self.urlname, *args, headers=self._headers, **kwargs)

    def ping(self, *args, **kwargs) -> bool:
        """Ping URL to check connection"""
        return check_connection(self.urlname, *args, **kwargs)

    def query(self, *args, **kwargs):
        """List contents from URL"""
        return http_list(self.urlname, *args, headers=self._headers, **kwargs)

    def read(self, *args, **kwargs):
        """Open URL and read response"""
        return self.urlopen(*args, **kwargs).read()

    def request(self, *args, **kwargs):
        """
        Make URL request

        Positional and keyword arguments are forwarded to the
        ``Request`` constructor after the URL
        """
        return urllib2.Request(self.urlname, *args, **kwargs)

    def urlopen(self, *args, **kwargs):
        """Open URL and return response"""
        request = urllib2.Request(self.urlname)
        response = urllib2.urlopen(request, *args, **kwargs)
        # remember the response headers with lowercased keys
        self._headers.update(
            {k.lower(): v for k, v in response.headers.items()}
        )
        return response

    @property
    def name(self):
        """URL basename"""
        return pathlib.PurePosixPath(self.urlname).name

    @property
    def netloc(self):
        """URL network location"""
        return self._components.netloc

    @property
    def parent(self):
        """URL parent path as a ``URL`` object"""
        paths = url_split(self.urlname)[:-1]
        return URL.from_parts(paths)

    @property
    def parents(self):
        """URL parents as a list of ``URL`` objects (nearest first)"""
        paths = url_split(self.urlname)
        return [URL.from_parts(paths[:i]) for i in range(len(paths) - 1, 0, -1)]

    @property
    def parts(self):
        """URL parts as a tuple"""
        paths = url_split(self._components.path)
        return (self.scheme, self.netloc, *paths)

    @property
    def path(self):
        """URL path"""
        return self._components.path

    @property
    def s3bucket(self):
        """AWS s3 bucket name (``None`` for non-s3 schemes)"""
        if self.scheme.startswith("s3"):
            return s3_bucket(self.geturl())
        return None

    @property
    def s3key(self):
        """AWS s3 key (``None`` for non-s3 schemes)"""
        if self.scheme.startswith("s3"):
            return s3_key(self.geturl())
        return None

    @property
    def scheme(self):
        """URL scheme including the trailing ``://``"""
        return self._components.scheme + "://"

    @property
    def stem(self):
        """URL stem"""
        return pathlib.PurePosixPath(self.urlname).stem

    @property
    def _components(self):
        """
        URL parsed into six components using ``urlparse``
        """
        return urlparse(self.urlname)

    def __repr__(self):
        """Representation of the ``URL`` object"""
        return str(self.urlname)

    def __str__(self):
        """String representation of the ``URL`` object"""
        return str(self.urlname)

    def __add__(self, other):
        """Concatenate URL components using the addition operator"""
        return URL(self.urlname + str(other))

    def __div__(self, other):
        """Join URL components using the division operator"""
        return self.joinpath(other)

    def __truediv__(self, other):
        """Join URL components using the division operator"""
        return self.joinpath(other)
[docs]
def detect_compression(filename: str | pathlib.Path) -> bool:
    """
    Check the file extension for ``gzip`` compression

    Parameters
    ----------
    filename: str or pathlib.Path
        Model file

    Returns
    -------
    compressed: bool
        Input file name ends in ``.gz`` (case-insensitive)
    """
    resolved = Path(filename).resolve()
    # only the final name component is inspected
    match = re.search(r"\.gz$", resolved.name, re.IGNORECASE)
    return match is not None
[docs]
def compressuser(filename: str | pathlib.Path):
    """
    Rewrite a path so that the home directory is abbreviated by a tilde

    Parameters
    ----------
    filename: str or pathlib.Path
        Input filename to tilde-compress

    Returns
    -------
    path: pathlib.Path
        ``~``-prefixed path, or the absolute path if it is not located
        under the home directory
    """
    absolute = pathlib.Path(filename).expanduser().absolute()
    try:
        # portion of the path below the home directory
        tail = absolute.relative_to(pathlib.Path().home())
    except (ValueError, AttributeError):
        # not under the home directory: leave the absolute path as is
        return absolute
    return pathlib.Path("~").joinpath(tail)
# PURPOSE: get the hash value of a file
[docs]
def _hash_stream(stream, algorithm: str, chunk: int = 65536) -> str:
    """Hash a readable binary stream in fixed-size chunks"""
    digest = hashlib.new(algorithm)
    while True:
        block = stream.read(chunk)
        if not block:
            break
        digest.update(block)
    return digest.hexdigest()


def get_hash(
    local: str | io.IOBase | pathlib.Path,
    algorithm: str = "md5",
    include_algorithm: bool = False,
):
    """
    Get the hash value from a local file or ``BytesIO`` object

    Parameters
    ----------
    local: obj, str or pathlib.Path
        ``BytesIO`` object, open binary file object or path to file
    algorithm: str, default 'md5'
        Hashing algorithm for checksum validation
    include_algorithm: bool, default False
        Include the algorithm name in the returned hash

    Returns
    -------
    value: str
        Hexadecimal digest (prefixed with ``algorithm:`` if requested);
        empty string if the file does not exist or the input type is
        not supported

    Raises
    ------
    ValueError
        If ``algorithm`` is not available in ``hashlib``
    """
    if isinstance(local, io.IOBase):
        if algorithm not in hashlib.algorithms_available:
            raise ValueError(f"Invalid hashing algorithm: {algorithm}")
        if hasattr(local, "getvalue"):
            # in-memory buffer: hash the complete contents
            value = hashlib.new(algorithm, local.getvalue()).hexdigest()
        else:
            # other open streams are read from their current position
            value = _hash_stream(local, algorithm)
    elif isinstance(local, (str, pathlib.Path)):
        local = pathlib.Path(local).expanduser()
        # if file currently doesn't exist, return empty string
        if not local.exists():
            return ""
        if algorithm not in hashlib.algorithms_available:
            raise ValueError(f"Invalid hashing algorithm: {algorithm}")
        # read in chunks so large files are not held in memory at once
        with local.open(mode="rb") as local_buffer:
            value = _hash_stream(local_buffer, algorithm)
    else:
        return ""
    return f"{algorithm}:{value}" if include_algorithm else value
# PURPOSE: recursively split a url path
[docs]
def url_split(s: str):
    """
    Split a URL path into a tuple of components

    The scheme and host stay together as the first component
    (``"https://host/a/b"`` gives ``("https://host", "a", "b")``)

    Parameters
    ----------
    s: str
        URL string

    Returns
    -------
    parts: tuple
        URL components
    """
    schemes = ("http:", "https:", "ftp:", "s3:")
    # components are collected from the last to the first
    parts = []
    current = s
    while True:
        head, tail = posixpath.split(str(current))
        if head in schemes:
            # reached the scheme: keep "scheme://host" in one piece
            parts.append(current)
            break
        parts.append(tail)
        # stop at an empty head or one made only of separators; a head
        # such as "//" would otherwise split into itself forever
        if not head.strip(posixpath.sep):
            break
        current = head
    return tuple(reversed(parts))
# PURPOSE: convert file lines to arguments
[docs]
def convert_arg_line_to_args(arg_line):
    """
    Generate the arguments contained in a line of an argument file

    Parameters
    ----------
    arg_line: str
        Line string containing a single argument and/or comments

    Yields
    ------
    arg: str
        Whitespace-separated argument with comments removed
    """
    # drop everything from the comment character to the end of the line
    uncommented = re.sub(r"\#(.*?)$", r"", arg_line)
    # splitting on whitespace never produces empty arguments
    yield from uncommented.split()
# PURPOSE: returns the Unix timestamp value for a formatted date string
[docs]
def get_unix_time(
    time_string: str,
    format: str = "%Y-%m-%d %H:%M:%S",
):
    """
    Get the Unix timestamp value for a formatted date string

    The string is first parsed with ``format``; if that fails it is
    handed to ``dateutil``. Times without a timezone are taken as UTC
    in both cases

    Parameters
    ----------
    time_string: str
        Formatted time string to parse
    format: str, default '%Y-%m-%d %H:%M:%S'
        Format for input time string

    Returns
    -------
    timestamp: int, float or None
        Seconds since the Unix epoch, or ``None`` if the input cannot
        be parsed
    """
    from datetime import timezone

    # inputs without string methods cannot be parsed
    try:
        stripped = time_string.rstrip()
    except AttributeError:
        return None
    try:
        parsed_time = time.strptime(stripped, format)
    except (TypeError, ValueError):
        pass
    else:
        # struct_time is interpreted as UTC
        return calendar.timegm(parsed_time)
    # try parsing with dateutil
    try:
        parsed_time = dateutil.parser.parse(stripped)
    except (TypeError, ValueError, OverflowError):
        return None
    # naive datetimes would be read as local time by ``timestamp``;
    # pin them to UTC to agree with the strptime branch above
    if parsed_time.tzinfo is None:
        parsed_time = parsed_time.replace(tzinfo=timezone.utc)
    return parsed_time.timestamp()
# PURPOSE: output a time string in isoformat
# PURPOSE: rounds a number to an even number less than or equal to original
[docs]
def even(value: float):
    """
    Round down to the nearest even integer

    Parameters
    ----------
    value: float
        Number to be rounded

    Returns
    -------
    rounded: int
        Largest even integer less than or equal to ``value``
    """
    # number of whole pairs contained in the value
    pairs = int(value // 2)
    return pairs * 2
# PURPOSE: rounds a number upward to its nearest integer
[docs]
def ceil(value: float):
    """
    Round a number up to the next integer

    Parameters
    ----------
    value: float
        number to be rounded upward

    Returns
    -------
    rounded: int
        Smallest integer greater than or equal to ``value``
    """
    # floor of the negated value, negated again, gives the ceiling
    negated_floor = -value // 1
    return -int(negated_floor)
# PURPOSE: make a copy of a file with all system information
[docs]
def copy(
    source: str | pathlib.Path,
    destination: str | pathlib.Path,
    move: bool = False,
    **kwargs,
):
    """
    Copy or move a file together with its stat information

    Parameters
    ----------
    source: str or pathlib.Path
        Source file
    destination: str or pathlib.Path
        Copied destination file
    move: bool, default False
        Remove the source file
    **kwargs
        Accepted but not used
    """
    # normalize both paths to absolute, user-expanded paths
    source, destination = (
        pathlib.Path(p).expanduser().absolute() for p in (source, destination)
    )
    # log source and destination
    logging.info(f"{str(source)} -->\n\t{str(destination)}")
    # transfer the contents first, then the stat information
    for transfer in (shutil.copyfile, shutil.copystat):
        transfer(source, destination)
    # a move is a copy followed by removal of the original
    if move:
        source.unlink()
# PURPOSE: make a symbolic link to a file
[docs]
def symlink(
    source: str | pathlib.Path,
    destination: str | pathlib.Path,
):
    """
    Create a symbolic link to a file

    An existing symbolic link is left alone if it already resolves to
    the source and is replaced if it points elsewhere

    Parameters
    ----------
    source: str or pathlib.Path
        Source file
    destination: str or pathlib.Path
        Symbolic link file
    """
    # work with user-expanded pathlib.Path objects
    source = pathlib.Path(source).expanduser()
    destination = pathlib.Path(destination).expanduser()
    # nothing to do if the link would have the same path as its target
    if source == destination:
        logging.debug(f"Symbolic link {destination} matches source {source}")
        return
    if destination.is_symlink():
        # an existing link to the same file is kept
        if destination.resolve() == source.resolve():
            logging.debug(f"Symbolic link already exists: {destination}")
            return
        # an existing link to a different file is removed first
        logging.debug(f"Removing existing symbolic link: {destination}")
        destination.unlink()
    # create new symbolic link
    logging.info(f"\t--> {destination} (symlink)")
    destination.symlink_to(source)
# PURPOSE: check ftp connection
[docs]
def check_ftp_connection(
    HOST: str,
    username: str | None = None,
    password: str | None = None,
):
    """
    Check internet connection with ``ftp`` host

    Parameters
    ----------
    HOST: str
        Remote ftp host
    username: str or NoneType
        ``ftp`` username
    password: str or NoneType
        ``ftp`` password

    Returns
    -------
    connected: bool
        ``True`` if the host accepted the login and a ``NOOP`` command

    Raises
    ------
    RuntimeError
        If the host cannot be reached or the login is rejected
    """
    # attempt to connect to ftp host
    try:
        f = ftplib.FTP(HOST)
        try:
            f.login(username, password)
            f.voidcmd("NOOP")
        finally:
            # always release the connection, also after a failed login
            f.close()
    except IOError as exc:
        raise RuntimeError("Check internet connection") from exc
    except ftplib.error_perm as exc:
        raise RuntimeError("Check login credentials") from exc
    else:
        return True
# PURPOSE: list a directory on a ftp host
[docs]
def ftp_list(
    HOST: str | list,
    username: str | None = None,
    password: str | None = None,
    timeout: int | None = None,
    basename: bool = False,
    pattern: str | None = None,
    sort: bool = False,
):
    """
    List a directory on a ``ftp`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``ftp`` host path split as list
    username: str or NoneType
        ``ftp`` username
    password: str or NoneType
        ``ftp`` password
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    basename: bool, default False
        Return the file or directory basename instead of the full path
    pattern: str or NoneType, default None
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list

    Returns
    -------
    output: list
        Items in a directory
    mtimes: list
        Last modification times for items in the directory
        (``None`` where the server rejects the ``MDTM`` command)

    Raises
    ------
    RuntimeError
        If the connection to the host cannot be established
    """
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError(f"Unable to connect to {HOST[0]}") from exc
    try:
        ftp.login(username, password)
        # list remote path (the login directory if only a host was given)
        remote_path = HOST[1:]
        if remote_path:
            output = ftp.nlst(posixpath.join(*remote_path))
        else:
            output = ftp.nlst()
        # get last modified date of ftp files and convert into unix time
        mtimes = [None] * len(output)
        for i, f in enumerate(output):
            try:
                # try sending modification time command
                mdtm = ftp.sendcmd(f"MDTM {f}")
            except ftplib.error_perm:
                # directories will return with an error
                continue
            # convert the modification time into unix time
            mtimes[i] = get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S")
    finally:
        # close the ftp connection even if a command failed
        ftp.close()
    # reduce to basenames
    if basename:
        output = [posixpath.basename(i) for i in output]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, f in enumerate(output) if re.search(pattern, f)]
        # reduce list of listed items and last modified times
        output = [output[i] for i in keep]
        mtimes = [mtimes[i] for i in keep]
    # sort the list
    if sort:
        order = sorted(range(len(output)), key=output.__getitem__)
        # sort list of listed items and last modified times
        output = [output[i] for i in order]
        mtimes = [mtimes[i] for i in order]
    # return the list of items and last modified times
    return (output, mtimes)
# PURPOSE: download a file from a ftp host
[docs]
def from_ftp(
    HOST: str | list,
    username: str | None = None,
    password: str | None = None,
    timeout: int | None = None,
    local: str | pathlib.Path | None = None,
    hash: str = "",
    chunk: int = 8192,
    verbose: bool = False,
    fid: object = sys.stdout,
    label: str | None = None,
    mode: int = 0o775,
    **kwargs,
):
    """
    Download a file from a ``ftp`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``ftp`` host path
    username: str or NoneType
        ``ftp`` username
    password: str or NoneType
        ``ftp`` password
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    local: str, pathlib.Path or NoneType, default None
        Path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 8192
        Chunk size for transfer encoding
    verbose: bool, default False
        Print file transfer information
    fid: object, default sys.stdout
        Open file object for logging file transfers if verbose
    label: str, default None
        Label for logging file transfer information if verbose
    mode: int, default 0o775
        Permissions mode of output local file
    **kwargs
        Accepted but not used

    Returns
    -------
    remote_buffer: obj
        ``BytesIO`` representation of file

    Raises
    ------
    RuntimeError
        If the connection to the host cannot be established
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # set default label for logging
    if label is None:
        label = f"{posixpath.join(*HOST)} -->\n\t{local}"
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError(f"Unable to connect to {HOST[0]}") from exc
    try:
        ftp.login(username, password)
        # remote path
        ftp_remote_path = posixpath.join(*HOST[1:])
        # copy remote file contents to bytesIO object
        remote_buffer = io.BytesIO()
        ftp.retrbinary(
            f"RETR {ftp_remote_path}", remote_buffer.write, blocksize=chunk
        )
        # get last modified date of remote file and convert into unix time
        try:
            mdtm = ftp.sendcmd(f"MDTM {ftp_remote_path}")
        except ftplib.error_perm:
            # server rejected MDTM: keep the download, skip the timestamp
            remote_mtime = None
        else:
            remote_mtime = get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S")
    finally:
        # close the ftp connection even if the transfer failed
        ftp.close()
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        logging.info(label)
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode="wb") as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode)
        # keep remote modification time of file and local access time
        if remote_mtime is not None:
            os.utime(local, (local.stat().st_atime, remote_mtime))
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
[docs]
def _create_default_ssl_context() -> ssl.SSLContext:
    """
    Creates the default ``SSL`` context

    Returns
    -------
    context: ssl.SSLContext
        Client context with the default protocol options applied,
        TLS compression disabled and the default CA certificates loaded
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    _set_ssl_context_options(context)
    context.options |= ssl.OP_NO_COMPRESSION
    # a bare SSLContext has no trusted certificates; without them a
    # verifying context cannot validate any server certificate
    context.load_default_certs()
    return context
[docs]
def _create_ssl_context_no_verify() -> ssl.SSLContext:
    """
    Build an ``SSL`` context that performs no certificate verification

    Returns
    -------
    context: ssl.SSLContext
        Default context with hostname checking switched off and
        ``verify_mode`` set to ``ssl.CERT_NONE``
    """
    unverified = _create_default_ssl_context()
    # hostname checking is switched off before the verify mode is relaxed
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
[docs]
def _set_ssl_context_options(context: ssl.SSLContext) -> None:
"""Sets the default options for the ``SSL`` context"""
if sys.version_info >= (3, 10) or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7):
context.minimum_version = ssl.TLSVersion.TLSv1_2
else:
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
# default ssl context
# module-wide context used as the default ``context`` of the http helpers
# NOTE(review): built by ``_create_ssl_context_no_verify``, so certificate
# and hostname verification are disabled by default -- confirm intended
_default_ssl_context = _create_ssl_context_no_verify()
# PURPOSE: check connection with http host
[docs]
def check_connection(
    HOST: str,
    context: ssl.SSLContext = _default_ssl_context,
    timeout: int = 20,
):
    """
    Check internet connection with ``http`` host

    Parameters
    ----------
    HOST: str
        Remote ``http`` host
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    timeout: int, default 20
        Timeout in seconds for blocking operations

    Returns
    -------
    connected: bool
        ``True`` if the host could be opened

    Raises
    ------
    urllib.error.HTTPError
        If the host answers with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # attempt to connect to http host
    try:
        response = urllib2.urlopen(HOST, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    else:
        # only reachability matters: release the connection right away
        response.close()
        return True
# PURPOSE: list a directory on an Apache http Server
[docs]
def http_list(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    parser=lxml.etree.HTMLParser(),
    format: str = "%Y-%m-%d %H:%M",
    pattern: str = "",
    sort: bool = False,
    **kwargs,
):
    """
    List a directory on an Apache ``http`` Server

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        ``HTML`` parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        Format for input time string
    pattern: str, default ''
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list
    **kwargs
        Keyword arguments passed to the ``Request`` constructor

    Returns
    -------
    colnames: list
        Column names in a directory
    collastmod: list
        Last modification times for items in the directory

    Raises
    ------
    urllib.error.HTTPError
        If the host answers with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try listing from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST), **kwargs)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # read and parse request, then release the connection
    with response:
        tree = lxml.etree.parse(response, parser)
    # file names are the links in attribute-free table cells
    colnames = tree.xpath("//tr/td[not(@*)]//a/@href")
    # get the Unix timestamp value for a modification time
    collastmod = [
        get_unix_time(i, format=format)
        for i in tree.xpath('//tr/td[@align="right"][1]/text()')
    ]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, f in enumerate(colnames) if re.search(pattern, f)]
        # reduce list of column names and last modified times
        colnames = [colnames[i] for i in keep]
        collastmod = [collastmod[i] for i in keep]
    # sort the list
    if sort:
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        # sort list of column names and last modified times
        colnames = [colnames[i] for i in order]
        collastmod = [collastmod[i] for i in order]
    # return the list of column names and last modified times
    return (colnames, collastmod)
# PURPOSE: download a file from a http host
[docs]
def from_http(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    local: str | pathlib.Path | None = None,
    hash: str = "",
    chunk: int = 16384,
    headers: dict | None = None,
    verbose: bool = False,
    fid: object = sys.stdout,
    label: str | None = None,
    mode: int = 0o775,
    **kwargs,
):
    """
    Download a file from a ``http`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path split as list
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    local: str, pathlib.Path or NoneType, default None
        Path to local file
    hash: str, default ''
        ``MD5`` hash of local file
    chunk: int, default 16384
        Chunk size for transfer encoding
    headers: dict or None, default None
        Dictionary that is updated in place with the (lowercased)
        headers of the response
    verbose: bool, default False
        Print file transfer information
    fid: object, default sys.stdout
        Open file object for logging file transfers if verbose
    label: str or None, default None
        Label for logging file transfer information if verbose
    mode: int, default 0o775
        Permissions mode of output local file
    **kwargs
        Keyword arguments passed to the ``Request`` constructor

    Returns
    -------
    remote_buffer: obj
        ``BytesIO`` representation of file

    Raises
    ------
    urllib.error.HTTPError
        If the host answers with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # a fresh dictionary per call: a shared default would carry the
    # headers of one response over into later calls
    if headers is None:
        headers = {}
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # set default label for logging
    if label is None:
        label = f"{posixpath.join(*HOST)} -->\n\t{local}"
    # try downloading from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST), **kwargs)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # copy remote file contents to bytesIO object and release the response
    remote_buffer = io.BytesIO()
    with response:
        shutil.copyfileobj(response, remote_buffer, chunk)
        # copy headers from response
        headers.update({k.lower(): v for k, v in response.headers.items()})
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        logging.info(label)
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode="wb") as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode)
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
# PURPOSE: load a JSON response from a http host
[docs]
def from_json(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    headers: dict | None = None,
) -> dict:
    """
    Load a ``JSON`` response from a ``http`` host

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path split as list
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    headers: dict or None, default None
        Dictionary that is updated in place with the (lowercased)
        headers of the response

    Returns
    -------
    json_response: dict
        ``JSON`` response

    Raises
    ------
    urllib.error.HTTPError
        If the host answers with an error status
    urllib.error.URLError
        If the host cannot be reached
    """
    # a fresh dictionary per call: a shared default would carry the
    # headers of one response over into later calls
    if headers is None:
        headers = {}
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try loading JSON from http
    try:
        # Create and submit request for JSON response
        request = urllib2.Request(posixpath.join(*HOST))
        request.add_header("Accept", "application/json")
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # read the response and release the connection afterwards
    with response:
        # copy headers from response
        headers.update({k.lower(): v for k, v in response.headers.items()})
        # load JSON response
        json_response = json.loads(response.read())
    return json_response
# PURPOSE: list a directory on the MAR server
[docs]
def mar_list(
    HOST: str | list,
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    parser=lxml.etree.HTMLParser(),
    pattern: str = "",
    sort: bool = False,
):
    """
    List a directory from the MAR server at Université de Liège

    Parameters
    ----------
    HOST: str or list
        Remote ``http`` host path
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    context: obj, default FirnCorr.utilities._default_ssl_context
        ``SSL`` context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        ``HTML`` parser for ``lxml``
    pattern: str, default ''
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list

    Returns
    -------
    colnames: list
        ``href`` targets of the links found in the directory listing
    collastmod: list
        Last modification times for items in the directory

    Raises
    ------
    ValueError
        If the number of links differs from the number of parsed times
    """
    # accept either a full url or a pre-split list of url parts
    host_parts = url_split(HOST) if isinstance(HOST, str) else HOST
    # request the directory listing
    try:
        request = urllib2.Request(posixpath.join(*host_parts))
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # parse the listing for link targets
    tree = lxml.etree.parse(response, parser)
    colnames = tree.xpath("//a/@href")
    # the text node directly after each link holds the modification time:
    # keep only the leading "YYYY-MM-DD HH:MM" and convert it to unix time
    collastmod = [
        get_unix_time(
            re.sub(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}).*", r"\1", text.strip()),
            format="%Y-%m-%d %H:%M",
        )
        for text in tree.xpath("//a/following-sibling::text()[1]")
    ]
    # both lists are indexed together below and so must be the same length
    if len(colnames) != len(collastmod):
        raise ValueError("Mismatch between names and modification times")
    # keep only the entries matching the regular expression
    if pattern:
        keep = [k for k, name in enumerate(colnames) if re.search(pattern, name)]
        colnames = [colnames[k] for k in keep]
        collastmod = [collastmod[k] for k in keep]
    # order both lists by name
    if sort:
        order = sorted(range(len(colnames)), key=lambda k: colnames[k])
        colnames = [colnames[k] for k in order]
        collastmod = [collastmod[k] for k in order]
    # return the list of column names and modification times
    return colnames, collastmod
# NASA Cumulus AWS S3 buckets
# maps a lower-case DAAC short name to the name of its S3 bucket
_s3_buckets = {
    "gesdisc": "gesdisc-cumulus-prod-protected",
    "ghrcdaac": "ghrc-cumulus-dev",
    "lpdaac": "lp-prod-protected",
    "nsidc": "nsidc-cumulus-prod-protected",
    "ornldaac": "ornl-cumulus-prod-protected",
    "podaac": "podaac-ops-cumulus-protected",
    "podaac-doc": "podaac-ops-cumulus-docs",
}
# NASA Cumulus AWS S3 credential endpoints
# maps a DAAC short name to the URL of its ``s3credentials`` service
# (the "gesdisc" entry is the default host used by the credential checks;
# there is no entry for "podaac-doc")
_s3_endpoints = {
    "gesdisc": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
    "ghrcdaac": "https://data.ghrc.earthdata.nasa.gov/s3credentials",
    "lpdaac": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
    "nsidc": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
    "ornldaac": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
    "podaac": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
}
# NASA Cumulus AWS providers
# maps a DAAC short name to a provider identifier
# (presumably the CMR ``provider`` value -- confirm against callers;
# there is no entry for "podaac-doc")
_s3_providers = {
    "gesdisc": "GES_DISC",
    "ghrcdaac": "GHRC_DAAC",
    "lpdaac": "LPCLOUD",
    "nsidc": "NSIDC_CPRD",
    "ornldaac": "ORNL_CLOUD",
    "podaac": "POCLOUD",
}
# PURPOSE: attempt to build an opener with netrc
def attempt_login(
    urs: str,
    context: ssl.SSLContext = _default_ssl_context,
    password_manager: bool = True,
    get_ca_certs: bool = False,
    redirect: bool = False,
    authorization_header: bool = True,
    **kwargs,
):
    """
    Attempt to build a ``urllib`` opener for NASA Earthdata

    Credentials are taken from the netrc file if possible, then from
    the ``username`` and ``password`` keyword arguments, and finally
    requested interactively for anything still missing

    Parameters
    ----------
    urs: str
        Earthdata login URS 3 host
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    password_manager: bool, default True
        Create password manager context using default realm
    get_ca_certs: bool, default False
        Get list of loaded “certification authority” certificates
    redirect: bool, default False
        Create redirect handler object
    authorization_header: bool, default True
        Add base64 encoded authorization header to opener
    username: str, default from environmental variable
        NASA Earthdata username
    password: str, default from environmental variable
        NASA Earthdata password
    endpoint: str, default from _s3_endpoints
        NASA Cumulus AWS S3 credential endpoint for a provider
    retries: int, default 5
        number of retry attempts
    netrc: str, default ~/.netrc
        path to .netrc file for authentication

    Returns
    -------
    opener: obj
        OpenerDirector instance

    Raises
    ------
    RuntimeError
        If no attempt produced valid credentials
    """
    # set default keyword arguments
    kwargs.setdefault("username", os.environ.get("EARTHDATA_USERNAME"))
    kwargs.setdefault("password", os.environ.get("EARTHDATA_PASSWORD"))
    kwargs.setdefault("endpoint", _s3_endpoints["gesdisc"])
    kwargs.setdefault("retries", 5)
    kwargs.setdefault("netrc", pathlib.Path.home().joinpath(".netrc"))
    try:
        # verify permissions level of netrc file
        # only necessary on jupyterhub
        nc = pathlib.Path(kwargs["netrc"]).expanduser().absolute()
        nc.chmod(mode=0o600)
        # try retrieving credentials from netrc
        username, _, password = netrc.netrc(nc).authenticators(urs)
    except Exception as exc:
        # deliberate best-effort: a missing, unreadable or incomplete netrc
        # file falls back to the keyword arguments (environmental variables)
        # only the exception type is logged as parser messages can
        # contain tokens read from the netrc file
        logging.debug(f"netrc credentials unavailable: {type(exc).__name__}")
        username, password = (kwargs["username"], kwargs["password"])
    # manual input for username if not available
    if not username:
        username = builtins.input(f"Username for {urs}: ")
    # manual input for password if not available
    if not password:
        password = getpass.getpass(prompt=f"Password for {username}@{urs}: ")
    # host for endpoint
    HOST = kwargs.get("endpoint")
    # for each retry
    retries = kwargs["retries"]
    for attempt in range(retries):
        # build an opener for urs with credentials
        opener = build_opener(
            username,
            password,
            context=context,
            password_manager=password_manager,
            get_ca_certs=get_ca_certs,
            redirect=redirect,
            authorization_header=authorization_header,
            urs=urs,
        )
        # try logging in by check credentials
        try:
            check_credentials(HOST)
        except Exception as exc:
            logging.debug(f"Login attempt {attempt + 1} of {retries}: {exc}")
        else:
            return opener
        # reattempt login: only ask again if the new credentials
        # will actually be used by a following attempt
        if (attempt + 1) < retries:
            username = builtins.input(f"Username for {urs}: ")
            # rebuild the prompt so that it shows the re-entered username
            prompt = f"Password for {username}@{urs}: "
            password = getpass.getpass(prompt=prompt)
    # reached end of available retries
    raise RuntimeError("End of Retries: Check NASA Earthdata credentials")
# PURPOSE: "login" to NASA Earthdata with supplied credentials
[docs]
def build_opener(
    username: str,
    password: str,
    context: ssl.SSLContext = _default_ssl_context,
    password_manager: bool = False,
    get_ca_certs: bool = False,
    redirect: bool = False,
    authorization_header: bool = True,
    urs: str = "https://urs.earthdata.nasa.gov",
):
    """
    Build ``urllib`` opener for NASA Earthdata with supplied credentials

    The opener is also installed globally, so later calls to
    ``urllib2.urlopen`` go through it

    Parameters
    ----------
    username: str
        NASA Earthdata username
    password: str
        NASA Earthdata password
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    password_manager: bool, default False
        Create password manager context using default realm
    get_ca_certs: bool, default False
        Get list of loaded “certification authority” certificates
    redirect: bool, default False
        Create redirect handler object
    authorization_header: bool, default True
        Add base64 encoded authorization header to opener
    urs: str, default 'https://urs.earthdata.nasa.gov'
        Earthdata login URS 3 host

    Returns
    -------
    opener: object
        ``OpenerDirector`` instance
    """
    # https://docs.python.org/3/howto/urllib2.html#id5
    handlers = []
    # optional basic authentication through a default-realm password manager
    if password_manager:
        manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
        # register the credentials for the NASA Earthdata Login system
        manager.add_password(None, urs, username, password)
        handlers.append(urllib2.HTTPBasicAuthHandler(manager))
    # cookie jar for keeping the session cookie handed out by the data
    # server (without it every request is sent back to Earthdata Login)
    handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))
    # SSL context handler
    # (the list returned by get_ca_certs is not used here)
    if get_ca_certs:
        context.get_ca_certs()
    handlers.append(urllib2.HTTPSHandler(context=context))
    # optional redirect handler
    if redirect:
        handlers.append(urllib2.HTTPRedirectHandler())
    # assemble the OpenerDirector instance from the handlers
    opener = urllib2.build_opener(*handlers)
    # send the base64 encoded credentials with every request
    if authorization_header:
        credentials = f"{username}:{password}".encode()
        encoded = base64.b64encode(credentials).decode()
        opener.addheaders = [("Authorization", f"Basic {encoded}")]
    # make urllib2.urlopen use this opener from now on
    urllib2.install_opener(opener)
    return opener
# PURPOSE: generate a NASA Earthdata user token
def get_token(
    HOST: str = "https://urs.earthdata.nasa.gov/api/users/token",
    username: str | None = None,
    password: str | None = None,
    build: bool = True,
    context: ssl.SSLContext = _default_ssl_context,
    urs: str = "urs.earthdata.nasa.gov",
):
    """
    Generate a NASA Earthdata User Token

    Parameters
    ----------
    HOST: str
        NASA Earthdata token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
        (only used when ``build`` is set)
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Returns
    -------
    token: dict
        JSON response with NASA Earthdata User Token

    Raises
    ------
    RuntimeError
        If the request is rejected or the host cannot be reached
    """
    # log in first so that the request below is authenticated
    if build:
        login_options = dict(
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True,
        )
        attempt_login(urs, **login_options)
    # submit a POST request to the Earthdata token API
    try:
        response = urllib2.urlopen(urllib2.Request(HOST, method="POST"))
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError("Check internet connection") from exc
    # decode the JSON body of the response
    payload = response.read()
    return json.loads(payload)
# PURPOSE: list the current NASA Earthdata user tokens
def list_tokens(
    HOST: str = "https://urs.earthdata.nasa.gov/api/users/tokens",
    username: str | None = None,
    password: str | None = None,
    build: bool = True,
    context: ssl.SSLContext = _default_ssl_context,
    urs: str = "urs.earthdata.nasa.gov",
):
    """
    List the current associated NASA Earthdata User Tokens

    Parameters
    ----------
    HOST: str
        NASA Earthdata list token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
        (only used when ``build`` is set)
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Returns
    -------
    tokens: list
        JSON response with NASA Earthdata User Tokens

    Raises
    ------
    RuntimeError
        If the request is rejected or the host cannot be reached
    """
    # log in first so that the request below is authenticated
    if build:
        login_options = dict(
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True,
        )
        attempt_login(urs, **login_options)
    # submit a GET request to the Earthdata list tokens API
    try:
        response = urllib2.urlopen(urllib2.Request(HOST))
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError("Check internet connection") from exc
    # decode the JSON body of the response
    payload = response.read()
    return json.loads(payload)
# PURPOSE: revoke a NASA Earthdata user token
def revoke_token(
    token: str,
    HOST: str = "https://urs.earthdata.nasa.gov/api/users/revoke_token",
    username: str | None = None,
    password: str | None = None,
    build: bool = True,
    context: ssl.SSLContext = _default_ssl_context,
    urs: str = "urs.earthdata.nasa.gov",
):
    """
    Revoke a NASA Earthdata User Token

    Parameters
    ----------
    token: str
        NASA Earthdata token to be revoked
    HOST: str
        NASA Earthdata revoke token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default FirnCorr.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
        (only used when ``build`` is set)
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Raises
    ------
    RuntimeError
        If the request is rejected or the host cannot be reached
    """
    # attempt to build urllib2 opener and check credentials
    if build:
        attempt_login(
            urs,
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True,
        )
    # full path for NASA Earthdata revoke token API
    # percent-encode the token so that reserved characters cannot
    # break or extend the query string
    url = f"{HOST}?{urlencode({'token': token})}"
    # create post response with Earthdata revoke tokens API
    try:
        request = urllib2.Request(url, method="POST")
        response = urllib2.urlopen(request)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError("Check internet connection") from exc
    # the response body is not used: release the connection
    response.close()
    # verbose response
    logging.debug(f"Token Revoked: {token}")
def s3_region():
    """
    Get the AWS region name of a default ``boto3`` session

    Returns
    -------
    region_name: str
        AWS region name
    """
    # boto3 is an optional dependency and is only imported on demand
    session = import_dependency("boto3").session.Session()
    return session.region_name
# PURPOSE: get AWS s3 client for a NASA Earthdata credential endpoint
def s3_client(
    HOST: str = _s3_endpoints["gesdisc"],
    timeout: int | None = None,
    region_name: str = "us-west-2",
):
    """
    Get AWS s3 client for NASA Earthdata

    Parameters
    ----------
    HOST: str
        S3 credential host
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    region_name: str, default 'us-west-2'
        AWS region name

    Returns
    -------
    client: obj
        AWS s3 client
    """
    # request the temporary credentials from the endpoint as JSON
    response = urllib2.urlopen(urllib2.Request(HOST), timeout=timeout)
    cumulus = json.loads(response.read())
    # boto3 is an optional dependency and is only imported on demand
    boto3 = import_dependency("boto3")
    # build and return the s3 client for the region
    return boto3.client(
        "s3",
        aws_access_key_id=cumulus["accessKeyId"],
        aws_secret_access_key=cumulus["secretAccessKey"],
        aws_session_token=cumulus["sessionToken"],
        region_name=region_name,
    )
# PURPOSE: get a s3 bucket name from a presigned url
def s3_bucket(presigned_url: str) -> str:
    """
    Get a s3 bucket name from a presigned url

    Parameters
    ----------
    presigned_url: str
        s3 presigned url

    Returns
    -------
    bucket: str
        s3 bucket name
    """
    host = url_split(presigned_url)
    # strip the s3 scheme from the first url part
    # flags must be passed by keyword: the fourth positional argument
    # of re.sub is the replacement count and not the flags
    bucket = re.sub(r"s3:\/\/", r"", host[0], flags=re.IGNORECASE)
    return bucket
# PURPOSE: get a s3 bucket key from a presigned url
def s3_key(presigned_url: str) -> str:
    """
    Get a s3 bucket key from a presigned url

    Parameters
    ----------
    presigned_url: str
        s3 presigned url

    Returns
    -------
    key: str
        s3 bucket key for object
    """
    # everything after the first url part belongs to the object key
    segments = url_split(presigned_url)[1:]
    return posixpath.join(*segments)
# PURPOSE: check that entered NASA Earthdata credentials are valid
def check_credentials(HOST: str = _s3_endpoints["gesdisc"]):
    """
    Check that entered NASA Earthdata credentials are valid

    Parameters
    ----------
    HOST: str
        full url to protected credential website

    Returns
    -------
    valid: bool
        ``True`` if the protected website could be opened

    Raises
    ------
    RuntimeError
        If the request is rejected or the host cannot be reached
    """
    try:
        request = urllib2.Request(HOST)
        response = urllib2.urlopen(request, timeout=20)
    except urllib2.HTTPError as exc:
        raise RuntimeError("Check your NASA Earthdata credentials") from exc
    except urllib2.URLError as exc:
        raise RuntimeError("Check internet connection") from exc
    # only the success of the request matters: the body is never read,
    # so close the response explicitly to release the connection
    response.close()
    return True
# PURPOSE: list a directory on NASA GES DISC https server
[docs]
def gesdisc_list(
    HOST: str | list,
    username: str | None = None,
    password: str | None = None,
    build: bool = False,
    timeout: int | None = None,
    urs: str = "urs.earthdata.nasa.gov",
    parser=lxml.etree.HTMLParser(),
    format: str = r"%Y-%m-%d %H:%M",
    pattern: str = "",
    sort: bool = False,
):
    """
    List a directory on NASA GES DISC servers

    Parameters
    ----------
    HOST: str or list
        Remote ``https`` host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default False
        Build opener with NASA Earthdata credentials
    timeout: int or NoneType, default None
        Timeout in seconds for blocking operations
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host used to look up netrc credentials
    parser: obj, default lxml.etree.HTMLParser()
        ``HTML`` parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        Format for input time string
    pattern: str, default ''
        Regular expression pattern for reducing list
    sort: bool, default False
        Sort output list

    Returns
    -------
    colnames: list
        column names in a directory
    collastmod: list
        last modification times for items in the directory
    """
    if build:
        # fall back to netrc when neither credential was supplied
        if not (username or password):
            username, _, password = netrc.netrc().authenticators(urs)
        # build (and install) a urllib2 opener with the credentials
        build_opener(
            username,
            password,
            password_manager=True,
            authorization_header=False,
        )
    # accept either a full url or a pre-split list of url parts
    host_parts = url_split(HOST) if isinstance(HOST, str) else HOST
    # request the directory listing
    try:
        request = urllib2.Request(posixpath.join(*host_parts))
        response = urllib2.urlopen(request, timeout=timeout)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        exc.message = "Check internet connection"
        raise
    # parse the listing for link targets and modification times
    tree = lxml.etree.parse(response, parser)
    colnames = tree.xpath("//tr/td[not(@*)]//a/@href")
    # convert each modification time string into unix time
    mtimes = []
    for text in tree.xpath('//tr/td[@align="right"][1]/text()'):
        mtimes.append(get_unix_time(text, format=format))
    # keep only the entries matching the regular expression
    if pattern:
        keep = [k for k, name in enumerate(colnames) if re.search(pattern, name)]
        colnames = [colnames[k] for k in keep]
        mtimes = [mtimes[k] for k in keep]
    # order both lists by name
    if sort:
        order = sorted(range(len(colnames)), key=lambda k: colnames[k])
        colnames = [colnames[k] for k in order]
        mtimes = [mtimes[k] for k in order]
    # return the list of column names and last modified times
    return (colnames, mtimes)
# PURPOSE: filter the CMR json response for desired data files
[docs]
def cmr_filter_json(
    search_results: dict,
    endpoint: str = "data",
    request_type: str = "application/x-netcdf",
):
    """
    Filter the CMR json response for desired data files

    Only granules for which a matching url is found are returned,
    so that the three output lists always have the same length

    Parameters
    ----------
    search_results: dict
        json response from CMR query
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    request_type: str, default 'application/x-netcdf'
        data type for reducing CMR query

    Returns
    -------
    granule_names: list
        Model granule names
    granule_urls: list
        Model granule urls
    granule_mtimes: list
        Model granule modification times
    """
    # output list of granule ids, urls and modified times
    granule_names = []
    granule_urls = []
    granule_mtimes = []
    # check that there are urls for request
    if ("feed" not in search_results) or (
        "entry" not in search_results["feed"]
    ):
        return (granule_names, granule_urls, granule_mtimes)
    # descriptor links for each endpoint
    rel = {}
    rel["data"] = "http://esipfed.org/ns/fedsearch/1.1/data#"
    rel["opendap"] = "http://esipfed.org/ns/fedsearch/1.1/service#"
    rel["s3"] = "http://esipfed.org/ns/fedsearch/1.1/s3#"
    # iterate over references and get cmr location
    for entry in search_results["feed"]["entry"]:
        # find the first usable link for this granule
        granule_url = None
        for link in entry["links"]:
            # skip inherited granules
            if "inherited" in link:
                continue
            # select if link is for the chosen endpoint
            # (links without a descriptor simply do not match)
            if link.get("rel") == rel[endpoint]:
                granule_url = link["href"]
                break
            # alternatively select if link is of the chosen data type
            if ("type" in link) and (link["type"] == request_type):
                granule_url = link["href"]
                break
        # skip granules without a url: appending only the name and time
        # would shift every following entry out of step with the urls
        if granule_url is None:
            continue
        granule_names.append(entry["producer_granule_id"])
        granule_urls.append(granule_url)
        granule_mtimes.append(
            get_unix_time(entry["updated"], format="%Y-%m-%dT%H:%M:%S.%f%z")
        )
    # return the list of urls, granule ids and modified times
    return (granule_names, granule_urls, granule_mtimes)
# PURPOSE: query the NASA Common Metadata Repository (CMR)
[docs]
def cmr(
    short_name: str,
    version: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    provider: str = "GES_DISC",
    endpoint: str = "data",
    request_type: str = r"application/x-netcdf",
    verbose: bool = False,
    fid: object = sys.stdout,
):
    """
    Query the NASA Common Metadata Repository (CMR) for model data

    Parameters
    ----------
    short_name: str
        Model shortname in the CMR system
    version: str or NoneType, default None
        Model version
    start_date: str or NoneType, default None
        starting date for CMR product query
    end_date: str or NoneType, default None
        ending date for CMR product query
    provider: str, default 'GES_DISC'
        CMR data provider

        - ``'GES_DISC'``: GESDISC
        - ``'GESDISCCLD'``: GESDISC Cumulus
        - ``'PODAAC'``: PO.DAAC Drive
        - ``'POCLOUD'``: PO.DAAC Cumulus
    endpoint: str, default 'data'
        url endpoint type

        - ``'data'``: NASA Earthdata https archive
        - ``'opendap'``: NASA Earthdata OPeNDAP archive
        - ``'s3'``: NASA Earthdata Cumulus AWS S3 bucket
    request_type: str, default 'application/x-netcdf'
        data type for reducing CMR query
    verbose: bool, default False
        print CMR query information
    fid: object, default sys.stdout
        Open file object for logging CMR URL if verbose

    Returns
    -------
    granule_names: list
        Model granule names
    granule_urls: list
        Model granule urls
    granule_mtimes: list
        Model granule modification times
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # build urllib2 opener with SSL context
    # https://docs.python.org/3/howto/urllib2.html#id5
    handler = []
    # Create cookie jar for storing cookies
    cookie_jar = CookieJar()
    handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
    # NOTE(review): ssl.SSLContext() without a protocol argument is
    # deprecated since Python 3.10 and presumably does not verify server
    # certificates; confirm whether the module default context is intended
    handler.append(urllib2.HTTPSHandler(context=ssl.SSLContext()))
    # create "opener" (OpenerDirector instance)
    opener = urllib2.build_opener(*handler)
    # build CMR query
    cmr_format = "json"
    cmr_page_size = 2000
    CMR_HOST = [
        "https://cmr.earthdata.nasa.gov",
        "search",
        f"granules.{cmr_format}",
    ]
    # build list of CMR query parameters
    CMR_KEYS = []
    CMR_KEYS.append(f"?provider={provider}")
    CMR_KEYS.append("&sort_key[]=start_date")
    CMR_KEYS.append("&sort_key[]=producer_granule_id")
    CMR_KEYS.append(f"&page_size={cmr_page_size}")
    # dictionary of product shortnames and version
    CMR_KEYS.append(f"&short_name={short_name}")
    if version:
        CMR_KEYS.append(f"&version={version}")
    # append keys for start and end time
    # verify that start and end times are in ISO format
    start_date = isoformat(start_date) if start_date else ""
    end_date = isoformat(end_date) if end_date else ""
    CMR_KEYS.append(f"&temporal={start_date},{end_date}")
    # full CMR query url
    cmr_query_url = URL.from_parts(CMR_HOST) + "".join(CMR_KEYS)
    logging.info(f"CMR request={cmr_query_url}")
    # output list of granule names and urls
    granule_names = []
    granule_urls = []
    granule_mtimes = []
    cmr_search_after = None
    while True:
        # make CMR query request
        request = cmr_query_url.request()
        # add CMR search after header
        if cmr_search_after:
            request.add_header("CMR-Search-After", cmr_search_after)
            logging.debug(f"CMR-Search-After: {cmr_search_after}")
        # submit request and get response
        response = opener.open(request)
        # get search after index for next iteration
        headers = {k.lower(): v for k, v in dict(response.info()).items()}
        cmr_search_after = headers.get("cmr-search-after")
        # read the CMR search as JSON
        search_page = json.loads(response.read().decode("utf8"))
        ids, urls, mtimes = cmr_filter_json(
            search_page,
            endpoint=endpoint,
            request_type=request_type,
        )
        # extend lists before deciding whether to stop, so that a page
        # with results but without a search-after header is not discarded
        if urls:
            granule_names.extend(ids)
            granule_urls.extend(urls)
            granule_mtimes.extend(mtimes)
        # stop when the page is empty or there is no following page
        if not urls or cmr_search_after is None:
            break
    # return the list of granule ids, urls and modification times
    return (granule_names, granule_urls, granule_mtimes)
# PURPOSE: build requests for the GES DISC subsetting API
[docs]
def build_request(
    short_name: str,
    dataset_version: str,
    url: str,
    host: str | None = None,
    variables: list = [],
    format: str = "bmM0Lw",
    service: str = "L34RS_MERRA2",
    version: str = "1.02",
    bbox: list[int] | list[float] = [-90, -180, 90, 180],
    **kwargs,
):
    """
    Build requests for the GES DISC subsetting API

    Parameters
    ----------
    short_name: str
        Model shortname in the CMR system
    dataset_version: str
        Model version
    url: str
        url for granule returned by the CMR system
    host: str or NoneType, default None
        Override host provider for GES DISC subsetting

        Default is host provider given by CMR request
    variables: list, default []
        Variables for product to subset
    format: str, default 'bmM0Lw'
        Coded output format for GES DISC subsetting API
    service: str, default 'L34RS_MERRA2'
        GES DISC subsetting API service
    version: str, default '1.02'
        GES DISC subsetting API service version
    bbox: list, default [-90,-180,90,180]
        Bounding box to spatially subset
    kwargs: dict, default {}
        Additional parameters for GES DISC subsetting API

    Returns
    -------
    request_url: str
        Formatted url for GES DISC subsetting API
    """
    # separate the granule url into its host and its path on that host
    granule_host, *granule_path = url_split(url)
    if host is None:
        host = granule_host
    # base URL for GES DISC on-the-fly subsetting API
    api_host = URL.from_parts([host, "daac-bin", "OTF", "HTTP_services.cgi?"])
    # add the parameters to be encoded to any user-supplied extras
    kwargs.update(
        FILENAME=posixpath.join(posixpath.sep, *granule_path),
        FORMAT=format,
        SERVICE=service,
        VERSION=version,
        BBOX=",".join(map(str, bbox)),
        SHORTNAME=short_name,
        DATASET_VERSION=dataset_version,
    )
    # the variable list is optional
    if variables is not None:
        kwargs["VARIABLES"] = ",".join(variables)
    # return the formatted request url
    return api_host + urlencode(kwargs)