Source code for FirnCorr.io.MAR

#!/usr/bin/env python
"""
MAR.py
Written by Tyler Sutterley (04/2026)
Reads Modèle Atmosphérique Régional (MAR) data products provided by
Lèige Université (Belgium)

PYTHON DEPENDENCIES:
    numpy: Scientific Computing Tools For Python
        https://numpy.org
        https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
    xarray: N-D labeled arrays and datasets in Python
        https://docs.xarray.dev/en/stable/

UPDATE HISTORY:
    Updated 04/2026: added lineage attribute to save model filename(s)
    Written 04/2026
"""

from __future__ import print_function

import re
import gzip
import logging
import pathlib
import xarray as xr
import numpy as np
from FirnCorr.io.dataset import combine_attrs
from FirnCorr.utilities import import_dependency, dependency_available

# attempt imports
dask = import_dependency("dask")
dask_available = dependency_available("dask")

# mapping of netCDF4 variable names to internal variable names
_variable_mapping = {
    "SMB": "SMB",
    "ZN4": "zfirn",
    "ZN5": "zmelt",
    "ZN6": "zsurf",
}
# variable attributes for derived fields
_attributes = dict(
    zsmb=dict(long_name="Snow Height Change due to SMB"),
    zaccum=dict(long_name="Snow Height Change due to Accumulation"),
)

# PROJ4 parameters for MAR model projections
proj4_params = dict()
# Greenland: Polar Stereographic (Oblique)
# Earth Radius: 6371229 m
# True Latitude: 0
# Center Longitude: -40
# Center Latitude: 70.5
# Coordinate Axis Units: km
proj4_params["Greenland"] = {
    "proj": "sterea",
    "lat_0": 70.5,
    "lat_ts": 0,
    "lon_0": -40,
    "k": 1,
    "x_0": 0,
    "y_0": 0,
    "R": 6371229,
    "units": "km",
    "no_defs": None,
    "type": "crs",
}
# Antarctica: WGS84 / Polar Stereographic
# Modification of EPSG:3031
# Coordinate Axis Units: km
proj4_params["Antarctica"] = {
    "proj": "stere",
    "lat_0": -90,
    "lat_ts": -71,
    "lon_0": 0,
    "datum": "WGS84",
    "units": "km",
    "no_defs": None,
    "type": "crs",
}


[docs] def open_mfdataset( filenames: list[str] | list[pathlib.Path], parallel: bool = False, **kwargs, ): """ Open multiple files containing MAR model data Parameters ---------- filenames: list of str or pathlib.Path Path(s) to file(s) containing MAR data parallel: bool, default False Open files in parallel using ``dask.delayed`` kwargs: dict Additional keyword arguments for opening MAR files """ # merge multiple granules if parallel and dask_available: opener = dask.delayed(open_dataset) else: opener = open_dataset # verify that filename is iterable if isinstance(filenames, str): filenames = [filenames] # read each file as xarray dataset and append to list datasets = [opener(f, **kwargs) for f in filenames] # read datasets as dask arrays if parallel and dask_available: (datasets,) = dask.compute(datasets) # concatenate a single variable over time ds = xr.concat( datasets, combine_attrs=combine_attrs, compat="override", coords="minimal", dim="time", join="override", ) # return xarray dataset return ds
[docs] def open_dataset( filename: str | pathlib.Path, variable: str | list[str], surface_type: int | list[int] = 4, chunks: str | None = None, **kwargs, ): """ Open a netCDF4 file containing MAR data Parameters ---------- filename: str or pathlib.Path Path to netCDF4 file containing MAR data variable: str or list netCDF4 variable name(s) to extract surface_type: int or list, default 4 Surface type(s) to extract (1 = ocean, 4 = land) chunks: str or None, default None Chunk size for ``xarray`` dataset compressed: bool, default False If True, read gzipped netCDF4 file """ # set default keyword arguments kwargs.setdefault("compressed", False) # read the netCDF4-format file logging.debug(filename) if kwargs["compressed"]: # read gzipped netCDF4 file f = gzip.open(filename, "rb") tmp = xr.open_dataset(f, mask_and_scale=True, chunks=chunks) else: tmp = xr.open_dataset(filename, mask_and_scale=True, chunks=chunks) # get coordinate names mapping = {} for v in tmp.coords: if re.match(r"X", tmp[v].attrs.get("axis", ""), re.IGNORECASE): mapping[v] = "x" elif re.match(r"Y", tmp[v].attrs.get("axis", ""), re.IGNORECASE): mapping[v] = "y" elif re.match(r"T", tmp[v].attrs.get("axis", ""), re.IGNORECASE): mapping[v] = "time" elif re.match(r"sector$", v, re.IGNORECASE): mapping[v] = "sector" # rename to standardized coordinate names tmp = tmp.rename(mapping) # attempt to get MAR region from latitude variable region = "Greenland" if tmp["LAT"].min() > 0 else "Antarctica" # output dataset ds = xr.Dataset() # extract x, y and time coordinate arrays ds["y"] = tmp["y"].copy() ds["x"] = tmp["x"].copy() ds["time"] = tmp["time"].copy() # check if surface_types is a string if isinstance(surface_type, int): surface_type = [surface_type] # extract surface type and ice fraction variables valid_surface = tmp["SRF"].isin(surface_type) # combine sectors (if applicable) using fractional area if "FRA" in tmp and "sector" in tmp["FRA"].dims: fa = tmp["FRA"].isel(sector=0) / 100.0 elif "FRV" in tmp and "sector" in tmp["FRV"].dims: fa = tmp["FRV"].isel(sector=0) / 100.0 # check if variable is a string if isinstance(variable, str): variable = [variable] # loop over variables to extract for var in variable: # read dataset and remove singleton dimensions imap = _variable_mapping.get(var, var) v = tmp[var].squeeze() # combine sectors (if applicable) using fractional area if v.ndim == 4 and "sector" in v.dims: ds[imap] = fa * v.isel(sector=0) + (1 - fa) * v.isel(sector=1) else: ds[imap] = v.copy() # reduce to surface type of interest ds[imap] = ds[imap].where(valid_surface, np.nan, drop=False) # add attributes for variable ds[imap].attrs["group"] = var ds[imap].attrs["units"] = tmp[var].attrs.get("units", "") # check if fields are available to derive SMB and accumulation ZN6 = "zsurf" in ds.data_vars ZN4 = "zfirn" in ds.data_vars ZN5 = "zmelt" in ds.data_vars # calculate derived fields (if available) if ZN6 and ZN4: # height change due to surface mass balance (SMB) ds["zsmb"] = ds["zsurf"] - ds["zfirn"] ds["zsmb"].attrs.update(_attributes["zsmb"]) ds["zsmb"].attrs["group"] = ["ZN6", "ZN4"] if ZN6 and ZN4 and ZN5: # height change due to accumulation (SMB - melt) ds["zaccum"] = ds["zsurf"] - ds["zfirn"] - ds["zmelt"] ds["zaccum"].attrs.update(_attributes["zaccum"]) ds["zaccum"].attrs["group"] = ["ZN6", "ZN4", "ZN5"] # drop coordinates that are not in the dimensions drop_coords = [c for c in ds.coords if c not in ds.dims] ds = ds.drop_vars(drop_coords) # verify that chunks are unified (if specified) if chunks is not None: ds = ds.unify_chunks() # add attributes to dataset ds.attrs["lineage"] = pathlib.Path(filename).name ds.attrs["crs"] = proj4_params[region] # return the dataset return ds