{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": { "tags": [] }, "source": [ "# Processing Large Climate Datasets with Dask and Xarray\n", "Efficient data processing on PAVICS, or using Xarray/Dask in general, typically requires understanding two different notions of data *chunking*: 1) *on-disk* chunking or the way the data is stored in the filesystem and 2) *in-memory* chunking, or the way that Dask will break up a large dataset into manageable portions and process the data in parallel. In the case of *in-memory* chunks, it is possible to specify any size of in-memory chunking based on system resources and the analysis at hand. However, this does not guarantee efficient computation. Indeed, to efficiently process large datasets and prevent memory overloads, aligning in-memory chunks with the on-disk chunk structure is essential. This allows Dask to load and process data in a way that minimizes memory usage and I/O operations, speeding up computation.\n", "\n" ] }, { "cell_type": "code", "execution_count": 1, "id": "1", "metadata": { "tags": [] }, "outputs": [], "source": [ "import shutil\n", "import time\n", "import warnings\n", "from pathlib import Path\n", "from tempfile import TemporaryDirectory\n", "\n", "import psutil\n", "import xarray as xr\n", "from dask import compute\n", "from dask.distributed import Client\n", "from IPython.display import clear_output\n", "\n", "# ignore warnings about xsdba\n", "warnings.simplefilter(\"ignore\", category=UserWarning)\n", "\n", "from xclim import atmos\n", "from xscen.io import save_to_zarr" ] }, { "cell_type": "markdown", "id": "2", "metadata": { "tags": [] }, "source": [ "## Working with Dask and Xarray\n", "\n", "### How to align memory chunks with on-disk chunks?\n", "\n", "We begin by checking the on-disk chunk structure of my dataset by loading it with `decode_times=False`. This skips time decoding and loads only metadata without reading the data into memory." 
] }, { "cell_type": "code", "execution_count": 2, "id": "3", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1461 50 50]\n" ] } ], "source": [ "# open the daily ERA5-Land dataset as an example\n", "url = \"https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml\"\n", "\n", "# use decode_times=False to skip time decoding and load only metadata\n", "ds = xr.open_dataset(url, decode_times=False, chunks={})\n", "\n", "# `._ChunkSizes` attribute shows the chunk structure of the variable\n", "print(ds[\"tas\"]._ChunkSizes)" ] }, { "cell_type": "markdown", "id": "4", "metadata": { "tags": [] }, "source": [ "The `._ChunkSizes` attribute shows the chunk sizes of the `tas` variable on disk. I then re-open the dataset with matching Dask chunk sizes for efficient memory alignment. \n", "\n", "When you open a dataset using chunking, it is represented as Dask-backed arrays, where only the metadata is initially loaded, and the data itself remains on disk until needed. Dask uses lazy evaluation, meaning it doesn't perform an operation immediately but instead builds a computation graph. This graph tracks the sequence of operations, delaying execution until `compute()` is called. During computation, Dask reads and processes data in chunks and loads only the necessary parts into memory. \n", "\n", "Here's an example showing the difference in computation times when the dataset is loaded with unaligned versus aligned chunking. In this example, I resample daily ERA5-Land `tas` data to yearly means. To keep the computation time manageable, I limit the calculation to a single year and focus on the Eastern North America domain. 
Given the large spatial coverage and shorter time span, a logical choice might be to chunk the data in-memory for a few days over the `time` dimension, while keeping the entire spatial domain in the `lat` and `lon` dimensions:" ] }, { "cell_type": "code", "execution_count": 3, "id": "5", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.11 s, sys: 5.7 s, total: 7.82 s\n", "Wall time: 3min 47s\n" ] } ], "source": [ "# NBVAL_SKIP\n", "\n", "# open dataset with unaligned chunks\n", "ds = xr.open_dataset(url, chunks={\"time\": 10, \"lat\": -1, \"lon\": -1})\n", "\n", "# resample the 'tas' variable to yearly means\n", "%time tas_resampled = ds['tas'].sel(time=slice('1981-01-01', '1981-12-31'), lat=slice(35, 65), lon=slice(-100, -60)).resample(time='YS').mean().compute()\n", "del ds" ] }, { "cell_type": "markdown", "id": "6", "metadata": { "tags": [] }, "source": [ "However, this original chunking choice resulted in inefficient computation because the on-disk chunk size for the `time` dimension was much larger than 10. Processing such small chunks led to excessive overhead and slow performance. 
Instead of arbitrary chunking, I align the chunks with the on-disk structure by setting the `time` chunk size to match the smallest on-disk chunk and choosing spatial chunks in multiples of 50 to avoid creating too many small chunks:" ] }, { "cell_type": "code", "execution_count": 4, "id": "7", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.87 s, sys: 3.3 s, total: 5.17 s\n", "Wall time: 29 s\n" ] } ], "source": [ "# NBVAL_IGNORE_OUTPUT\n", "\n", "# open dataset with aligned chunks\n", "ds = xr.open_dataset(url, chunks={\"time\": 366, \"lat\": 50 * 5, \"lon\": 50 * 5})\n", "\n", "# resample the 'tas' variable to yearly means\n", "%time tas_resampled = ds['tas'].sel(time=slice('1981-01-01', '1981-12-31'), lat=slice(35, 65), lon=slice(-100, -60)).resample(time='YS').mean().compute()\n", "del ds" ] }, { "cell_type": "markdown", "id": "8", "metadata": {}, "source": [ "In the run shown above, the wall time dropped from about 3 min 47 s to 29 s after aligning the chunks, as this approach better managed memory and I/O operations, leading to faster execution." ] }, { "cell_type": "markdown", "id": "9", "metadata": {}, "source": [ "### How to do parallel processing with Dask Distributed Client?\n", "\n", "Processing large climate datasets can be computationally intensive and time-consuming due to their size and complexity. Parallel processing divides a computational task into smaller, independent subtasks that can be executed simultaneously across multiple CPU cores. This can minimize memory overhead and significantly reduce computation time. \n", "\n", "One way to implement parallel processing is through the Dask Distributed Client. Initializing a Dask Client establishes a connection to a system of interconnected computers or processors, known as a cluster. This connection enables interaction with two key components: the scheduler and the workers. 
The scheduler is the central component that manages task distribution across the workers, while the workers are responsible for executing these tasks using one or more CPU cores and threads, depending on the configuration.\n", "\n", "A core is a physical processing unit within a CPU that can execute tasks independently. Modern CPUs often have multiple cores, allowing them to perform multiple operations in parallel. A thread, on the other hand, is the smallest sequence of programmed instructions that the CPU can manage independently. Threads run within cores, and each core can handle multiple threads through a process called multithreading. While cores provide true parallelism by handling separate tasks simultaneously, threads allow for more efficient use of each core by managing multiple tasks in a way that can overlap their execution. For example, while one thread performs calculations, another thread within the same core can handle data loading or I/O operations, ensuring the core remains fully utilized.\n", "\n", "Let's look at an example of resampling daily ERA5-Land data into yearly means for multiple variables using the Dask Distributed Client. To manage system resources effectively, especially since PAVICS is a shared environment, I set the total memory limit to 20 GB. After experimenting with different configurations, I found that using 5 workers, each with 2 threads and 4 GB memory, provides a good balance between reducing overhead and optimizing computation time. I track memory usage and task progress in real time by displaying the Dask dashboard.\n", "\n", "To create delayed tasks, I set `compute=False` with `to_zarr`. This delays the entire pipeline (reading the data, performing computations, and writing the output) and allows Dask to optimize the execution plan. By scheduling tasks efficiently, Dask reduces unnecessary data movement and enables each worker to write data independently to Zarr format. 
This approach is much faster than using sequential formats like NetCDF, which don’t support parallel writes as effectively.\n", "\n", "Finally, to prevent memory leaks, which occur when the program retains memory it no longer needs, potentially slowing down the system or causing it to crash, I use the Client within a context manager. This ensures that resources are released after execution. " ] }, { "cell_type": "code", "execution_count": 5, "id": "10", "metadata": { "tags": [] }, "outputs": [ { "data": { "text/html": [ "
Client (distributed.LocalCluster): 5 workers, 10 threads in total (2 per worker), 18.63 GiB total memory (3.73 GiB per worker). Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Total memory usage across all workers: 1.36 GB\n", "Total computation time: 29.26 seconds\n" ] } ], "source": [ "# NBVAL_IGNORE_OUTPUT\n", "\n", "start_time = time.time()\n", "\n", "# function to compute the yearly mean for each variable\n", "\n", "\n", "def compute_annual_mean(dataset, variable_name):\n", "    var = dataset[variable_name]\n", "    annual_mean = var.resample(time=\"YS\").mean()\n", "    return annual_mean\n", "\n", "\n", "# set up Dask client within a context manager\n", "with Client(n_workers=5, threads_per_worker=2, memory_limit=\"4GB\") as client:\n", "    # display the Dask Dashboard\n", "    display(client)\n", "\n", "    # open the dataset with on-disk chunking structure\n", "    url = \"https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml\"\n", "    ds = xr.open_dataset(url, chunks={\"time\": 366, \"lat\": 50, \"lon\": 50})\n", "\n", "    # focus on Eastern North America and a single year for this example\n", "    ds = ds.sel(\n", "        time=slice(\"1981-01-01\", \"1981-12-31\"), lat=slice(35, 65), lon=slice(-100, -60)\n", "    )\n", "\n", "    # define the variables to compute yearly means for\n", "    variables = [\"tas\", \"tasmin\", \"tasmax\"]\n", "\n", "    # create a list to hold delayed tasks\n", "    tasks = []\n", "    with TemporaryDirectory(prefix=\"output\") as tempdir:\n", "        for var_name in variables:\n", "            output_path = Path(tempdir).joinpath(\n", "                f\"var_means/{var_name}_1981_yearly_mean.zarr\"\n", "            )\n", "            if not output_path.exists():\n", "                yearly_mean = compute_annual_mean(ds, var_name)\n", "                # save to Zarr with compute=False to get a delayed task object\n", "                delayed_task = yearly_mean.chunk(\n", "                    {\"time\": -1, \"lat\": 50, \"lon\": 50}\n", "                ).to_zarr(output_path, mode=\"w\", compute=False)\n", "                tasks.append(delayed_task)\n", "\n", "        # trigger the execution of all delayed tasks\n", "        compute(*tasks)\n", "\n", "    # fetch memory usage from all workers and display the total usage\n", "    worker_memory = client.run(lambda: psutil.Process().memory_info().rss)\n", "    total_memory = sum(worker_memory.values())\n", "    print(f\"Total memory usage across all workers: {total_memory / 1e9:.2f} GB\")\n", "\n", "end_time = time.time()\n", "elapsed_time = end_time - start_time\n", "print(f\"Total computation time: {elapsed_time:.2f} seconds\")" ] }, { "cell_type": "markdown", "id": "11", "metadata": {}, "source": [ "Let’s look at a more complex analysis involving heatwave indicators. Here, I calculate two climate indicators, `heat_wave_total_length` and `heat_wave_frequency`, using the `atmos` submodule from the `xclim` library. Both indicators rely on the same input data (`tasmin` and `tasmax`), so I create a pipeline of delayed tasks, which minimizes I/O operations by keeping the data in memory until both indicators are calculated and saved." ] }, { "cell_type": "code", "execution_count": 6, "id": "12", "metadata": { "tags": [] }, "outputs": [ { "data": { "text/html": [ "
Client (distributed.LocalCluster): 5 workers, 10 threads in total (2 per worker), 18.63 GiB total memory (3.73 GiB per worker). Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Total memory usage across all workers: 1.40 GB\n", "Total computation time: 32.63 seconds\n" ] } ], "source": [ "# NBVAL_IGNORE_OUTPUT\n", "\n", "start_time = time.time()\n", "\n", "with Client(n_workers=5, threads_per_worker=2, memory_limit=\"4GB\") as client:\n", "    display(client)\n", "\n", "    # load data using on-disk chunk sizes\n", "    url = \"https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml\"\n", "    ds = xr.open_dataset(url, chunks={\"time\": 366, \"lat\": 50, \"lon\": 50})\n", "\n", "    # focus on Eastern North America and a single year\n", "    ds = ds.sel(\n", "        time=slice(\"1981-01-01\", \"1981-12-31\"), lat=slice(35, 65), lon=slice(-100, -60)\n", "    )\n", "\n", "    # variables 'tasmax' and 'tasmin' have 'cell_methods' set to 'time: point'\n", "    # update them to conform to CF conventions expected by xclim\n", "    ds[\"tasmax\"].attrs[\"cell_methods\"] = \"time: maximum\"\n", "    ds[\"tasmin\"].attrs[\"cell_methods\"] = \"time: minimum\"\n", "\n", "    # list of heatwave indicator functions\n", "    indicators = [atmos.heat_wave_total_length, atmos.heat_wave_frequency]\n", "\n", "    tasks = []\n", "    with TemporaryDirectory(prefix=\"output1\") as tempdir1:\n", "        for indicator in indicators:\n", "            ds_out = xr.Dataset(\n", "                attrs=ds.attrs\n", "            )  # create a new dataset for each indicator\n", "\n", "            out = indicator(ds=ds, freq=\"YS\")\n", "\n", "            out = out.chunk({\"time\": -1, \"lat\": 50, \"lon\": 50})\n", "            ds_out[out.name] = out\n", "\n", "            output_path = Path(tempdir1).joinpath(f\"heatwaves_ex1/{out.name}_1981.zarr\")\n", "            output_path.parent.mkdir(parents=True, exist_ok=True)\n", "\n", "            if not output_path.exists():\n", "                # save to Zarr with compute=False to get a delayed task object\n", "                delayed_task = ds_out.to_zarr(output_path, mode=\"w\", compute=False)\n", "                tasks.append(delayed_task)\n", "\n", "        # trigger computation\n", "        compute(*tasks)\n", "\n", "    # fetch memory usage from all workers and display the total usage\n", "    worker_memory = client.run(lambda: psutil.Process().memory_info().rss)\n", "    total_memory = sum(worker_memory.values())\n", "    print(f\"Total memory usage across all workers: {total_memory / 1e9:.2f} GB\")\n", "\n", "end_time = time.time()\n", "print(f\"Total computation time: {end_time - start_time:.2f} seconds\")" ] }, { "cell_type": "markdown", "id": "13", "metadata": { "tags": [] }, "source": [ "### What can we do when we have large task graphs / large memory footprint?\n", "One downside of using a fully delayed computation approach is that it can lead to the creation of large task graphs that are difficult to manage. This can result in excessive memory consumption as the Dask scheduler struggles to handle numerous interdependent tasks.\n", "\n", "To address this issue, we can simplify the task graph by computing and saving each indicator one at a time. This method ensures that Dask completes the calculation and writing of the first indicator, then releases the memory used for that computation before moving on to the second indicator. By processing each indicator sequentially, the memory footprint is reduced, and the scheduler has fewer tasks to manage at any given time." ] }, { "cell_type": "code", "execution_count": 7, "id": "14", "metadata": { "tags": [] }, "outputs": [ { "data": { "text/html": [ "
Client (distributed.LocalCluster): 5 workers, 10 threads in total (2 per worker), 18.63 GiB total memory (3.73 GiB per worker). Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Total memory usage across all workers: 1.38 GB\n", "Total computation time: 39.99 seconds\n" ] } ], "source": [ "# NBVAL_IGNORE_OUTPUT\n", "\n", "start_time = time.time()\n", "\n", "with Client(n_workers=5, threads_per_worker=2, memory_limit=\"4GB\") as client:\n", "    display(client)\n", "\n", "    url = \"https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml\"\n", "    ds = xr.open_dataset(url, chunks={\"time\": 366, \"lat\": 50, \"lon\": 50})\n", "\n", "    ds = ds.sel(\n", "        time=slice(\"1981-01-01\", \"1981-12-31\"), lat=slice(35, 65), lon=slice(-100, -60)\n", "    )\n", "\n", "    ds[\"tasmax\"].attrs[\"cell_methods\"] = \"time: maximum\"\n", "    ds[\"tasmin\"].attrs[\"cell_methods\"] = \"time: minimum\"\n", "\n", "    indicators = [atmos.heat_wave_total_length, atmos.heat_wave_frequency]\n", "\n", "    with TemporaryDirectory(prefix=\"output2\") as tempdir2:\n", "        for indicator in indicators:\n", "            ds_out = xr.Dataset(attrs=ds.attrs)\n", "\n", "            out = indicator(ds=ds, freq=\"YS\")\n", "\n", "            out = out.chunk({\"time\": -1, \"lat\": 50, \"lon\": 50})\n", "            ds_out[out.name] = out\n", "\n", "            output_path = Path(tempdir2).joinpath(f\"heatwaves_ex2/{out.name}_1981.zarr\")\n", "            output_path.parent.mkdir(parents=True, exist_ok=True)\n", "\n", "            if not output_path.exists():\n", "                # save to Zarr, triggering computation immediately\n", "                ds_out.to_zarr(output_path)\n", "\n", "    # fetch memory usage from all workers and display the total usage\n", "    worker_memory = client.run(lambda: psutil.Process().memory_info().rss)\n", "    total_memory = sum(worker_memory.values())\n", "    print(f\"Total memory usage across all workers: {total_memory / 1e9:.2f} GB\")\n", "\n", "end_time = time.time()\n", "print(f\"Total computation time: {end_time - start_time:.2f} seconds\")" ] }, { "cell_type": "markdown", "id": "15", "metadata": 
{}, "source": [ "Even with the sequential computation approach, there may be scenarios where the output data is still too large to write in a single step, for example when calculating heatwave indicators over the entire North America domain. In such cases, an effective strategy is to split the dataset into smaller, manageable spatial chunks, which would allow for more efficient processing and data writing.\n", "\n", "For instance, we can calculate the heatwave indicators over the entire dataset spanning North America, and then divide the results into smaller latitude bins (e.g., groups of 50 latitudes each). Each bin can then be processed and saved individually to a temporary `.zarr` file using Dask. Once all bins are saved, they can be merged back into a single dataset and written out as a final `.zarr` file. This method distributes the data processing load and helps minimize the strain on system resources." ] }, { "cell_type": "code", "execution_count": 8, "id": "16", "metadata": { "tags": [] }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Total computation time: 440.55 seconds\n" ] } ], "source": [ "# NBVAL_IGNORE_OUTPUT\n", "\n", "start_time = time.time()\n", "\n", "url = \"https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml\"\n", "ds = xr.open_dataset(url, chunks={\"time\": 366, \"lat\": 50, \"lon\": 50})\n", "\n", "# keep the entire North America domain and select a single year\n", "ds = ds.sel(time=slice(\"1981-01-01\", \"1981-12-31\"))\n", "\n", "ds[\"tasmax\"].attrs[\"cell_methods\"] = \"time: maximum\"\n", "ds[\"tasmin\"].attrs[\"cell_methods\"] = \"time: minimum\"\n", "\n", "# create an empty output dataset\n", "dsout = xr.Dataset(attrs=ds.attrs)\n", "\n", "# calculation on the entire dataset\n", "out = atmos.heat_wave_total_length(ds=ds, freq=\"YS\")\n", "dsout[out.name] = out\n", "\n", "with TemporaryDirectory(prefix=\"output3\") as tempdir3:\n", " outdir_ex3 = Path(tempdir3).joinpath(f\"heatwaves_ex3/spatial_ex\")\n", " outdir_ex3.mkdir(parents=True, exist_ok=True)\n", " outzarr3 = outdir_ex3.joinpath(f\"{out.name}_1981.zarr\")\n", "\n", " if not outzarr3.exists():\n", "\n", " # if data is too big to write in single step - make individual datasets by binning the `lat` dim (approx n=50 latitudes at a time)\n", " grp_dim = \"lat\"\n", " bins = round(len(dsout.lat) / 50)\n", " _, datasets = zip(*dsout.groupby_bins(grp_dim, bins))\n", " print(f\"Number of individual datasets: {len(datasets)}\")\n", " if sum([len(d[grp_dim]) for d in datasets]) == len(dsout[grp_dim]):\n", " print(\"Data is binned correctly.\")\n", "\n", " # export each chunk of 50-latitudes to a temporary location\n", " with TemporaryDirectory(prefix=\"output3_nested\") as outtmp:\n", " for ii, dds in enumerate(datasets):\n", " dds = dds.chunk(time=-1, lon=50, lat=50)\n", " filename = Path(outtmp).joinpath(f\"{ii}.zarr\")\n", " with Client(\n", " 
n_workers=5, threads_per_worker=2, memory_limit=\"4GB\"\n", " ) as client:\n", " display(client)\n", " save_to_zarr(\n", " ds=dds,\n", " filename=filename,\n", " mode=\"o\",\n", " )\n", " # clear the output for the next iteration\n", " clear_output(wait=True)\n", "\n", " # reassemble pieces and export joined\n", " inzarrs = sorted(filename.parent.glob(\"*.zarr\"))\n", "\n", " # open the files as a combined multi-file dataset\n", " ds = xr.open_mfdataset(inzarrs, engine=\"zarr\", decode_timedelta=False)\n", "\n", " # define the final chunking configuration\n", " final_chunks = dict(time=-1, lon=50, lat=50)\n", "\n", " # save the final combined dataset\n", " tmpzarr = Path(outtmp).joinpath(outzarr3.name)\n", " # bind the new client to `client` so that display() shows the active cluster, not the closed one from the loop\n", " with Client(n_workers=10) as client:\n", " display(client)\n", " save_to_zarr(\n", " ds=ds.chunk(final_chunks),\n", " filename=tmpzarr,\n", " mode=\"o\",\n", " )\n", " # move the final combined file to the output location\n", " outzarr3.parent.mkdir(exist_ok=True, parents=True)\n", " shutil.move(tmpzarr, outzarr3)\n", "\n", "end_time = time.time()\n", "print(f\"Total computation time: {end_time - start_time:.2f} seconds\")" ] }, { "cell_type": "markdown", "id": "17", "metadata": {}, "source": [ "Alternatively, if the goal is to calculate heatwave indicators over a longer time period but for a smaller spatial extent, memory usage can be managed more efficiently by splitting the data along the time dimension. In this example, I calculate the indicator over a 10-year period (1981-1990). After performing the heatwave calculations, I split the results into 5-year intervals, save each interval as a smaller Zarr file, and reassemble them into the final dataset in the last step. You can try different interval sizes depending on your period of analysis.\n", "\n", "In addition, when initially loading the input dataset, I set the chunk size of the `time` dimension to 4 years and 1 day.
This choice is based on the need to account for leap years, ensuring that each chunk contains a consistent number of days while spanning multiple years. Chunking by 4 years instead of 1 year improves computational efficiency and reduces excessive I/O operations for this specific case, as the heatwave indicator analysis spans multiple years. " ] }, { "cell_type": "code", "execution_count": 9, "id": "18", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Total computation time: 38.29 seconds\n" ] } ], "source": [ "# NBVAL_IGNORE_OUTPUT\n", "\n", "start_time = time.time()\n", "\n", "url = \"https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml\"\n", "\n", "# set the chunk size of the time dimension to 4 years + 1 day\n", "ds = xr.open_dataset(url, chunks={\"time\": (365 * 4) + 1, \"lat\": 50, \"lon\": 50})\n", "\n", "# this time, let's focus our analysis on a smaller region and a longer time period\n", "ds = ds.sel(\n", " time=slice(\"1981-01-01\", \"1990-12-31\"),\n", " lat=slice(42.1, 65.0),\n", " lon=(slice(-80.0, -73.5)),\n", ")\n", "\n", "ds[\"tasmax\"].attrs[\"cell_methods\"] = \"time: maximum\"\n", "ds[\"tasmin\"].attrs[\"cell_methods\"] = \"time: minimum\"\n", "\n", "dsout = xr.Dataset(attrs=ds.attrs)\n", "\n", "out = atmos.heat_wave_total_length(ds=ds, freq=\"YS\")\n", "dsout[out.name] = out\n", "\n", "with TemporaryDirectory(prefix=\"output4\") as tempdir4:\n", " outdir4 = Path(tempdir4).joinpath(f\"heatwaves_ex3/temporal_ex\")\n", " outdir4.mkdir(parents=True, exist_ok=True)\n", " outzarr4 = outdir4.joinpath(f\"{out.name}_1981-1990.zarr\")\n", "\n", " if not outzarr4.exists():\n", "\n", " # resample into 5-year intervals\n", " _, datasets = zip(*dsout.resample(time=\"5YS\"))\n", " print(f\"Number of individual datasets: {len(datasets)}\")\n", "\n", " # export each 5-year chunk to a temporary location\n", " with TemporaryDirectory(prefix=\"output4_nested\") as outtmp:\n", " for ii, dds in enumerate(datasets):\n", " dds = dds.chunk(dict(time=-1, lon=50, lat=50))\n", " filename = Path(outtmp).joinpath(f\"{ii}.zarr\")\n", " with Client(\n", " n_workers=5, threads_per_worker=2, memory_limit=\"4GB\"\n", " ) as client:\n", " display(client)\n", " save_to_zarr(\n", " ds=dds,\n", " filename=filename,\n", " mode=\"o\",\n", " )\n", " # 
clear the output for the next iteration\n", " clear_output(wait=True)\n", "\n", " # reassemble the 5-year chunks into a single dataset\n", " inzarrs = sorted(filename.parent.glob(\"*.zarr\"))\n", "\n", " # open the files as a combined multi-file dataset\n", " ds = xr.open_mfdataset(inzarrs, engine=\"zarr\", decode_timedelta=False)\n", "\n", " # define the final chunking configuration\n", " final_chunks = dict(time=12 * 50, lon=50, lat=50)\n", "\n", " # save the final combined dataset\n", " tmpzarr = Path(outtmp).joinpath(outzarr4.name)\n", " # bind the new client to `client` so that display() shows the active cluster, not the closed one from the loop\n", " with Client(n_workers=10) as client:\n", " display(client)\n", " save_to_zarr(\n", " ds=ds.chunk(final_chunks),\n", " filename=tmpzarr,\n", " mode=\"o\",\n", " )\n", " # move the final combined file to the output location\n", " outzarr4.parent.mkdir(exist_ok=True, parents=True)\n", " shutil.move(tmpzarr, outzarr4)\n", "\n", "end_time = time.time()\n", "print(f\"Total computation time: {end_time - start_time:.2f} seconds\")" ] }, { "cell_type": "markdown", "id": "19", "metadata": {}, "source": [ "## Key takeaways from this tutorial:\n", "\n", "* Dask is a parallel computing library that enables efficient processing of large datasets and complex computations by distributing tasks across multiple cores or machines\n", "* When opening a dataset, align in-memory chunks with the dataset's on-disk chunking to minimize memory usage and reduce unnecessary I/O operations\n", "* Use delayed tasks to optimize the execution plan and save outputs in Zarr format, which supports parallel data writing\n", "* When dealing with large task graphs, simplify the process by handling tasks sequentially (e.g., process one climate indicator at a time) to keep the computation manageable\n", "* If the data is too large to write in a single step, split the output into smaller, temporary Zarr files based on spatial or temporal segments relevant to your analysis; after splitting, reassemble these smaller files into the final Zarr file\n", "* Monitor memory 
usage and task progress using the Dask Dashboard, and adjust the number of workers, threads, and memory limits as needed to maintain balanced resource utilization" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.13" } }, "nbformat": 4, "nbformat_minor": 5 }