Source code for finch.processes.wps_xsubset_bbox_dataset

# noqa: D100
from pathlib import Path

from pywps.app import WPSRequest
from pywps.app.exceptions import ProcessError
from pywps.response.execute import ExecuteResponse

from . import wpsio
from .ensemble_utils import get_datasets, make_output_filename
from .subset import finch_subset_bbox
from .utils import (
    get_datasets_config,
    netcdf_file_list_to_csv,
    single_input_or_none,
    write_log,
    zip_files,
)
from .wps_base import FinchProcess


class SubsetBboxDatasetProcess(FinchProcess):
    """Subset a NetCDF file using bounding box geometry.

    WPS process registered under the identifier ``subset_bbox_dataset``.
    The handler resolves the requested dataset to a list of resources,
    subsets them with :func:`finch_subset_bbox`, optionally converts the
    result to CSV, and always returns a single zip archive.
    """

    def __init__(self):
        """Declare the process inputs, outputs and progress milestones."""
        inputs = [
            *wpsio.get_ensemble_inputs(),
            wpsio.lon0,
            wpsio.lon1,
            wpsio.lat0,
            wpsio.lat1,
            wpsio.start_date,
            wpsio.end_date,
            wpsio.output_format_netcdf_csv,
        ]

        outputs = [wpsio.output_netcdf_csv]

        super().__init__(
            self._handler,
            identifier="subset_bbox_dataset",
            title="Subset of a dataset, using a bounding box",
            version="0.1",
            abstract=(
                "For the given dataset, "
                "return the data for which grid cells intersect the "
                "bounding box for each input dataset as well as "
                "the time range selected."
            ),
            inputs=inputs,
            outputs=outputs,
            status_supported=True,
            store_supported=True,
        )

        # Milestones keyed by the ``process_step`` names passed to
        # ``write_log`` in ``_handler``.
        # NOTE(review): presumably ``write_log`` maps these to the reported
        # status percentage -- confirm in ``utils.write_log``.
        self.status_percentage_steps = {
            "start": 5,
            "subset": 7,
            "convert_to_csv": 90,
            "zip_outputs": 95,
            "done": 99,
        }

    def _handler(self, request: WPSRequest, response: ExecuteResponse):
        """Run the bounding-box subset for one WPS execute request.

        Parameters
        ----------
        request : WPSRequest
            Incoming request. Its ``inputs`` mapping is read for
            ``output_format``, ``variable``, ``scenario``, ``models`` and
            ``dataset``; a ``resource`` entry is added to it with the
            datasets to subset.
        response : ExecuteResponse
            Response object whose ``output`` is pointed at the zip archive.

        Returns
        -------
        ExecuteResponse
            The same ``response`` object, with ``outputs["output"].file``
            set to the path of the zip archive in ``self.workdir``.

        Raises
        ------
        ProcessError
            If the subset step returns no output files.
        """
        convert_to_csv = request.inputs["output_format"][0].data == "csv"
        if not convert_to_csv:
            # Drop the CSV milestone when no conversion will happen.
            # ``pop`` with a default (rather than ``del``) keeps the handler
            # safe to call more than once on the same instance: a second
            # non-CSV run would otherwise raise KeyError.
            self.status_percentage_steps.pop("convert_to_csv", None)

        write_log(self, "Processing started", process_step="start")

        write_log(self, "Fetching datasets")

        # An absent or empty "variable" input is passed on as None.
        variables = [v.data for v in request.inputs.get("variable", [])] or None
        scenario = single_input_or_none(request.inputs, "scenario")
        models = [m.data.strip() for m in request.inputs["models"]]

        dataset_name = single_input_or_none(request.inputs, "dataset")
        dataset = get_datasets_config()[dataset_name]
        # Stored back on the request so the helpers below, which receive
        # ``request.inputs``, can see the resolved resources.
        request.inputs["resource"] = get_datasets(
            dataset,
            workdir=self.workdir,
            variables=variables,
            scenario=scenario,
            models=models,
        )

        output_filename = Path(make_output_filename(self, request.inputs))

        write_log(
            self,
            f"Running subset on {len(request.inputs['resource'])} resources.",
            process_step="subset",
        )

        output_files = finch_subset_bbox(
            self,
            netcdf_inputs=request.inputs["resource"],
            request_inputs=request.inputs,
        )

        if not output_files:
            message = "No data was produced when subsetting using the provided bounds."
            raise ProcessError(message)

        if convert_to_csv:
            write_log(self, "Converting outputs to csv", process_step="convert_to_csv")

            csv_files, metadata_folder = netcdf_file_list_to_csv(
                output_files,
                output_folder=Path(self.workdir),
                filename_prefix=output_filename,
            )
            # The archive then holds the CSV files plus the metadata folder
            # instead of the NetCDF files.
            output_files = csv_files + [metadata_folder]

        write_log(self, "Zipping outputs", process_step="zip_outputs")

        output_zip = Path(self.workdir) / output_filename.with_suffix(".zip")

        def _log(message, percentage):
            # Adapter so ``zip_files`` can report its progress via write_log.
            write_log(self, message, subtask_percentage=percentage)

        zip_files(output_zip, output_files, log_function=_log)

        response.outputs["output"].file = output_zip

        write_log(self, "Processing finished successfully", process_step="done")
        return response