# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: Copyright (c) 2026 TU Wien & AWST
# SPDX-FileCopyrightText: For a full list of authors, see the AUTHORS file.
import glob
import pandas as pd
import requests.exceptions
from typing import Tuple, Union
import shutil
import os
from pathlib import Path
import xarray as xr
from qa4sm_api.client_api import Connection, ValidationConfiguration
from qa4sm_autoreports.extent import GeographicExtent
class ValidationRun:
    """
    A validation run that ties together a validation configuration, a local
    folder and, optionally, a run on a remote QA4SM instance.

    Attributes are set in ``__init__``: ``config``, ``local_root``,
    ``connection``, ``remote_id``, ``response`` and ``name``.
    """

    def __init__(
        self,
        config: ValidationConfiguration,
        root_local: Union[str, Path],
        connection: Connection,
        remote_id=None,
        name_tag=None,
    ):
        """
        Parameters
        ----------
        config: ValidationConfiguration
            Configuration for validation run to trigger (settings)
        root_local: Union[str, Path]
            Local root folder, a subfolder for the validation run is created
        connection: Connection
            Connection to the QA4SM instance to run the validation on
        remote_id: str, optional
            Remote ID if the run already exists online
        name_tag: str, optional
            Name tag for the run. If None (or empty) is passed, we use the
            tag from the config file.
        """
        self.config = config
        self.local_root = Path(root_local)
        self.connection = connection
        self.remote_id = remote_id
        # Server response of the last `start()` call. Initialised here so
        # that `update_remote_id` can be called before `start()`.
        self.response = None
        # update_name also writes the tag back into the config.
        self.name = self.update_name(name_tag or self.config['name_tag'])

    def __repr__(self):
        # NOTE: reading `self.status` queries the connection.
        return (f"ValidationRun [{self.status[0]}]\n"
                f" - name: {self.name}\n"
                f" - remote_id: {self.remote_id}\n"
                f" - local_root: {self.local_root}\n"
                f" - connection: {self.connection}\n")

    @staticmethod
    def _find_unique_config(local_dir: Path) -> str:
        """Return the path of the single ``config-*.json`` in `local_dir`."""
        matches = glob.glob(str(local_dir / "config-*.json"))
        if len(matches) != 1:
            # Raised explicitly (instead of `assert`) so the check is not
            # stripped under `python -O`; the exception type is unchanged.
            raise AssertionError(
                f"No unique config file found in {local_dir} "
                f"(found {len(matches)} files matching 'config-*.json')")
        return matches[0]

    @staticmethod
    def _instance_from_config_file(conf_file: Union[str, Path]) -> str:
        """Extract ``<instance>`` from a ``config-<instance>.json`` path."""
        file_name = os.path.basename(conf_file)
        # Strip the fixed prefix/suffix instead of splitting on '-', so that
        # instance names which contain a hyphen are kept intact.
        return file_name[len('config-'):-len('.json')]

    @classmethod
    def from_remote(cls, local_root: Union[str, Path], connection: Connection,
                    remote_id: str):
        """
        Set up ValidationRun based on a remote validation run with a local
        folder for synchronization.

        Parameters
        ----------
        local_root: Union[str, Path]
            Local folder where the run data is stored
        connection: Connection
            Service connection for your user
        remote_id: str
            Name of the remote run (UID).

        Returns
        -------
        run : ValidationRun
        """
        local_root = Path(local_root)
        url = connection.url(f"validation-configuration/{remote_id}")
        response = connection.session.get(url)
        config = ValidationConfiguration(response.data[0])
        # NOTE(review): this is stored on the class, not on the new instance,
        # so it is shared by every ValidationRun. Kept as is because it may be
        # read elsewhere - confirm whether an instance attribute was intended.
        cls._init_origin = 'remote'
        return cls(config, local_root, connection, remote_id)

    @classmethod
    def from_template(cls,
                      local_dir: Union[str, Path],
                      connection: Connection,
                      name_tag=None):
        """
        Set up a new ValidationRun (without remote counter part) from the
        config file of a previously synchronized, now local, run.

        Parameters
        ----------
        local_dir: Union[str, Path]
            Local run folder containing exactly one ``config-*.json`` file.
        connection: Connection
            Connection to QA4SM instance to which the validation run should be
            assigned.
        name_tag: str, optional
            Name to assign to the new run. If None is passed, the name
            of the local_dir is used.

        Returns
        -------
        run : ValidationRun

        Raises
        ------
        AssertionError
            If `local_dir` does not contain exactly one config file.
        """
        local_dir = Path(local_dir)
        conf_file = cls._find_unique_config(local_dir)
        config = ValidationConfiguration.from_file(conf_file)
        # The folder name itself (not its parent path) is the default tag.
        name_tag = name_tag or local_dir.name
        return cls(
            config,
            root_local=local_dir,
            connection=connection,
            remote_id=None,
            name_tag=name_tag)

    @classmethod
    def from_results(cls,
                     local_dir: Union[str, Path],
                     connection: Connection = None,
                     name_tag=None):
        """
        Set up ValidationRun based on a previously synchronized, now local, run.
        The remote ID is restored from the results netcdf file name or, if
        there is none, from the ``pk`` field of the response csv file.

        Parameters
        ----------
        local_dir: Union[str, Path]
            Local run folder containing exactly one ``config-*.json`` file
            and optionally one ``*.nc`` and/or one ``response-*.csv`` file.
        connection: Connection, optional
            Connection to use for the run. If None, a new connection will be
            created based on the instance in the config file name.
        name_tag: str, optional
            Name to assign to the new run. If None is passed, the name
            of the local_dir is used.

        Returns
        -------
        run : ValidationRun

        Raises
        ------
        AssertionError
            If there is not exactly one config file, or more than one
            results / response file in `local_dir`.
        """
        local_dir = Path(local_dir)
        conf_file = cls._find_unique_config(local_dir)
        config = ValidationConfiguration.from_file(conf_file)

        if connection is None:
            connection = Connection(cls._instance_from_config_file(conf_file))

        remote_id = None

        results_files = glob.glob(str(local_dir / "*.nc"))
        if len(results_files) > 1:
            raise AssertionError(
                f"Found multiple results netcdf files in {local_dir}")
        if results_files:
            # Results are stored as '<remote_id>.nc'
            remote_id = Path(results_files[0]).stem

        response_files = glob.glob(str(local_dir / "response-*.csv"))
        if len(response_files) > 1:
            raise AssertionError(
                f"Found multiple response csv files in {local_dir}")
        if response_files and remote_id is None:
            # Only needed as fallback for the remote ID. The connection is
            # always set at this point, so the instance stored in the
            # response file is not required.
            response = pd.read_csv(response_files[0], index_col=0).squeeze()
            remote_id = response['pk']

        name_tag = name_tag or local_dir.name
        return cls(
            config,
            root_local=local_dir,
            connection=connection,
            remote_id=remote_id,
            name_tag=name_tag)

    def __eq__(self, other) -> bool:
        """Two runs are equal if they have the same remote ID."""
        try:
            return self.remote_id == other.remote_id
        except AttributeError:
            # `other` is not run-like: let Python try the reflected
            # comparison / fall back to identity instead of crashing.
            return NotImplemented

    # Defining __eq__ already makes instances unhashable; stated explicitly
    # because `remote_id` is mutable and therefore no stable hash exists.
    __hash__ = None

    @property
    def extent(self) -> GeographicExtent:
        """Geographic extent of the run, built from the config bounds."""
        # corner order: y_min, x_min, y_max, x_max
        d = self.config.data
        return GeographicExtent.from_corners(d['min_lat'], d['min_lon'],
                                             d['max_lat'], d['max_lon'])

    @property
    def instance(self) -> Union[str, None]:
        """Instance of the connection session, None without remote run."""
        if self.remote_id is None:
            return None
        return self.connection.session.instance

    @property
    def url(self):
        """Get the API URL of the validation run (None without remote run)."""
        if self.remote_id is None:
            return None
        return self.connection.url(
            f"validation-configuration/{self.remote_id}")

    @property
    def status(self) -> Tuple[str, int]:
        """
        Check the status of the remote run.

        Returns
        -------
        status[str], progress[int]
            see :func:`Connection.validation_status`. If the request fails
            with an HTTP error, ("unknown", 0) is returned.
        """
        try:
            s = self.connection.validation_status(self.remote_id)
        except requests.exceptions.HTTPError:
            s = ("unknown", 0)
        return s

    def open_dataset(self) -> xr.Dataset:
        """
        Read local netcdf data (``<local_root>/<remote_id>.nc``) as xarray
        Dataset
        """
        ncpath = self.local_root / f"{self.remote_id}.nc"
        return xr.open_dataset(ncpath)

    def has_remote(self, raise_error: bool = False):
        """
        Check if the validation run has a remote counter part

        Parameters
        ----------
        raise_error: bool, optional
            Raise a ValueError instead of returning False.

        Returns
        -------
        has_remote: bool
            True if a remote ID is set.
        """
        s = self.remote_id is not None
        if not s and raise_error:
            raise ValueError("Validation run has no remote counter part")
        return s

    def get_results_url(self):
        """Get the UI URL of the validation run (None without remote run)."""
        if self.remote_id is None:
            return None
        url = self.connection.url(f"validation-result/{self.remote_id}")
        # NOTE(review): replaces *every* occurrence of 'api' in the URL, not
        # only a path segment - confirm against the URLs of Connection.url.
        return url.replace('api', 'ui')

    def get_reference(self, reftype='spatial'):
        """
        Get reference dataset for this run.

        Parameters
        ----------
        reftype: Literal['spatial', 'temporal', 'scaling']
            Which kind of reference to get

        Returns
        -------
        dataset: str
            Dataset name
        version: str
            Version name
        variable: str
            Variable name

        All three are None if no dataset config is flagged as reference of
        the chosen type.
        """
        for conf in self.config.data["dataset_configs"]:
            if conf[f'is_{reftype}_reference']:
                dataset = int(conf['dataset_id'])
                version = int(conf['version_id'])
                variable = int(conf['variable_id'])
                # Translate the IDs into the names used by the service
                dataset = self.connection.dataset_info(dataset)['pretty_name']
                version = self.connection.version_info(version)['pretty_name']
                variable = self.connection.variable_info(
                    variable)['pretty_name']
                return str(dataset), str(version), str(variable)
        return None, None, None

    def load_results(self) -> xr.Dataset:
        """
        Load downloaded results as xarray. Same as :func:`open_dataset`.
        """
        return self.open_dataset()

    def update_remote_id(self, pk):
        """
        Set the remote ID to `pk`, but only if a server response from
        :func:`start` is stored on the run.

        Returns
        -------
        remote_id:
            The (possibly unchanged) remote ID.
        """
        # getattr: also works on instances that were created without __init__
        if getattr(self, 'response', None) is not None:
            self.remote_id = pk
        return self.remote_id

    def update_name(self, new_name: str):
        """Write `new_name` as name tag into the config and onto the run."""
        self.config['name_tag'] = new_name
        self.name = self.config['name_tag']
        return self.name

    def setup_workdir(self, clear=False):
        """
        Create the local root folder.

        Parameters
        ----------
        clear: bool, optional
            Delete an existing folder (with all its content) first.
        """
        if self.local_root.exists() and clear:
            shutil.rmtree(self.local_root)
        os.makedirs(self.local_root, exist_ok=True)

    def override_params(self, **kwargs):
        """
        Override certain parameters in the validation config file. Such as
        name_tag and start/end date etc.

        Parameters
        ----------
        kwargs:
            Keys and new values. Keys must already exist in the config. You
            cannot add anything new, only change existing fields!
            (This method itself does not check the keys.)
        """
        for k, v in kwargs.items():
            self.config[k] = v

    def verify_period(self):
        """
        Checks if the chosen validation period is within the range available
        for all datasets on the service.

        Returns
        -------
        status: bool
            True if all datasets are available, False otherwise
        """
        period_start = pd.to_datetime(self.config['interval_from'])
        period_end = pd.to_datetime(self.config['interval_to'])
        for ds_config in self.config['dataset_configs']:
            avail_start, avail_end = self.connection.get_period(
                ds_config['version_id'])
            avail_start = pd.to_datetime(avail_start)
            avail_end = pd.to_datetime(avail_end)
            if (period_start < avail_start) or (period_end > avail_end):
                return False
        return True

    def start(self):
        """
        Start the current Validation Run on the chosen instance. Creates
        a local folder and dumps the config and the response from the server
        there.

        An existing local root folder is deleted (with its content) first.

        Returns
        -------
        response:
            Response from validation run, as returned by
            ``Connection.run_validation``.
        """
        self.setup_workdir(clear=True)
        self.response = self.connection.run_validation(self.config)
        run_pk = self.response['pk']
        instance = self.connection.session.instance
        self.config.dump(self.local_root / f'config-{instance}.json')
        # Store the instance with the response that is written to disk
        self.response['instance'] = instance
        self.response.to_csv(self.local_root / f'response-{run_pk}.csv')
        self.update_remote_id(run_pk)
        return self.response

    def timing(self) -> dict:
        """
        Get timing information for the remote validation run

        Returns
        -------
        time: dict
            Time information as a dict with the keys 'start', 'end' and
            'duration'. All values are None if the status is 'NOT FOUND'.
        """
        status, _ = self.status
        time = {'start': None, 'end': None, 'duration': None}
        if status == 'NOT FOUND':
            return time
        start_time, end_time = (
            self.connection.validation_time(self.remote_id))
        _, duration = self.connection.validation_duration(self.remote_id)
        time['start'] = start_time
        time['end'] = end_time
        time['duration'] = duration
        return time

    def download_data(self, force_download=False):
        """
        Download the run's results, i.e., netcdf file, plots. The config is
        (re-)written to the local root folder as well.

        Parameters
        ----------
        force_download: bool, optional
            Always download, replace any existing local files.
            If False, only downloads results that don't exist locally.
        """
        os.makedirs(self.local_root, exist_ok=True)
        self.config.dump(self.local_root /
                         f'config-{self.connection.session.instance}.json')
        self.connection.download_results(
            self.remote_id, self.local_root, force_download=force_download)

    def plot_extent(self, global_map=False):
        """
        Create a map plot of the area covered by the validation run and
        store it as ``extent.png`` in the local root folder.

        Parameters
        ----------
        global_map: bool, optional
            Passed on to ``GeographicExtent.plot_map``.
        """
        os.makedirs(self.local_root, exist_ok=True)
        path = self.local_root
        fig = self.extent.plot_map(global_map)
        fig.savefig(path / "extent.png", bbox_inches='tight')

    def delete(self, local=True, remote=True):
        """
        Delete validation run. Online and/or offline.

        Parameters
        ----------
        local: bool, optional
            Delete the local copy of the validation run
        remote: bool, optional
            Delete the remote version of the run. Skipped if the run has no
            remote counter part.
        """
        if local and os.path.exists(self.local_root):
            shutil.rmtree(self.local_root)
        # Without a remote ID there is nothing to delete on the service
        if remote and self.remote_id is not None:
            self.connection.delete(self.remote_id)