Source code for qa4sm_api.client_api

import json
import warnings
import requests
import time
import pandas as pd
from typing import Union, Optional, List
import os
import zipfile
from pathlib import Path
from datetime import datetime

from tornado.httpclient import HTTPError

from qa4sm_api.globals import (
    QA4SM_DOTRC_PATH,
    ValidationRunNotFoundError,
    AuthenticationError,
    ValidationInstanceError,
    _load_dotrc
)


class Response:
    """
    Thin wrapper around the payload returned by an API request.

    Parameters
    ----------
    response: Any
        Raw response object (or already decoded payload).
    serialize: bool, optional (default: True)
        If True, ``response.json()`` is called and its result is stored
        instead of the raw object.
    """

    def __init__(self, response, serialize=True):
        # Only decode when asked to; binary downloads keep the raw object.
        self.response = response.json() if serialize else response

    @property
    def data(self) -> List[dict]:
        """
        Stored payload as a list; a single dict is wrapped into a list.

        Raises
        ------
        ValueError
            If no payload is stored.
        """
        payload = self.response
        if payload is None:
            raise ValueError("No response data is available")
        return [payload] if isinstance(payload, dict) else payload

    @property
    def pandas(self) -> Union[pd.DataFrame, pd.Series]:
        """
        Payload as a pandas object, indexed and sorted by the 'id' field.

        Returns
        -------
        df: pd.DataFrame or pd.Series
            A Series when exactly one record is present, otherwise a
            DataFrame with one row per record.
        """
        frame = pd.DataFrame.from_dict(self.data, orient='columns')
        frame = frame.set_index('id').sort_index()
        if len(frame.index) != 1:
            return frame  # dtype: pd.DataFrame
        return frame.iloc[0, :]  # dtype: pd.Series
class Access:
    """
    Container for API credentials, keyed by QA4SM instance name.

    The constructors in this class build mappings of the form
    ``{instance: {'token': <token or None>}}``.
    NOTE(review): ``from_dotrcfile`` passes on whatever ``_load_dotrc``
    returns; it is presumably of the same form - confirm.
    """

    def __init__(self, access):
        """
        Initialize the Access class with credentials to the API.

        Parameters
        ----------
        access: dict
            Mapping of instance name to its credentials.
        """
        self.access = access

    def __getitem__(self, item):
        return self.access[item]

    @classmethod
    def from_available(cls):
        """
        Load access from the environment, or from the dotrc file otherwise.

        Returns
        -------
        Access
            Access object from the first source that is available.
        """
        # Try first if the environment variables are set
        try:
            return cls.from_env()
        except EnvironmentError:
            # The fallback must be returned, otherwise callers receive None.
            return cls.from_dotrcfile()

    @classmethod
    def from_env(cls):
        """
        Load access from the QA4SM_INSTANCE and QA4SM_TOKEN environment
        variables.

        Returns
        -------
        Access
            Access object with the token for the configured instance.

        Raises
        ------
        EnvironmentError
            If one of the two variables is not set.
        """
        try:
            instance = os.environ["QA4SM_INSTANCE"]
        except KeyError:
            raise EnvironmentError("QA4SM_INSTANCE not found in environment "
                                   "variables.")
        try:
            token = os.environ["QA4SM_TOKEN"]
        except KeyError:
            raise EnvironmentError("QA4SM_TOKEN not found in environment "
                                   "variables.")

        # Same nesting as with_token/without_token, so that
        # access[instance]['token'] works for every constructor.
        return cls({instance: {'token': token}})

    @classmethod
    def from_dotrcfile(cls, path=QA4SM_DOTRC_PATH):
        """
        Load access from dotrc file.

        Parameters
        ----------
        path: str
            Path to the dotrc file

        Returns
        -------
        Access
            Access object wrapping the loaded configuration.
        """
        config = _load_dotrc(path)
        return cls(config)

    @classmethod
    def with_token(cls, instance, token):
        """
        Load access with token for a specific instance.

        Parameters
        ----------
        instance: str
            QA4SM instance name
        token: str
            API token for the instance

        Returns
        -------
        Access
            Access object with token
        """
        return cls({instance: {'token': token}})

    @classmethod
    def without_token(cls, instance):
        """
        Create access for instance without token (limited access).
        A warning is emitted to inform about the anonymous access.

        Parameters
        ----------
        instance: str
            QA4SM instance name

        Returns
        -------
        Access
            Access object with None token (limited access)
        """
        warnings.warn(f"No token was found {instance}, continue as anonymous "
                      f"user (with limited access)")
        return cls({instance: {'token': None}})
class Session:
    """
    Wrapper to send API request to QA4SM after authentication.
    """

    def __init__(self, instance="qa4sm.eu", token="auto", protocol="https"):
        """
        Session to send requests to QA4SM via API.

        Parameters
        ----------
        instance: str, optional (default: 'qa4sm.eu')
            Base URL to send API requests to
        token: str or None, optional (default: 'auto')
            To authenticate with QA4SM API pass your token. While some
            request work without authentication, all user-specific request
            need you to log in first.
            - "auto" (default) uses whatever Access.from_available() finds.
            - "file" uses the token from the ~/.qa4smapirc file
              (error if not found).
            - "none" (or None) to force not setting a token even
              if .qa4smapirc is found.
            - "<token>" pass a valid token to use it directly
              (without checking .qa4smapirc)
        protocol: Literal['http', 'https'], optional (default: 'https')
            Developer setting. Use https for the public instances (test, prod)
            or http for local instances. e.g., 127.0.0.1:8000

        Raises
        ------
        ValidationInstanceError
            If the loaded access has no entry for the chosen instance.
        """
        self.headers = {
            "Content-Type": "application/json"
        }
        self.instance = instance
        self.base_url = f"{protocol}://{self.instance}/"
        self.api_url = self.base_url + "api/"

        self.client = requests.Session()
        self.response = None

        # None is handled like "none" instead of failing on None.lower()
        if token is None or token.lower() == "none":
            self.access = Access.without_token(self.instance)
        elif token == "auto":
            self.access = Access.from_available()
        elif token == "file":
            self.access = Access.from_dotrcfile()
        else:
            self.access = Access.with_token(self.instance, token)

        self.user = None

        if instance not in self.access.access:
            raise ValidationInstanceError(f"Unknown instance {instance}. "
                                          f"Please add it to your .qa4smapirc file.")

        token = self.access[instance]['token']
        if token is not None:
            self.login_with_token(token)
        else:
            warnings.warn("No token was passed, limited API access. "
                          "Only public API calls are possible.")

    def url(self, *args) -> str:
        """
        Build the full API URL from the passed path parts.

        Parameters
        ----------
        *args: Any
            Path parts, converted to str. Parts may contain slashes
            themselves; empty segments (leading, trailing or doubled
            slashes) are dropped.

        Returns
        -------
        url: str
            API base URL followed by the parts joined with single slashes,
            without trailing slash.
        """
        # Splitting every part normalises any combination of slashes,
        # e.g. ("a/", "/b") -> "a/b" and "/dataset/" -> "dataset".
        segments = [seg for part in args
                    for seg in str(part).split('/') if seg]
        return self.api_url + '/'.join(segments)

    def login_with_token(self, token, quiet=False) -> int:
        """
        Log in with an API token and remember the user of this session.

        Parameters
        ----------
        token: str
            API token, stored in the Authorization header of this session.
        quiet: bool, optional (default: False)
            Suppress welcome message.

        Returns
        -------
        status_code: int
            200: OK, login successful
        """
        self.headers["Authorization"] = f"Token {token}"
        reply = self.get(self.url("auth/login"), headers=self.headers)
        username = reply.pandas['username']
        if not quiet:
            print(f"Hi, {username}! You're successfully logged "
                  f"in at {self.api_url}!")
        self.user = username
        self.access = Access({self.instance: {'token': token,
                                              'username': self.user}})
        return 200

    def login_with_credentials(self, username=None, password=None,
                               quiet=False):
        """
        Authenticate with username and password to receive a token for
        subsequent requests.

        Parameters
        ----------
        username: str
            Username for the chosen QA4SM instance.
        password: str
            Password for the chosen QA4SM instance.
        quiet: bool, optional (default: False)
            Suppress welcome message.

        Returns
        -------
        token: str
            user login token

        Raises
        ------
        AuthenticationError
            If the login request fails with an HTTP error or the user has
            no token yet.
        """
        data = {'username': username, 'password': password}
        try:
            response = self.post(self.url("auth/login"), data=data)
        except requests.exceptions.HTTPError as err:
            # Keep the HTTP error as the cause for easier debugging
            raise AuthenticationError(
                f"Login failed for {username} with the passed "
                f"credentials. Please make sure that the chosen "
                f"username/password combination is valid for "
                f"{self.instance}.") from err

        token = response.pandas['auth_token']
        if token is None:
            raise AuthenticationError(
                f"User does not have a "
                f"token yet. Please request one first at "
                f"https://qa4sm.eu/ui/user-profile."
            )
        self.login_with_token(token, quiet=quiet)
        return token

    @staticmethod
    def _is_retryable(error) -> bool:
        """True unless the error carries a non-transient 4xx HTTP status."""
        status = getattr(getattr(error, "response", None),
                         "status_code", None)
        if status is None:
            # No HTTP status (connection error, timeout, ...)
            return True
        # Client errors will not go away when retried, except for
        # request timeout (408) and rate limiting (429).
        return not (400 <= status < 500) or status in (408, 429)

    def _send_request(self, url, data=None, max_retries: int = 5,
                      wait_time_s: float = 0.1, serialize=True,
                      **kwargs) -> "Response":
        """
        Send request. Usually happens already during initialization.
        Except when the request is delayed. Will try again when a request
        fails temporarily.

        Parameters
        ----------
        url: str
            URL to send the request to
        data: dict, optional (default: None)
            If None, a GET request is sent. Otherwise the data is posted
            as json body.
        max_retries: int, optional (default: 5)
            Maximum number of attempts; at least one attempt is made.
        wait_time_s: float, optional (default: 0.1)
            Seconds to wait between two attempts.
        serialize: bool, optional (default: True)
            Decode the response body as json.
        **kwargs
            Passed to requests.get / requests.post. 'timeout' defaults to
            10 seconds, 'headers' of POST requests default to the session
            headers.

        Returns
        -------
        response: Response
            Wrapped response, also stored in ``self.response``.

        Raises
        ------
        ValueError
            If data should be posted but the session has no headers.
        requests.exceptions.RequestException
            If the last attempt failed or the error is a client error
            that is not worth retrying.
        """
        # Defaults that the caller may override without causing a
        # duplicate keyword argument.
        kwargs.setdefault("timeout", 10)
        if data is not None:
            if self.headers is None:
                raise ValueError("No headers found to post request.")
            kwargs.setdefault("headers", self.headers)

        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                if data is None:
                    raw = requests.get(url, **kwargs)
                else:
                    raw = requests.post(url, json=data, **kwargs)
                raw.raise_for_status()
                response = Response(raw, serialize=serialize)
                self.response = response
                return response
            except requests.exceptions.RequestException as err:
                last_attempt = attempt == attempts - 1
                if last_attempt or not self._is_retryable(err):
                    raise
                time.sleep(wait_time_s)

    def post(self, url, data, *args, **kwargs) -> "Response":
        """
        Send a POST request to the API.

        Parameters
        ----------
        url: str
            URL to send the request to
        data: dict
            Data to include in the request body

        Returns
        -------
        Response
            Response from the API
        """
        return self._send_request(url, data, *args, **kwargs)

    def get(self, url, *args, **kwargs) -> "Response":
        """
        Send a GET request to the API.

        Parameters
        ----------
        url: str
            URL to send the request to

        Returns
        -------
        Response
            Response from the API
        """
        return self._send_request(url, *args, **kwargs)
class ValidationConfiguration:
    """
    Settings of a validation run, wrapped around the raw config data.

    Parameters
    ----------
    config_data: dict or list of dict
        The configuration. A list (or tuple) holding exactly one element
        is unwrapped to that element, anything else is stored as passed.
    """

    def __init__(self, config_data: Union[dict, List[dict]]):
        # Response data comes wrapped in a list. Only sequences are
        # unwrapped; indexing a dict that happens to have a single key
        # with 0 would raise a KeyError.
        if isinstance(config_data, (list, tuple)) and len(config_data) == 1:
            self.data = config_data[0]
        else:
            self.data = config_data

    def __getitem__(self, item):
        return self.data[item]

    def __setitem__(self, key, value):
        # Only existing settings can be changed, no new ones added
        if key not in self.data.keys():
            raise KeyError(f"{key} is not in the configuration.")
        self.data[key] = value

    def __eq__(self, other):
        # Objects without data are left to Python's default comparison
        # instead of failing with an AttributeError.
        try:
            other_data = other.data
        except AttributeError:
            return NotImplemented
        return self.data == other_data

    @property
    def empty(self):
        """True if the configuration holds no settings."""
        return len(self.data) == 0

    def dump(self, path):
        """
        Dump the configuration to a new json file.

        Parameters
        ----------
        path: str or Path
            Path to json file to create
        """
        # Same encoding as used to read the file in from_file
        with open(Path(path), 'w', encoding='utf-8') as jfile:
            json.dump(self.data, jfile, indent=2)

    @classmethod
    def from_remote(cls, run_id, **connection_kwargs):
        """
        Load a configuration from an existing remote run.
        Configs are public, so it's not necessary to authenticate.

        Parameters
        ----------
        run_id: str
            UID of the validation run at the chosen instance (see
            connection_kwargs).
        instance: str, optional (default: "qa4sm.eu")
            see qa4sm_api.client_api.Connection
        token: str, optional
            see qa4sm_api.client_api.Connection
        protocol: str, optional
            see qa4sm_api.client_api.Connection

        Returns
        -------
        config: ValidationConfiguration
            Configuration of the remote run.
        """
        connection = Connection(**connection_kwargs)
        config = connection.download_configuration(run_id=run_id)
        return cls(config_data=config.data)

    @classmethod
    def from_file(cls, path):
        """
        Load a configuration from a json file.

        Parameters
        ----------
        path: str or Path
            Path to the json file to load

        Returns
        -------
        config: ValidationConfiguration
            Configuration with the content of the file.
        """
        with open(Path(path), 'r', encoding='utf-8') as file:
            data = json.load(file)
        return cls(data)
class Connection:
    """
    Communication with QA4SM.
    """

    def __init__(self, instance: str = "qa4sm.eu", token="auto",
                 protocol="https"):
        """
        Parameters
        ----------
        instance: str, optional (default: "qa4sm.eu")
            service URL or IP:PORT, e.g
            - qa4sm.eu [productive]
            - test.qa4sm.eu [test]
            - test2.qa4sm.eu [test2]
            - 0.0.0.0:8000 [develop]
        token: str, optional
            Authentication user token (required to POST)
            - "auto" (default) will try "file" first and continue
              with "none" otherwise.
            - "file" will search for a token in a .qa4smapirc file
            - "none" will force not using a token (only public commands)
            - "<token>" will use the passed token directly
        protocol: str, optional
            Developer setting. Use https for the public instances (test, prod)
            or http for local instances. e.g., 127.0.0.1:8000
        """
        self.session = Session(instance, token, protocol)

    def url(self, *args, **kwargs) -> str:
        """Build an API URL, see Session.url of the wrapped session."""
        return self.session.url(*args, **kwargs)

    def login(self, username, password):
        """
        Log in with username and password.

        Parameters
        ----------
        username: str
            Username for the chosen QA4SM instance.
        password: str
            Password for the chosen QA4SM instance.
        """
        self.session.login_with_credentials(username, password)

    def user(self) -> pd.Series:
        """
        Request the login information with the headers of the session.

        Returns
        -------
        ser: pd.Series
            Response of the login endpoint.
        """
        reply = self.session.get(self.url("auth/login"),
                                 headers=self.session.headers)
        return reply.pandas

    def _find(self, df, val):
        """
        In the passed dataframe, find the rows (ids) that contain val.
        Will throw an error if the value is not found or if multiple
        are found.

        Parameters
        ----------
        df: pd.DataFrame
            Pandas Dataframe to search
        val: Any
            Value to find

        Returns
        -------
        id: int
            Index label of the single row that contains val.

        Raises
        ------
        ValueError
            If no row or more than one row contains val.
        """
        # Rows in which any column is equal to the searched value
        matches = df[df.eq(val).any(axis=1)].index.tolist()

        if len(matches) == 0:
            raise ValueError(f"{val} was not found. "
                             f"Please pass a valid name or ID.")
        if len(matches) > 1:
            raise ValueError(f"Multiple instances for {val} found. "
                             f"Please pass a unique name or ID.")
        return int(matches[0])

    def dataset_id(self, dataset: str) -> int:
        """
        Get dataset ID from short name.

        Parameters
        ----------
        dataset: str
            Dataset name, compared to the 'short_name' and 'pretty_name'
            of all datasets.

        Returns
        -------
        id: int
            Dataset ID
        """
        datasets = self.datasets()[['short_name', 'pretty_name']]
        return self._find(datasets, dataset)

    def version_id(self, version, dataset):
        """
        Get the version ID for a passed version name

        Parameters
        ----------
        version: str
            Version name, compared to the 'short_name' and 'pretty_name'
            of all versions of the dataset.
        dataset: str or int
            Dataset id or short name

        Returns
        -------
        version_id: int
            Version ID
        """
        versions = self.versions(dataset)[['short_name', 'pretty_name']]
        return self._find(versions, version)

    def datasets(self) -> pd.DataFrame:
        """
        Get a list of available datasets.
        """
        reply = self.session.get(self.url("dataset"))
        return reply.pandas

    def versions(self, ds) -> pd.DataFrame:
        """
        Get the version information for a dataset.

        Parameters
        ----------
        ds: int or str
            The dataset index or short name to get versions for

        Returns
        -------
        df: pd.DataFrame
            Version information for the chosen dataset
        """
        ds_id = self.dataset_id(ds) if isinstance(ds, str) else ds
        version_ids = self.datasets().loc[ds_id, 'versions']

        rows = []
        for vid in version_ids:
            reply = self.session.get(self.url("dataset-version", vid))
            # Each request yields a Series, turn it into a one-row frame
            rows.append(reply.pandas.T.to_frame().T)

        return pd.concat(rows, axis=0).sort_index()

    def dataset_info(self, dataset: Union[str, int]) -> pd.Series:
        """
        Get dataset metadata

        Parameters
        ----------
        dataset: str or int
            Dataset name or ID

        Returns
        -------
        ser: pd.Series
            Metadata of the dataset, including its 'id'.
        """
        if isinstance(dataset, str):
            ds_id = self.dataset_id(dataset)
        else:
            ds_id = dataset

        # Copy, so that adding the id never writes into the source frame
        ser = self.datasets().loc[ds_id].copy()
        ser['id'] = ds_id
        return ser

    def version_info(self, version: Union[str, int],
                     dataset: Optional[Union[str, int]] = None) -> pd.Series:
        """
        Get version metadata

        Parameters
        ----------
        version: str or int
            Version name or ID.
        dataset: str or int
            Dataset name or ID that the version belongs to. Only required
            when version is a string

        Returns
        -------
        ser: pd.Series
            Metadata of the version, including its 'id'.
        """
        if isinstance(version, str):
            vers_id = self.version_id(version, dataset)
        else:
            vers_id = version

        reply = self.session.get(self.url("dataset-version", vers_id))
        ser = reply.pandas.copy()
        ser['id'] = vers_id
        return ser

    def variable_info(self, var_id) -> pd.Series:
        """
        Get information about a dataset variable.

        Parameters
        ----------
        var_id: int
            The ID of the dataset variable to get information for

        Returns
        -------
        ds: pd.Series
            Information about the dataset variable
        """
        reply = self.session.get(self.url("dataset-variable", var_id))
        ds = reply.pandas.copy()
        ds['id'] = var_id
        return ds

    def filter_info(self, filter_id):
        """
        Get information about a data filter.

        Parameters
        ----------
        filter_id: int
            The ID of the data filter to get information for

        Returns
        -------
        ds: pd.Series
            Information about the data filter
        """
        reply = self.session.get(self.url("data-filter"))
        ds = reply.pandas.loc[filter_id, :].copy()
        ds['id'] = filter_id
        return ds

    def get_period(self, vers_id: int) -> (str, str):
        """
        Get start and end date of selected dataset directly from the service

        Parameters
        ----------
        vers_id: int
            ID of the dataset version

        Returns
        -------
        start: str
            'time_range_start' of the version
        end: str
            'time_range_end' of the version
        """
        ds = self.version_info(vers_id)
        return ds["time_range_start"], ds["time_range_end"]

    def check_errors(self, validation_id):
        """Check if the passed validation run has ended with an error"""
        # TODO: Should be implemented in API
        raise NotImplementedError()

    def _remote_val_status(self, validation_id):
        """Request the first status record of a validation run."""
        url = self.url(f"validation-runs-status/{validation_id}")
        return self.session.get(url, headers=self.session.headers).data[0]

    def _remote_timing(self, validation_id):
        """Request the first timing record of a validation run."""
        url = self.url(f"validation-runs-timing/{validation_id}")
        return self.session.get(url, headers=self.session.headers).data[0]

    def validation_exists(self, validation_id: str) -> bool:
        """
        Check if a validation run exists online (running or finished,
        not deleted).

        Parameters
        ----------
        validation_id: str
            Hash of the remote validation run to check.

        Returns
        -------
        exists: bool
            False if the status request failed with an HTTP error.
        """
        try:
            self._remote_val_status(validation_id)
        except (HTTPError, requests.exceptions.HTTPError):
            # The session sends its requests with the requests package,
            # so failed status requests raise the requests HTTPError.
            # The previously caught HTTPError is still handled as well.
            return False
        return True

    def validation_time(self, validation_id: str) -> \
            (Union[datetime, None], Union[datetime, None]):
        """
        Get start and end time when a validation run was processing.
        This works for finished OR running validations.

        Parameters
        ----------
        validation_id: str
            Hash of the remote validation run to check.

        Returns
        -------
        start_time: datetime or None
            None means that the validation was not started
        end_time: datetime or None
            None if the service reports no end time.

        Raises
        ------
        ValidationRunNotFoundError
            If the validation run does not exist.
        """
        if not self.validation_exists(validation_id):
            raise ValidationRunNotFoundError(validation_id)

        response = self._remote_timing(validation_id)

        def _to_datetime(value):
            # pd.to_datetime(None) is None and has no to_pydatetime
            if value is None:
                return None
            return pd.to_datetime(value).to_pydatetime()

        start_time = _to_datetime(response["start_time"])
        end_time = _to_datetime(response["end_time"])
        return start_time, end_time

    def validation_duration(self, validation_id: str) -> (int, str):
        """
        Get the duration of a validation run in seconds and formatted string.
        This works for finished OR running validations

        Parameters
        ----------
        validation_id: str
            Hash of the remote validation run to check.

        Returns
        -------
        duration_seconds: int
            'duration_seconds' as reported by the service
        duration_format: str
            'duration_format' as reported by the service

        Raises
        ------
        ValidationRunNotFoundError
            If the validation run does not exist.
        """
        if not self.validation_exists(validation_id):
            raise ValidationRunNotFoundError(validation_id)

        response = self._remote_timing(validation_id)
        return response["duration_seconds"], response["duration_format"]

    def validation_status(self, validation_id):
        """
        Check if the passed validation run is still running,
        completed or is not found.

        Parameters
        ----------
        validation_id: str
            Hash of the remote validation run to check.

        Returns
        -------
        status: str
            Status of the validation run. Can be one of:
            - 'NOT FOUND': The validation id was not found
            - 'SCHEDULED': The validation is queued
            - 'RUNNING': The validation is still running
            - 'DONE': The validation is completed
            - 'CANCELLED': The validation was cancelled
            - 'ERROR': The validation failed with an error
        progress: int
            Progress of the validation run in percent (0-100).
        """
        if not self.validation_exists(validation_id):
            return "NOT FOUND", 0

        response = self._remote_val_status(validation_id)
        return response['status'], response['progress']

    def run_validation(self, config):
        """
        Trigger validation run based on the passed config.

        Parameters
        ----------
        config: ValidationConfiguration
            Validation configuration to send to the service

        Returns
        -------
        response: pd.Series
            Response from validation run
        """
        reply = self.session.post(self.url("start-validation"),
                                  data=config.data)
        return reply.pandas

    def download_configuration(self, run_id, out_dir=None):
        """
        Download validation configuration used for a specific run.

        Parameters
        ----------
        run_id: str
            UID of remote run to download configuration for
        out_dir: str, optional
            To save the config as a .json file, pass the storage path

        Returns
        -------
        config: ValidationConfiguration
            Downloaded Configuration object
        """
        url = self.url(f"validation-configuration/{run_id}")
        response = self.session.get(url)
        config = ValidationConfiguration(response.data)
        if out_dir is not None:
            config.dump(os.path.join(out_dir, f"{run_id}.json"))
        return config

    def _download_to_file(self, endpoint, params, file_out):
        """Stream the response of a GET request on endpoint into file_out."""
        reply = self.session.get(self.url(endpoint), serialize=False,
                                 params=params, stream=True)
        try:
            with open(file_out, "wb") as file:
                for chunk in reply.response.iter_content(chunk_size=8192):
                    file.write(chunk)
        finally:
            # Streamed responses keep their connection open until closed
            reply.response.close()

    def download_results(self, run_id, out_dir, force_download=False):
        """
        Download all results for a run

        The graphics are extracted to 'qa4sm_graphics', the netCDF file
        is stored as '<run_id>.nc' and the statistics as
        'summary_stats.csv' in out_dir.

        Parameters
        ----------
        run_id: str
            UID of remote run to download results for
        out_dir: str or Path
            Where the results are stored, will be created if it doesn't
            exist yet.
        force_download: bool, optional
            Always download, replace any existing local files.
            If False, only downloads results that don't exist locally.
        """
        out_dir = Path(out_dir)
        os.makedirs(out_dir, exist_ok=True)

        graphx_dir = out_dir / "qa4sm_graphics"
        if force_download or not graphx_dir.exists():
            zip_path = out_dir / "graphics.zip"
            try:
                self._download_to_file(
                    "download-result",
                    {"validationId": run_id, "fileType": "graphics"},
                    zip_path)
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_ref.extractall(graphx_dir)
            finally:
                # Remove .zip after extraction, also if something failed
                if zip_path.exists():
                    os.remove(zip_path)

        netcdf_path = out_dir / f"{run_id}.nc"
        if force_download or not netcdf_path.exists():
            self._download_to_file(
                "download-result",
                {"validationId": run_id, "fileType": "netCDF"},
                netcdf_path)

        stats_path = out_dir / "summary_stats.csv"
        if force_download or not stats_path.exists():
            # The statistics request is sent without fileType
            self._download_to_file(
                "download-statistics-csv",
                {"validationId": run_id},
                stats_path)

    def run_config_validation(self, config_path, override=None):
        """
        Trigger validation run based on a config file.

        Parameters
        ----------
        config_path: str
            Path to the config json to post.
        override: dict, optional (default: None)
            keys and values to override settings in the configuration.

        Returns
        -------
        response: pd.Series
            Response from validation run

        Raises
        ------
        KeyError
            If a key to override does not exist in the configuration.
        """
        config = ValidationConfiguration.from_file(config_path)

        if override is not None:
            for k, v in override.items():
                if k not in config.data:
                    raise KeyError(f"{k} does not exist in config.")
                config.data[k] = v

        return self.run_validation(config)