# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: Copyright (c) 2026 TU Wien & AWST
# SPDX-FileCopyrightText: For a full list of authors, see the AUTHORS file.
import glob
import warnings
import pandas as pd
import shutil
from datetime import datetime
import os
import time
import re
import numpy as np
import yaml
from pathlib import Path
import subprocess
from typing import Union
import logging
from qa4sm_api.client_api import Connection
from qa4sm_autoreports.extent import GeographicExtent
import qa4sm_autoreports as utils
from qa4sm_autoreports.run import ValidationRun
from qa4sm_autoreports.data import (NetcdfMetaData, NetcdfData,
SummaryStatsData, ConfigData, RunData,
RemoteData, Data)


class AutoReportCreator:
    """
    Trigger multiple validation runs, check status, compile PDF.

    A report is a folder (``report_root``) that holds one sub-folder per
    validation run plus the report-level files (variable bindings, populated
    LaTeX sources and the compiled PDF in ``pdf_report``).
    """

    # Human-readable names for the numeric codes returned by ``status``.
    _STATUS_LUT = {
        0: "Staged",
        1: "Started",
        2: "Processed",
        3: "Collected",
        4: "Compiled",
    }

    def __init__(self, runs, report_root):
        """
        Parameters
        ----------
        runs: list[ValidationRun, ...]
            List of validation runs to use in the report
        report_root: str or Path
            Path where reports from this series are stored.
        """
        self.report_root = Path(report_root)
        self.name = str(self.report_root.name)
        self.runs = self._collect_runs(runs)  # dtype: dict[str, ValidationRun]

    def _collect_runs(self, runs) -> dict:
        """
        Index the passed runs by a unique name.

        Runs are keyed by ``run.name``. If a name is already taken, a counter
        suffix ``(0)``, ``(1)``, ... is appended until the key is unique.

        Parameters
        ----------
        runs: iterable of ValidationRun
            Runs to index, in report order.

        Returns
        -------
        collected: dict
            Mapping of unique run name -> run, in the order passed.
        """
        collected = {}
        for run in runs:
            name = run.name
            i = 0
            while name in collected:
                name = f"{run.name}({i})"
                i += 1
            collected[name] = run
        return collected

    @staticmethod
    def _natural_sort_key(path) -> list:
        """
        Sort key that orders embedded numbers numerically ("run2" < "run10").

        Only the base name of ``path`` is considered. ``glob`` returns entries
        in arbitrary order, so this key is used to make the run numbering
        reproducible.
        """
        base = os.path.basename(str(path))
        # re.split with a capturing group alternates text / digits, always
        # starting with a (possibly empty) text token, so odd positions are
        # the digit groups and types never get mixed when comparing keys.
        return [
            int(token) if pos % 2 else token.lower()
            for pos, token in enumerate(re.split(r"(\d+)", base))
        ]

    @staticmethod
    def _latex_error_lines(stdout) -> list:
        """
        Return the lines of a pdflatex log/stdout that start with ``"! "``.

        Parameters
        ----------
        stdout: str or None
            Captured pdflatex output.

        Returns
        -------
        errors: list[str]
            Error lines in order of appearance (empty if there are none).
        """
        return [
            line for line in (stdout or "").splitlines()
            if line.startswith("! ")
        ]

    @classmethod
    def from_scratch(cls,
                     report_root,
                     templates_path,
                     connection,
                     run_name_long=False,
                     force=False):
        """
        Set up report creator from scratch, i.e. from template configs.
        If report_root already exists, runs will be loaded from files.

        Parameters
        ----------
        report_root: str or Path
            Path to the report folder (is created / overwritten)
        templates_path: str or Path
            Path where the config templates (json) are found (we use all
            available files, in natural sort order of their names).
        connection: Connection
            QA4SM Connection
        run_name_long: bool, optional
            Instead of naming runs "runX", name them "run X - <template>".
        force: bool, optional
            Force creating a new report_root from scratch (an existing
            folder is deleted). If False and the folder exists, a warning is
            issued and the runs are loaded from the existing files instead.

        Returns
        -------
        creator: AutoReportCreator

        Raises
        ------
        FileNotFoundError
            If a new report has to be created but no ``*.json`` templates
            are found. In that case report_root is left untouched.
        """
        template_path = Path(templates_path)
        report_root = Path(report_root)

        if os.path.exists(report_root) and not force:
            warnings.warn("Report directory already exists. "
                          "Load runs from existing files.")
            # Hand the caller's connection on instead of silently dropping it
            return cls.from_results(report_root, connection=connection)

        templates = sorted(
            glob.glob(str(template_path / '*.json')),
            key=cls._natural_sort_key)
        # Check before touching the file system, so a failed call neither
        # wipes an existing report nor leaves an empty folder behind.
        if not templates:
            raise FileNotFoundError(f"No templates found in {template_path}")

        if os.path.exists(report_root):
            shutil.rmtree(report_root)
        os.makedirs(str(report_root))

        instance = connection.session.instance
        runs = []
        for i, template in enumerate(templates, start=1):
            if run_name_long:
                name = f"run {i} - {Path(template).stem}"
            else:
                name = f"run{i}"
            run_dir = report_root / name
            os.makedirs(str(run_dir), exist_ok=True)
            shutil.copy(template, str(run_dir / f"config-{instance}.json"))
            runs.append(
                ValidationRun.from_template(
                    str(run_dir), connection=connection, name_tag=name))

        return cls(runs, report_root)

    @classmethod
    def from_results(cls, report_root, connection=None):
        """
        Set up report creator from previously created local runs.

        Every sub-directory of report_root whose name starts with ``run`` is
        loaded as one validation run (in natural sort order, so that
        "run2" comes before "run10").

        Parameters
        ----------
        report_root: str or Path
            Path to the existing report folder
        connection: Connection, optional
            Connection to use for all runs. If None, connections will be
            created based on the instance in each run's config file.

        Returns
        -------
        creator: AutoReportCreator
        """
        report_root = Path(report_root)
        # Plain files that happen to match 'run*' are not runs -> skip them
        run_dirs = sorted(
            (d for d in glob.glob(str(report_root / 'run*'))
             if os.path.isdir(d)),
            key=cls._natural_sort_key)
        runs = [
            ValidationRun.from_results(
                local_dir,
                connection=connection,
                name_tag=os.path.basename(local_dir))
            for local_dir in run_dirs
        ]
        return cls(runs, report_root)

    @property
    def status(self) -> int:
        """
        Status between all validation runs, returned as a numerical code in
        order of progress

        - 0 - Staged: Local setup created, not triggered online
        - 1 - Started: All runs were triggered
        - 2 - Processed: All runs have finished online
        - 3 - Collected: All results were downloaded locally
        - 4 - Compiled: PDF was created
        """
        run_status = [run.status[0] for run in self.runs.values()]

        if ("NOT FOUND" in run_status) or (len(run_status) == 0):
            status = 0
            complete = False
        else:  # either running or finished
            status = 1
            try:
                complete = self.validations_complete()
            except ValueError:
                complete = False

        if complete:
            status = 2
            if os.path.exists(self.report_root / 'ReportVars.yml'):
                status = 3
                pdfs = glob.glob(
                    str(self.report_root / 'pdf_report' / "*.pdf"))
                if len(pdfs) > 0:
                    status = 4

        return status

    def __len__(self) -> int:
        """ Number of validation runs in this report """
        return len(self.runs)

    def __getitem__(self, item: Union[int, str]) -> "ValidationRun":
        """
        Can be used to select one of the loaded validation runs, either by
        position (int) or by name (str).

        Raises
        ------
        KeyError
            If a name is passed that is not part of the report.
        ValueError
            If item is neither an int nor a str.
        """
        names = list(self.runs.keys())
        if isinstance(item, int):
            return self.runs[names[item]]
        elif isinstance(item, str):
            if item not in names:
                raise KeyError(f"The run '{item}' is not part of "
                               f"the report. "
                               f"Use one of {list(self.runs.keys())}")
            return self.runs[item]
        else:
            raise ValueError(f"Pass either run index or a "
                             f"name from {list(self.runs.keys())}.")

    def __repr__(self):
        lines = [
            f"{i} [{run.status[0]}]: {name}"
            for i, (name, run) in enumerate(self.runs.items())
        ]
        lines.append(f"<AutoReportCreator <--> {self.report_root}>")
        return "\n".join(lines)

    @staticmethod
    def _warn_incomplete():
        """ Warn that content collection is skipped (runs not finished) """
        warnings.warn("Skipping content collection as some runs are "
                      "incomplete.")

    def open_datasets(self) -> dict:
        """
        Open the dataset of each run.

        Returns
        -------
        datasets: dict
            Run name -> whatever the run's ``open_dataset`` returns.
        """
        return {name: run.open_dataset() for name, run in self.runs.items()}

    def validation_run_table(self, short_url=True):
        """
        Create a table that lists all validation runs for this report.
        Cell contents are LaTeX snippets, e.g.

        Validation run; URL; Name; Completed
        \\#1; \\href{<results url>}{<remote id>}; \\texttt{<run name>}; 2025-12-02 10:30

        Parameters
        ----------
        short_url: bool, optional
            URL as link (``\\href`` with the remote id as text), not the
            full URL (``\\url``).

        Returns
        -------
        df: pd.DataFrame
            A table containing the validation runs
        """
        columns = ["Validation run", "URL", "Name", "Completed"]

        records = []
        for i, run in enumerate(self.runs.values(), start=1):
            run.has_remote(raise_error=True)
            url = run.get_results_url()
            # not named `time`, which would shadow the imported module
            timing = run.timing()

            name = "\\texttt{" + run.name + "}"
            if short_url:
                url = "\\href{" + url + "}{" + run.remote_id + "}"
            else:
                url = "\\url{" + url + "}"

            if timing['end'] is None:
                enddate = "not finished"
            else:
                enddate = timing['end'].strftime('%Y-%m-%d %H:%M')

            records.append([f"\\#{i}", url, name, enddate])

        return pd.DataFrame.from_records(records, columns=columns)

    def rollback(self, status=0):
        """
        Roll back the report to the selected stage.

        Parameters
        ----------
        status: int
            Target status after rollback.

        Raises
        ------
        NotImplementedError
            Always; rolling back is not implemented yet.
        """
        raise NotImplementedError()

    def override_params(self, **kwargs):
        """
        Override parameters in all runs loaded for this report.

        Parameters
        ----------
        kwargs:
            Kwargs are passed to each run's override_params method.
        """
        for run in self.runs.values():
            run.override_params(**kwargs)

    def verify_dataset_availability(self) -> bool:
        """
        Verify for each run that the datasets cover the required period.
        Checking stops at the first run that fails.

        Returns
        -------
        avail: bool
            True if all datasets are available for the requested period,
            False otherwise.
        """
        return all(run.verify_period() for run in self.runs.values())

    def start_all_runs(self, delay=1, override=None):
        """
        Trigger all validation runs with the run configurations currently
        loaded in here (self.runs).
        Use self.runs[i].start() to trigger them individually.

        Parameters
        ----------
        delay: int, optional (default: 1)
            Delay in seconds between API calls to start a run.
        override: dict, optional (default: None)
            To override certain settings in all validation runs before
            starting them, pass them here. Example::

                {'interval_from': "2023-01-01", 'interval_to': "2023-03-31",
                 'min_lat': -17.0, 'max_lon': 150.0, ...}
        """
        for run in self.runs.values():  # type: ValidationRun
            if override is not None:
                run.override_params(**override)
            run.start()
            time.sleep(delay)

    def validations_complete(self) -> bool:
        """
        Check whether all remote runs have already completed.
        A run counts as complete when its status is ("DONE", 100).

        Returns
        -------
        all_done : bool
            False if at least one run is not complete yet, else True
            (also True when the report contains no runs).
        """
        for run in self.runs.values():
            run.has_remote(raise_error=True)
            state, progress = run.status
            if state != "DONE" or progress != 100:
                return False
        return True

    def download_all_results(self, delay=1):
        """
        Download all results from the server for all runs. If not all runs
        are complete, a warning is issued and nothing is downloaded.

        Parameters
        ----------
        delay: int, optional (default: 1)
            Delay in seconds between the download calls.
        """
        if not self.validations_complete():
            self._warn_incomplete()
            return

        for run in self.runs.values():
            run.download_data()
            time.sleep(delay)

    def delete(self, remote=True):
        """
        Delete all runs in this report. The local copy of each run and the
        report folder itself are always removed.

        Parameters
        ----------
        remote: bool, optional
            Also delete the remote version of each run.
        """
        for run in self.runs.values():
            run.delete(remote=remote, local=True)
        if os.path.exists(self.report_root):
            shutil.rmtree(self.report_root)

    def collect_content(self, force_download=False):
        """
        Collect all content variables for each run and for the report.

        Per run, the results are downloaded, the extent map is plotted and
        all variables are written to ``<run>/ContentVars.yml`` (plus
        ``<run>/latex/summary_stats.csv``). Report-level files written to
        report_root are ``val_run_list.csv``, ``common_extent.png`` and
        ``ReportVars.yml``.

        If the report has no runs, or not all runs are complete, a warning
        is issued and nothing is collected.

        Parameters
        ----------
        force_download: bool, optional (default: False)
            Always download new results. If this is False, only download
            results if they don't yet exist.
        """
        if not self.runs:
            # Nothing to collect; the report-level variables below are
            # derived from the runs and cannot be built without any.
            warnings.warn("Skipping content collection as the report "
                          "contains no runs.")
            return

        if not self.validations_complete():
            self._warn_incomplete()
            return

        table = self.validation_run_table()
        table.to_csv(
            self.report_root / "val_run_list.csv", sep=';', index=False)

        all_vars = None
        for i, run in enumerate(self.runs.values(), start=1):
            # Download all required data from server
            run.download_data(force_download=force_download)
            # Make the coverage map plot
            run.plot_extent()

            # Collect various variables
            all_vars = RunData(run)
            all_vars.data['report_run_index'] = i
            all_vars.data['remote_id'] = run.remote_id

            all_vars.append(ConfigData(run).collect())
            all_vars.append(NetcdfMetaData(run).collect())
            all_vars.append(NetcdfData(run).collect())
            all_vars.append(RemoteData(run).collect())

            sum_data = SummaryStatsData(run).collect()
            os.makedirs(
                os.path.join(run.local_root, 'latex'), exist_ok=True)
            sum_data.export_table(
                os.path.join(run.local_root, 'latex', 'summary_stats.csv'))
            all_vars.append(sum_data)

            all_vars.dump(
                os.path.join(run.local_root, 'ContentVars.yml'),
                overwrite=True)

        extents = [run.extent for run in self.runs.values()]
        if len(extents) == 1:
            common_extent = extents[0]
        else:
            common_extent = GeographicExtent.multi_intersection(*extents)

        fig = common_extent.plot_map(global_map=True)
        fig.savefig(
            self.report_root / "common_extent.png", bbox_inches='tight')

        # All extents are compared against the first one (zero tolerance)
        extents_equal = all(
            extents[0].equals(other, 0.0) for other in extents[1:])

        # ----------------------------------
        # Common, non-run-specific variables
        # NOTE: version and interval are taken from the LAST run only.
        last_run = list(self.runs.values())[-1]
        report_data = {
            'compilation_date':
                datetime.now().strftime("%Y-%m-%d %H:%M"),
            'qa4sm_version':
                all_vars.data["NetcdfMetaVars"]["qa4sm_version"],
            'qa4sm_url':
                last_run.connection.session.base_url,
            'interval_days':
                all_vars.data["ConfigVars"]["interval_days"],
            'interval_from':
                all_vars.data["ConfigVars"]["interval_from"],
            'interval_to':
                all_vars.data["ConfigVars"]["interval_to"],
            'count_runs':
                len(self.runs),
            'extents_equal':
                extents_equal,
            'common_area': [
                common_extent.min_lat, common_extent.min_lon,
                common_extent.max_lat, common_extent.max_lon
            ]
        }

        common_data = Data()
        common_data.add(report_data, section='Common')
        common_data.dump(
            os.path.join(self.report_root, 'ReportVars.yml'),
            overwrite=True)

    @staticmethod
    def _fix_apostrophe_keys(expr: str) -> str:
        """
        Rewrite dict subscripts whose key contains an apostrophe from
        single-quoted to double-quoted so eval() can parse them:
        ['PEARSON'S R'] -> ["PEARSON'S R"]

        A character scan is needed because the apostrophe inside the key
        would confuse any regex-based approach. A subscript is taken to run
        from ``['`` to the next ``']``; keys without an apostrophe and
        unterminated subscripts are passed through unchanged.

        Parameters
        ----------
        expr: str
            Expression text taken from a template placeholder.

        Returns
        -------
        fixed: str
            The expression with the affected subscripts re-quoted.
        """
        out, i = [], 0
        while i < len(expr):
            if expr.startswith("['", i):
                end = expr.find("']", i + 2)
                if end != -1:
                    key = expr[i + 2:end]
                    delim = '"' if "'" in key else "'"
                    out.append(f"[{delim}{key}{delim}]")
                    i = end + 2
                    continue
            out.append(expr[i])
            i += 1
        return "".join(out)

    def _replacer(
        self,
        context: dict,
        FMT_RE=re.compile(r"^(.*):([0-9+\- #]*\.?[0-9]*[bcdeEfFgGnosxX%])$")):
        """
        Build the substitution callback used with ``re.sub`` on templates.

        The callback evaluates the placeholder content (match group 1) as a
        Python expression with ``context`` as local names. If the expression
        ends in ``:<format spec>`` (as matched by FMT_RE) the value is
        formatted with that spec, otherwise it is converted with ``str``.

        Parameters
        ----------
        context: dict
            Names that are available to the template expressions.
        FMT_RE: re.Pattern, optional
            Pattern that splits ``<expression>:<format spec>``.

        Returns
        -------
        replace: callable
            Function mapping a ``re.Match`` to its replacement string.
        """

        # NOTE(security): template expressions are passed to eval(). Emptying
        # __builtins__ does NOT make this a sandbox (objects in `context`,
        # e.g. numpy, remain reachable) -- only use trusted template files.
        def evaluate(expression: str):
            return eval(expression, {"__builtins__": {}}, context)

        def replace(m: re.Match) -> str:
            expr = self._fix_apostrophe_keys(m.group(1))
            fmt = FMT_RE.match(expr)
            if fmt:
                return format(evaluate(fmt.group(1)), fmt.group(2))
            return str(evaluate(expr))

        return replace

    def populate_latex(
        self,
        template_file: Union[str, Path],
        out_file: Union[str, Path],
        yaml_bindings: dict,
        placeholder=re.compile(r"(?:\\detokenize\{)?\$<(.+?)>\$(?:\})?"),
    ) -> None:
        """
        Populate a latex template with the values from the yaml bindings.

        Parameters
        ----------
        template_file : str or Path
            Path to the latex template
        out_file: str or Path
            Path where the populated latex file is written to.
        yaml_bindings: dict
            Maps the name under which a variable file is available in the
            template expressions to the path of that yaml file, e.g.
            ``{"ReportVars": ".../ReportVars.yml"}``. This is required, there
            is no default. ``np`` and ``utils`` are added automatically.
        placeholder: re.Pattern, optional
            Placeholder pattern to replace in the tex files.
            the default looks like ``\\detokenize{$<...>$}`` and contains a
            python expression, optionally followed by ``:<format spec>``.
        """
        context = {
            name: yaml.safe_load(Path(path).read_text())
            for name, path in yaml_bindings.items()
        }
        context["np"] = np
        context["utils"] = utils

        replacer = self._replacer(context)

        tex = Path(template_file).read_text(encoding="utf-8")
        tex = placeholder.sub(replacer, tex)
        Path(out_file).write_text(tex, encoding="utf-8")

    def compile(self,
                template_path,
                main_tex="main.tex",
                run_tex='run.tex',
                tex_ignore=None,
                from_scratch=False):
        """
        Collect contents to compile PDF report from templates.

        Steps: collect the content variables, copy all non-tex template
        files to report_root, populate the tex templates, then run pdflatex
        (4 passes, bibtex after the first one) inside report_root. In any
        case the pdf/log/aux/out/tex/bib/bbl/blg files found in report_root
        are moved to ``report_root/pdf_report`` afterwards.

        Parameters
        ----------
        template_path: str or Path
            Path where the templates latex files are stored.
        main_tex: str, optional
            Main tex file
        run_tex: str, optional
            Tex file template to use for runs (have separate yml bindings).
        tex_ignore: list, optional
            A list of tex files in the template path to ignore
        from_scratch: bool, optional
            Download and collect data, even if it already exists.

        Raises
        ------
        RuntimeError
            If pdflatex or bibtex time out, or pdflatex reports errors
            (lines starting with "! ") in its output.
        subprocess.CalledProcessError
            If pdflatex or bibtex exit with a non-zero code. Their output is
            logged before the exception propagates.
        """
        tex_ignore = tex_ignore or []

        self.collect_content(from_scratch)  # todo: include!

        template_path = Path(template_path)

        # Static template files (everything that is not a tex template)
        for entry in os.listdir(template_path):
            if entry.endswith(".tex"):
                continue
            full_path = os.path.join(template_path, entry)
            if os.path.isfile(full_path):
                shutil.copy2(full_path, self.report_root)

        yaml_bindings = {"ReportVars": self.report_root / "ReportVars.yml"}
        for i, run in enumerate(self.runs.values(), start=1):
            yaml_bindings[
                f"Run{i}ContentVars"] = run.local_root / "ContentVars.yml"

        # Report-level templates -> report_root
        for tex_file in glob.glob(str(template_path / "*.tex")):
            name = os.path.basename(tex_file)
            if (name == run_tex) or (name in tex_ignore):
                continue
            self.populate_latex(tex_file, self.report_root / name,
                                yaml_bindings)

        # Run-level template -> each run folder, "ContentVars" is re-bound
        # to the variables of the respective run.
        for run in self.runs.values():
            yaml_bindings["ContentVars"] = run.local_root / "ContentVars.yml"
            self.populate_latex(template_path / run_tex,
                                run.local_root / run_tex, yaml_bindings)

        pdf_out_dir = self.report_root / "pdf_report"
        os.makedirs(str(pdf_out_dir), exist_ok=True)

        try:
            for i in range(4):
                try:
                    ret = subprocess.run(
                        ["pdflatex", "-interaction=nonstopmode", main_tex],
                        capture_output=True,
                        text=True,
                        check=True,
                        cwd=str(self.report_root),
                        timeout=100)
                except subprocess.TimeoutExpired as e:
                    raise RuntimeError(
                        f"pdflatex timed out on run {i + 1} — likely caused by interactive error prompts. "
                        f"Check the .log file for lines starting with '!'"
                    ) from e
                except subprocess.CalledProcessError as e:
                    # Surface the LaTeX errors, then let the original
                    # exception propagate unchanged.
                    errors = self._latex_error_lines(e.stdout)
                    logging.error(
                        "pdflatex failed on run %d (exit code %s):\n%s",
                        i + 1, e.returncode,
                        "\n".join(errors) if errors else e.stdout)
                    raise

                # Only real error lines count; a "! " somewhere inside a
                # line (e.g. in a warning text) is not a LaTeX error.
                errors = self._latex_error_lines(ret.stdout)
                if errors:
                    raise RuntimeError(
                        f"pdflatex failed on run {i + 1} with errors:\n" +
                        "\n".join(errors))

                if i == 0:
                    try:
                        subprocess.run(
                            ["bibtex", os.path.splitext(main_tex)[0]],
                            capture_output=True,
                            text=True,
                            check=True,
                            cwd=str(self.report_root),
                            timeout=100)
                    except subprocess.TimeoutExpired as e:
                        raise RuntimeError(
                            f"bibtex timed out — likely caused by interactive error prompts."
                        ) from e
                    except subprocess.CalledProcessError as e:
                        logging.error(
                            "bibtex failed (exit code %s)\n"
                            "STDOUT: %s\nSTDERR: %s",
                            e.returncode, e.stdout, e.stderr)
                        raise
        finally:
            # Move the output files to pdf_report (always runs, even on failure)
            for ext in ('pdf', 'log', 'aux', 'out', 'tex', 'bib', 'bbl',
                        'blg'):
                for path in glob.glob(str(self.report_root / f"*.{ext}")):
                    if os.path.exists(path):
                        shutil.move(
                            str(path),
                            str(pdf_out_dir / os.path.basename(path)))