#!/usr/bin/env python
"""
RACMO.py
Written by Tyler Sutterley (04/2026)
Reads Regional Atmospheric and Climate MOdel (RACMO) data products
provided by IMAU (Utrecht University)
PYTHON DEPENDENCIES:
numpy: Scientific Computing Tools For Python
https://numpy.org
https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
xarray: N-D labeled arrays and datasets in Python
https://docs.xarray.dev/en/stable/
UPDATE HISTORY:
Updated 04/2026: added lineage attribute to save model filename(s)
add a case-insensitive search for variables in input dataset
check if x and y coordinates are present in the downscaled dataset
Written 04/2026
"""
from __future__ import print_function
import re
import gzip
import logging
import pathlib
import xarray as xr
import numpy as np
import timescale.time
from FirnCorr.io.dataset import combine_attrs, get_variable
from FirnCorr.utilities import import_dependency, dependency_available
# attempt imports
dask = import_dependency("dask")
dask_available = dependency_available("dask")
# mapping of netCDF4 variable names to internal variable names
_variable_mapping = {
"smb": "SMB",
"smb_rec": "SMB", # downscaled SMB
"precipcorr": "precip", # downscaled precipitation
"refreezecorr": "refreeze", # downscaled meltwater refreeze
"runoffcorr": "runoff", # downscaled runoff
"snowmeltcorr": "snowmelt", # downscaled snowmelt
"firnair": "zfirn",
"hgtsrf": "zsurf",
"zs": "zsurf",
}
# PROJ4 parameters for RACMO model projections
proj4_params = {"rotated_pole": {}, "downscaled": {}}
# RACMO (default) model projections: Rotated Latitude-Longitude
proj4_params["rotated_pole"]["ANT"] = (
"-m 57.295779506 +proj=ob_tran +o_proj=latlon +o_lat_p=-180.0 +lon_0=10.0"
)
proj4_params["rotated_pole"]["ASE"] = (
"-m 57.295779506 +proj=ob_tran +o_proj=latlon +o_lat_p=167.0 +lon_0=53.0"
)
proj4_params["rotated_pole"]["PEN"] = (
"-m 57.295779506 +proj=ob_tran +o_proj=latlon +o_lat_p=-180.0 +lon_0=30.0"
)
proj4_params["rotated_pole"]["ARC"] = (
"-m 57.295779506 +proj=ob_tran +o_proj=latlon +o_lat_p=6.55 +lon_0=180.0"
)
proj4_params["rotated_pole"]["GRN"] = (
"-m 57.295779506 +proj=ob_tran +o_proj=latlon +o_lat_p=18.0 +lon_0=-37.5"
)
# RACMO downscaled model projections: Polar Stereographic
# ANT: WGS84 / Polar Stereographic South
# GRN: WGS84 / NSIDC Sea Ice Polar Stereographic North
proj4_params["downscaled"]["ANT"] = 3031
proj4_params["downscaled"]["GRN"] = 3413
[docs]
def open_mfdataset(
filenames: list[str] | list[pathlib.Path],
parallel: bool = False,
how: str = "merge",
**kwargs,
):
"""
Open multiple files containing RACMO model data
Parameters
----------
filenames: list of str or pathlib.Path
Path(s) to file(s) containing RACMO data
parallel: bool, default False
Open files in parallel using ``dask.delayed``
how: str, default 'merge'
How to merge the datasets
- ``'merge'``: merge variables from multiple files
- ``'concat'``: concatenate a single variable over time
kwargs: dict
Additional keyword arguments for opening RACMO files
"""
# merge multiple granules
if parallel and dask_available:
opener = dask.delayed(open_dataset)
else:
opener = open_dataset
# verify that filename is iterable
if isinstance(filenames, str):
filenames = [filenames]
# read each file as xarray dataset and append to list
datasets = [opener(f, **kwargs) for f in filenames]
# read datasets as dask arrays
if parallel and dask_available:
(datasets,) = dask.compute(datasets)
# merge or concatenate datasets
if how == "merge":
# merge variables from multiple files
ds = xr.merge(
datasets,
combine_attrs=combine_attrs,
compat="override",
join="override",
)
elif how == "concat":
# concatenate a single variable over time
ds = xr.concat(
datasets,
combine_attrs=combine_attrs,
compat="override",
coords="minimal",
dim="time",
join="override",
)
else:
# raise an error for unknown or invalid merge methods
raise ValueError(f"Invalid merge method: {how}")
# return xarray dataset
return ds
[docs]
def open_dataset(
filename: str | pathlib.Path,
format: str = "netcdf",
**kwargs,
):
"""
Open a file with RACMO model data
Parameters
----------
filename: str or pathlib.Path
Path to file containing RACMO data
format: str
Format of RACMO data
- ``'ascii'``: ascii-formatted model output
- ``'downscaled'``: downscaled model output in netCDF4 format
- ``'netcdf'``: daily or monthly model outputs in netCDF4 format
kwargs: dict
Additional keyword arguments for opening RACMO files
"""
if format == "ascii":
return open_ascii_dataset(filename, **kwargs)
elif format == "netcdf":
return open_netcdf_dataset(filename, **kwargs)
elif format == "downscaled":
return open_downscaled_dataset(filename, **kwargs)
else:
raise ValueError(f"Invalid format: {format}")
[docs]
def open_ascii_dataset(
filename: str | pathlib.Path,
variable: str = "SMB",
chunks: str | None = None,
**kwargs,
):
"""
Open an ASCII file with RACMO model data
Parameters
----------
filename: str or pathlib.Path
Path to ASCII file containing RACMO data
variable: str, default 'SMB'
Variable name in the ASCII file to extract
chunks: str or None, default None
Chunk size for ``xarray`` dataset
"""
# read the tab-delimited text file
logging.debug(filename)
tmp = np.loadtxt(filename)
# latitude and longitude arrays
lon = tmp[:, 0].copy()
lat = tmp[:, 1].copy()
# flattened data matrix
flattened = tmp[:, 2:]
# number of time steps in the dataset
_, nt = flattened.shape
# grid dimensions for each RACMO region
if np.max(lat) > 0:
# XGRN11 grid dimensions
ny, nx = 312, 306
# starting epoch and conversion from months to seconds
epoch, to_secs = [1960, 1, 15], 365.25 * 86400.0 / 12.0
elif np.min(lat) < 0:
# XANT27 grid dimensions
ny, nx = 240, 262
# starting epoch and conversion from months to seconds
epoch, to_secs = [1979, 1, 15], 365.25 * 86400.0 / 12.0
# data dictionary
var = dict(dims=("y", "x", "time"), coords={}, data_vars={})
# store the latitude coordinates
var["data_vars"]["lat"] = {}
var["data_vars"]["lat"]["dims"] = ("y", "x")
var["data_vars"]["lat"]["data"] = lat.reshape(ny, nx)
# store the longitude coordinates
var["data_vars"]["lon"] = {}
var["data_vars"]["lon"]["dims"] = ("y", "x")
var["data_vars"]["lon"]["data"] = lon.reshape(ny, nx)
# store the data variables
var["data_vars"][variable] = {}
var["data_vars"][variable]["dims"] = ("y", "x", "time")
var["data_vars"][variable]["data"] = flattened.reshape(ny, nx, nt)
# convert to xarray Dataset from the data dictionary
ds = xr.Dataset.from_dict(var)
# replace zeros with NaNs
ds[variable] = ds[variable].where(
ds[variable].sum(dim="time", skipna=False) != 0, np.nan, drop=False
)
# convert delta times from seconds since epoch to datetime objects
ts = timescale.from_deltatime(np.arange(nt) * to_secs, epoch=epoch)
ds["time"] = ts.to_datetime()
# coerce to specified chunks
if chunks is not None:
ds = ds.chunk(chunks)
# add attributes to dataset
ds.attrs["lineage"] = pathlib.Path(filename).name
ds[variable].attrs["units"] = "mm w.e."
# return the dataset
return ds
[docs]
def open_netcdf_dataset(
filename: str | pathlib.Path,
variable: str | list[str],
chunks: str | None = None,
**kwargs,
):
"""
Open a netCDF4 file with RACMO model data
Parameters
----------
filename: str or pathlib.Path
Path to netCDF4 file containing RACMO data
variable: str or list
netCDF4 variable name(s) to extract
chunks: str or None, default None
Chunk size for ``xarray`` dataset
compressed: bool, default False
If True, read gzipped netCDF4 file
"""
# set default keyword arguments
kwargs.setdefault("compressed", False)
# read the netCDF4-format file
logging.debug(filename)
if kwargs["compressed"]:
# read gzipped netCDF4 file
f = gzip.open(filename, "rb")
tmp = xr.open_dataset(f, mask_and_scale=True, chunks=chunks)
else:
tmp = xr.open_dataset(filename, mask_and_scale=True, chunks=chunks)
# regular expression pattern for extracting parameters
pattern = r"[F|X]?(ARC|GRN|ANT|ASE|PEN)[\d+]"
m = re.search(pattern, pathlib.Path(filename).stem)
if m:
region = m.group(1)
elif "LAT" in tmp.data_vars:
# attempt to get RACMO region from latitude variable
latname = "LAT" if "LAT" in tmp.data_vars else "lat"
region = "GRN" if tmp[latname].min() > 0 else "ANT"
# get coordinate names
mapping = {}
for v in tmp.coords:
if re.match(r"X", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
mapping[v] = "x"
elif re.match(r"Y", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
mapping[v] = "y"
elif re.match(r"T", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
mapping[v] = "time"
# verify mapping
if "x" not in mapping and "rlon" in tmp.coords:
mapping["rlon"] = "x"
if "y" not in mapping and "rlat" in tmp.coords:
mapping["rlat"] = "y"
if "time" not in mapping and "time" in tmp.coords:
mapping["time"] = "time"
# rename to standardized coordinate names
tmp = tmp.rename(mapping)
# output dataset
ds = xr.Dataset()
# extract x, y and time coordinate arrays
ds["y"] = tmp["y"].copy()
ds["x"] = tmp["x"].copy()
ds["time"] = tmp["time"].copy()
# check if variable is a string
if isinstance(variable, str):
variable = [variable]
# loop over variables to extract
for var in variable:
# get variable in dataset (case-insensitive search)
darr = get_variable(tmp, var)
# skip if the variable is not found in the dataset
if darr is None:
logging.info(f"Variable {var} not found in dataset")
continue
elif darr.name != var:
logging.info(f"Variable {var} mapped to {darr.name} in dataset")
# mapping between netCDF4 variable names and internal variable names
imap = _variable_mapping.get(darr.name.lower(), var)
# remove singleton dimensions from data array and assign to dataset
ds[imap] = darr.squeeze()
# replace zeros with NaNs
ds[imap] = ds[imap].where(
ds[imap].sum(dim="time", skipna=False) != 0, np.nan, drop=False
)
# add attributes for variable
ds[imap].attrs["group"] = darr.name
ds[imap].attrs["units"] = darr.attrs.get("units", "")
# drop coordinates that are not in the dimensions
drop_coords = [c for c in ds.coords if c not in ds.dims]
ds = ds.drop_vars(drop_coords)
# add attributes to dataset
ds.attrs["lineage"] = pathlib.Path(filename).name
if "rotated_pole" in tmp.data_vars:
# extract rotated pole parameters from dataset attributes
ds.attrs["crs"] = tmp["rotated_pole"].proj4_params
else:
# add default rotated pole parameters for region
ds.attrs["crs"] = proj4_params["rotated_pole"][region]
# raise a value error if there are no variables in the dataset
if len(ds.data_vars) == 0:
raise ValueError(f"No variables found in dataset: {filename}")
# return the dataset
return ds
[docs]
def open_downscaled_dataset(
filename: str | pathlib.Path,
variable: str | list[str],
chunks: str | None = None,
**kwargs,
):
"""
Open a netCDF4 file with downscaled RACMO model data
Parameters
----------
filename: str or pathlib.Path
Path to netCDF4 file containing RACMO data
variable: str or list
netCDF4 variable name(s) to extract
chunks: str or None, default None
Chunk size for ``xarray`` dataset
compressed: bool, default False
If True, read gzipped netCDF4 file
"""
# set default keyword arguments
kwargs.setdefault("compressed", False)
# read the netCDF4-format file
if kwargs["compressed"]:
# read gzipped netCDF4 file
f = gzip.open(filename, "rb")
tmp = xr.open_dataset(
f, mask_and_scale=True, decode_times=False, chunks=chunks
)
else:
tmp = xr.open_dataset(
filename, mask_and_scale=True, decode_times=False, chunks=chunks
)
# regular expression pattern for extracting parameters
# default to Greenland if region cannot be determined from filename
m = re.search(r"[F|X]?(GRN|ANT)[\d+]", pathlib.Path(filename).stem)
region = m.group(1) if m else "GRN"
# set the x and y coordinates if presently unavailable in the dataset
# some versions of the downscaled product have (r)lon and (r)lat variables
if "x" not in tmp.coords and "y" not in tmp.coords:
for v in tmp.data_vars:
if tmp[v].ndim > 1:
continue
elif re.match(r"X", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
tmp = tmp.assign_coords(x=tmp[v])
elif re.match(r"Y", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
tmp = tmp.assign_coords(y=tmp[v])
# get coordinate names
mapping = {}
for v in tmp.coords:
if re.match(r"X", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
mapping[v] = "x"
elif re.match(r"Y", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
mapping[v] = "y"
elif re.match(r"T", tmp[v].attrs.get("axis", ""), re.IGNORECASE):
mapping[v] = "time"
# rename to standardized coordinate names
tmp = tmp.rename(mapping)
# output dataset
ds = xr.Dataset()
# cell origins in the downscaled product are on the bottom right
dx = np.abs(tmp["x"][1] - tmp["x"][0])
dy = np.abs(tmp["y"][1] - tmp["y"][0])
# convert x and y arrays to cell centers
ds["y"] = tmp["y"].copy() - dy / 2.0
ds["x"] = tmp["x"].copy() - dx / 2.0
# parse dates from time variable
epoch, to_secs = timescale.time.parse_date_string(tmp["time"].units)
# if monthly: convert to seconds using average month lengths
if re.search(r"month", tmp["time"].units, re.IGNORECASE):
to_secs = 365.25 * 86400.0 / 12.0
# convert delta times from seconds since epoch to datetime objects
ts = timescale.from_deltatime(tmp["time"] * to_secs, epoch=epoch)
ds["time"] = ts.to_datetime()
# get the ice mask from the downscaled dataset
if "icemask" in tmp.data_vars:
tmp = tmp.where(tmp["icemask"] == 1, drop=False)
elif "Promicemask" in tmp.data_vars:
tmp = tmp.where(
(tmp["Promicemask"] >= 1) & (tmp["Promicemask"] <= 3),
drop=False,
)
# check if variable is a string
if isinstance(variable, str):
variable = [variable]
# loop over variables to extract
for var in variable:
# get variable in dataset (case-insensitive search)
darr = get_variable(tmp, var)
# skip if the variable is not found in the dataset
if darr is None:
logging.info(f"Variable {var} not found in dataset")
continue
elif darr.name != var:
logging.info(f"Variable {var} mapped to {darr.name} in dataset")
# mapping between netCDF4 variable names and internal variable names
imap = _variable_mapping.get(darr.name.lower(), var)
# remove singleton dimensions from data array and assign to dataset
ds[imap] = ("time", "y", "x"), darr.squeeze().data
# replace points where all values are zero with NaNs
ds[imap] = ds[imap].where(
ds[imap].sum(dim="time", skipna=False) != 0, np.nan, drop=False
)
# add attributes for variable
ds[imap].attrs["group"] = darr.name
ds[imap].attrs["units"] = darr.attrs.get("units", "")
# drop coordinates that are not in the dimensions
drop_coords = [c for c in ds.coords if c not in ds.dims]
ds = ds.drop_vars(drop_coords)
# add attributes to dataset
ds.attrs["lineage"] = pathlib.Path(filename).name
grid_mapping = tmp.attrs.get("grid", "")
m = re.search(r"EPSG[:]?(\d+)", grid_mapping, re.IGNORECASE)
if m:
# extract crs parameters from dataset attributes
ds.attrs["crs"] = int(m.group(1))
else:
# add default downscaled crs for region
ds.attrs["crs"] = proj4_params["downscaled"][region]
# raise a value error if there are no variables in the dataset
if len(ds.data_vars) == 0:
raise ValueError(f"No variables found in dataset: {filename}")
# return the dataset
return ds