Processing Large Climate Datasets with Dask and Xarray¶
Efficient data processing on PAVICS, or using Xarray/Dask in general, typically requires understanding two different notions of data chunking: 1) on-disk chunking or the way the data is stored in the filesystem and 2) in-memory chunking, or the way that Dask will break up a large dataset into manageable portions and process the data in parallel. In the case of in-memory chunks, it is possible to specify any size of in-memory chunking based on system resources and the analysis at hand. However, this does not guarantee efficient computation. Indeed, to efficiently process large datasets and prevent memory overloads, aligning in-memory chunks with the on-disk chunk structure is essential. This allows Dask to load and process data in a way that minimizes memory usage and I/O operations, speeding up computation.
import shutil
import time
import warnings
from pathlib import Path
from tempfile import TemporaryDirectory
import psutil
import xarray as xr
from dask import compute
from dask.distributed import Client
from IPython.display import clear_output
# ignore warnings about xsdba
warnings.simplefilter("ignore", category=UserWarning)
from xclim import atmos
from xscen.io import save_to_zarr
Working with Dask and Xarray¶
How to align memory chunks with on-disk chunks?¶
We begin by checking the on-disk chunk structure of my dataset by loading it with decode_times=False. This skips time decoding and loads only metadata without reading the data into memory.
# open the daily ERA5-land dataset as an example
url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
# use decode_times=False to skip time decoding and load only metadata
ds = xr.open_dataset(url, decode_times=False, chunks={})
# `._ChunkSizes` attribute shows the chunk structure of the variable
print(ds["tas"]._ChunkSizes)
[1461 50 50]
The ._ChunkSizes attribute shows the chunk sizes of the tas variable on disk. I then re-open the dataset with matching Dask chunk sizes for efficient memory alignment.
When you open a dataset using chunking, it is represented as Dask-backed arrays, where only the metadata is initially loaded, and the data itself remains on disk until needed. Dask uses lazy evalution, meaning it doesn’t immediately perform the operation but instead builds a computation graph. This graph tracks the sequence of operations, delaying execution until compute() is called. During computation, Dask reads and processes data in chunks and loads only the necessary parts into memory.
Here’s an example showing the difference in computation times when the dataset is loaded with unaligned versus aligned chunking. In this example, I resample daily ERA5-Land tas data to yearly means. To keep the computation time manageable, I limit the calculation to a single year and focus on the Eastern North America domain. Given the large spatial coverage and shorter time span, a logical choice might be to chunk the data in-memory for a few days over the time dimension, while keeping the entire spatial domain in the lat and lon dimensions:
# NBVAL_SKIP
# open dataset with unaligned chunks
ds = xr.open_dataset(url, chunks={"time": 10, "lat": -1, "lon": -1})
# resample the 'tas' variable to yearly means
%time tas_resampled = ds['tas'].sel(time=slice('1981-01-01', '1981-12-31'), lat=slice(35, 65), lon=slice(-100, -60)).resample(time='YS').mean().compute()
del ds
CPU times: user 2.11 s, sys: 5.7 s, total: 7.82 s
Wall time: 3min 47s
However, this original chunking choice resulted in inefficient computation because the on-disk chunk size for the time dimension was much larger than 10. Processing such small chunks led to excessive overhead and slow performance. Instead of arbitrary chunking, I align the chunks with the on-disk structure by setting the time chunk size to match the smallest on-disk chunk and choosing spatial chunks in multiples of 50 to avoid creating too many small chunks:
# NBVAL_IGNORE_OUTPUT
# open dataset with aligned chunks
ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50 * 5, "lon": 50 * 5})
# resample the 'tas' variable to yearly means
%time tas_resampled = ds['tas'].sel(time=slice('1981-01-01', '1981-12-31'), lat=slice(35, 65), lon=slice(-100, -60)).resample(time='YS').mean().compute()
del ds
CPU times: user 1.87 s, sys: 3.3 s, total: 5.17 s
Wall time: 29 s
The computation time dropped from approximately 1 minute to 10 seconds after aligning the chunks, as this approach better managed memory and I/O operations, leading to faster execution.
How to do parallel processing with Dask Distributed Client?¶
Processing large climate datasets can be computationally intensive and time-consuming due to their size and complexity. Parallel processing divides a computational task into smaller, independent subtasks that can be executed simultaneously across multiple CPU cores. This can minimize memory overhead and significantly reduce computation time.
One way to implement parallel processing is through the Dask Distributed Client. Initializing a Dask Client establishes a connection to a system of interconnected computers or processors, known as a cluster. This connection enables interaction with two key components: the scheduler and the workers. The scheduler is the central component that manages task distribution across the workers while the workers are responsible for executing these tasks using one or more CPU cores and threads, depending on the configuration.
A core is a physical processing unit within a CPU that can execute tasks independently. Modern CPUs often have multiple cores, allowing them to perform multiple operations in parallel. A thread, on the other hand, is the smallest sequence of programmed instructions that the CPU can manage independently. Threads run within cores, and each core can handle multiple threads through a process called multithreading. While cores provide true parallelism by handling separate tasks simultaneously, threads allow for more efficient use of each core by managing multiple tasks in a way that can overlap their execution. For example, while one thread performs calculations, another thread within the same core can handle data loading or I/O operations, ensuring the core remains fully utilized.
Let’s look at an example of resampling daily ERA5-Land data into yearly means for multiple variables using the Dask Distributed Client. To manage system resources effectively, especially since PAVICS is a shared environment, I set the total memory limit to 20 GB. After experimenting with different configurations, I found that using 5 workers, each with 2 threads and 4 GB memory, provides a good balance between reducing overhead and optimizing computation time. I track memory usage and task progress in real time by displaying the Dask dashboard.
To create delayed tasks, I set compute=False with to_zarr. This delays the entire pipeline —from reading the data, performing computations, to writing the output— and allows Dask to optimize the execution plan. By scheduling tasks efficiently, Dask reduces unnecessary data movement and enables each worker to write data independently to Zarr format. This approach is much faster than using sequential formats like NetCDF, which don’t support parallel writes as effectively.
Finally, to prevent memory leaks, which occur when the program retains memory it no longer needs, potentially slowing down the system or causing it to crash, I use the Client within a context manager. This ensures that resources are released after execution.
# NBVAL_IGNORE_OUTPUT
start_time = time.time()
# function to compute the yearly mean for each variable
def compute_annual_mean(dataset, variable_name):
var = dataset[variable_name]
annual_mean = var.resample(time="YS").mean()
return annual_mean
# set up Dask client within a context manager
with Client(n_workers=5, threads_per_worker=2, memory_limit="4GB") as client:
# display the Dask Dashboard
display(client)
# open the dataset with on-disk chunking structure
url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})
# focus on Eastern North America and a single year for this example
ds = ds.sel(
time=slice("1981-01-01", "1981-12-31"), lat=slice(35, 65), lon=slice(-100, -60)
)
# define the variables to compute yearly means for
variables = ["tas", "tasmin", "tasmax"]
# create a list to hold delayed tasks
tasks = []
with TemporaryDirectory(prefix="output") as tempdir:
for var_name in variables:
output_path = Path(tempdir).joinpath(
f"var_means/{var_name}_1981_yearly_mean.zarr"
)
if not output_path.exists():
yearly_mean = compute_annual_mean(ds, var_name)
# save to Zarr with compute=False to get a delayed task object
delayed_task = yearly_mean.chunk(
{"time": -1, "lat": 50, "lon": 50}
).to_zarr(output_path, mode="w", compute=False)
tasks.append(delayed_task)
# trigger the execution of all delayed tasks
compute(*tasks)
# fetch memory usage from all workers and display the total usage
worker_memory = client.run(lambda: psutil.Process().memory_info().rss)
total_memory = sum(worker_memory.values())
print(f"Total memory usage across all workers: {total_memory / 1e9:.2f} GB")
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Total computation time: {elapsed_time:.2f} seconds")
Client
Client-a0483b16-34ed-11f1-8101-0242ac13001d
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status |
Cluster Info
LocalCluster
d2a57873
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Workers: 5 |
| Total threads: 10 | Total memory: 18.63 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-2b9d6d63-de3b-444b-887a-e64fd8f708ac
| Comm: tcp://127.0.0.1:40397 | Workers: 0 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Total threads: 0 |
| Started: Just now | Total memory: 0 B |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:36279 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/46103/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:35331 | |
| Local directory: /tmp/dask-scratch-space/worker-cpu5t0h5 | |
Worker: 1
| Comm: tcp://127.0.0.1:40409 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/43857/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:43225 | |
| Local directory: /tmp/dask-scratch-space/worker-4i5ur90w | |
Worker: 2
| Comm: tcp://127.0.0.1:38241 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/35981/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:41913 | |
| Local directory: /tmp/dask-scratch-space/worker-qbki5m65 | |
Worker: 3
| Comm: tcp://127.0.0.1:45039 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/36607/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:41539 | |
| Local directory: /tmp/dask-scratch-space/worker-ziyn1029 | |
Worker: 4
| Comm: tcp://127.0.0.1:41939 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/38347/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:35215 | |
| Local directory: /tmp/dask-scratch-space/worker-66xkrwcw | |
Total memory usage across all workers: 1.36 GB
Total computation time: 29.26 seconds
Let’s look at a more complex analysis involving heatwave indicators. Here, I calculate two climate indicators, heat_wave_total_length and heat_wave_frequency, using the atmos submodule from the xclim library. Both indicators rely on the same input data (tasmin and tasmax), so I create a pipeline of delayed tasks, which minimizes I/O operations by keeping the data in memory until both indicators are calculated and saved.
# NBVAL_IGNORE_OUTPUT
start_time = time.time()
with Client(n_workers=5, threads_per_worker=2, memory_limit="4GB") as client:
display(client)
# load data using on-disk chunk sizes
url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})
# focus on Eastern North America and a single year
ds = ds.sel(
time=slice("1981-01-01", "1981-12-31"), lat=slice(35, 65), lon=slice(-100, -60)
)
# variables 'tasmax' and 'tasmin' has 'cell_methods' set to 'time: point'
# update it to conform to CF conventions expected by xclim
ds["tasmax"].attrs["cell_methods"] = "time: maximum"
ds["tasmin"].attrs["cell_methods"] = "time: minimum"
# list of heatwave indicator functions
indicators = [atmos.heat_wave_total_length, atmos.heat_wave_frequency]
tasks = []
with TemporaryDirectory(prefix="output1") as tempdir1:
for indicator in indicators:
ds_out = xr.Dataset(
attrs=ds.attrs
) # create a new dataset for each indicator
out = indicator(ds=ds, freq="YS")
out = out.chunk({"time": -1, "lat": 50, "lon": 50})
ds_out[out.name] = out
output_path = Path(tempdir1).joinpath(f"heatwaves_ex1/{out.name}_1981.zarr")
output_path.parent.mkdir(parents=True, exist_ok=True)
if not output_path.exists():
# save to Zarr with compute=False to get a delayed task object
delayed_task = ds_out.to_zarr(output_path, mode="w", compute=False)
tasks.append(delayed_task)
# trigger computation
compute(*tasks)
# fetch memory usage from all workers and display the total usage
worker_memory = client.run(lambda: psutil.Process().memory_info().rss)
total_memory = sum(worker_memory.values())
print(f"Total memory usage across all workers: {total_memory / 1e9:.2f} GB")
end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")
Client
Client-b1bacb05-34ed-11f1-8101-0242ac13001d
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status |
Cluster Info
LocalCluster
f203e209
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Workers: 5 |
| Total threads: 10 | Total memory: 18.63 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-cae23072-37b5-4dae-a0fe-a8059f08df09
| Comm: tcp://127.0.0.1:44205 | Workers: 0 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Total threads: 0 |
| Started: Just now | Total memory: 0 B |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:37081 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/45511/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:39183 | |
| Local directory: /tmp/dask-scratch-space/worker-a99kso_x | |
Worker: 1
| Comm: tcp://127.0.0.1:40235 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/43067/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:35711 | |
| Local directory: /tmp/dask-scratch-space/worker-k3oh1t96 | |
Worker: 2
| Comm: tcp://127.0.0.1:43197 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/41631/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:45713 | |
| Local directory: /tmp/dask-scratch-space/worker-pek18iw5 | |
Worker: 3
| Comm: tcp://127.0.0.1:37749 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/43889/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:38377 | |
| Local directory: /tmp/dask-scratch-space/worker-50h6ceka | |
Worker: 4
| Comm: tcp://127.0.0.1:40493 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/41791/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:37129 | |
| Local directory: /tmp/dask-scratch-space/worker-2rn36lk5 | |
Total memory usage across all workers: 1.40 GB
Total computation time: 32.63 seconds
What can we do when we have large task graphs / large memory footprint?¶
One downside of using a fully delayed computation approach is that it can lead to the creation of large task graphs that are difficult to manage. This can result in excessive memory consumption as the Dask scheduler struggles to handle numerous interdependent tasks.
To address this issue, we can simplify the task graph by computing and saving each indicator one at a time. This method ensures that Dask completes the calculation and writing of the first indicator, then releases the memory used for that computation before moving on to the second indicator. By processing each indicator sequentially, the memory footprint is reduced, and the scheduler has fewer tasks to manage at any given time.
# NBVAL_IGNORE_OUTPUT
start_time = time.time()
with Client(n_workers=5, threads_per_worker=2, memory_limit="4GB") as client:
display(client)
url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})
ds = ds.sel(
time=slice("1981-01-01", "1981-12-31"), lat=slice(35, 65), lon=slice(-100, -60)
)
ds["tasmax"].attrs["cell_methods"] = "time: maximum"
ds["tasmin"].attrs["cell_methods"] = "time: minimum"
indicators = [atmos.heat_wave_total_length, atmos.heat_wave_frequency]
with TemporaryDirectory(prefix="output2") as tempdir2:
for indicator in indicators:
ds_out = xr.Dataset(attrs=ds.attrs)
out = indicator(ds=ds, freq="YS")
out = out.chunk({"time": -1, "lat": 50, "lon": 50})
ds_out[out.name] = out
output_path = Path(tempdir2).joinpath(f"heatwaves_ex2/{out.name}_1981.zarr")
output_path.parent.mkdir(parents=True, exist_ok=True)
if not output_path.exists():
# save to Zarr, triggering computation immediately
ds_out.to_zarr(output_path)
# fetch memory usage from all workers and display the total usage
worker_memory = client.run(lambda: psutil.Process().memory_info().rss)
total_memory = sum(worker_memory.values())
print(f"Total memory usage across all workers: {total_memory / 1e9:.2f} GB")
end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")
Client
Client-c52f3be9-34ed-11f1-8101-0242ac13001d
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status |
Cluster Info
LocalCluster
143dede8
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Workers: 5 |
| Total threads: 10 | Total memory: 18.63 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-c4de4a28-a3d0-4754-a8f4-497a0cd4babd
| Comm: tcp://127.0.0.1:44979 | Workers: 0 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Total threads: 0 |
| Started: Just now | Total memory: 0 B |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:36509 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/40021/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:46695 | |
| Local directory: /tmp/dask-scratch-space/worker-z_96j7u2 | |
Worker: 1
| Comm: tcp://127.0.0.1:45709 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/36751/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:38459 | |
| Local directory: /tmp/dask-scratch-space/worker-jriey8s1 | |
Worker: 2
| Comm: tcp://127.0.0.1:33615 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/39673/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:36995 | |
| Local directory: /tmp/dask-scratch-space/worker-evmg1c2o | |
Worker: 3
| Comm: tcp://127.0.0.1:34003 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/38473/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:40147 | |
| Local directory: /tmp/dask-scratch-space/worker-mg6bbifp | |
Worker: 4
| Comm: tcp://127.0.0.1:41439 | Total threads: 2 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/32853/status | Memory: 3.73 GiB |
| Nanny: tcp://127.0.0.1:43441 | |
| Local directory: /tmp/dask-scratch-space/worker-agjf_zoz | |
Total memory usage across all workers: 1.38 GB
Total computation time: 39.99 seconds
Even with the sequential computation approach, there may be scenarios where the output data is still too large to write in a single step, for example when calculating heatwave indicators over the entire North America domain. In such cases, an effective strategy is to split the dataset into smaller, manageable spatial chunks, which would allow for more efficient processing and data writing.
For instance, we can calculate the heatwave indicators over the entire dataset spanning North America, and then divide the results into smaller latitude bins (e.g., groups of 50 latitudes each). Each bin can then be processed and saved individually to a temporary .zarr file using Dask. Once all bins are saved, they can be merged back into a single dataset and written out as a final .zarr file. This method distributes the data processing load and helps minimize the strain on system resources.
# NBVAL_IGNORE_OUTPUT
start_time = time.time()
url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})
# keep the entire North America domain and select a single year
ds = ds.sel(time=slice("1981-01-01", "1981-12-31"))
ds["tasmax"].attrs["cell_methods"] = "time: maximum"
ds["tasmin"].attrs["cell_methods"] = "time: minimum"
# create an empty output dataset
dsout = xr.Dataset(attrs=ds.attrs)
# calculation on the entire dataset
out = atmos.heat_wave_total_length(ds=ds, freq="YS")
dsout[out.name] = out
with TemporaryDirectory(prefix="output3") as tempdir3:
outdir_ex3 = Path(tempdir3).joinpath(f"heatwaves_ex3/spatial_ex")
outdir_ex3.mkdir(parents=True, exist_ok=True)
outzarr3 = outdir_ex3.joinpath(f"{out.name}_1981.zarr")
if not outzarr3.exists():
# if data is too big to write in single step - make individual datasets by binning the `lat` dim (approx n=50 latitudes at a time)
grp_dim = "lat"
bins = round(len(dsout.lat) / 50)
_, datasets = zip(*dsout.groupby_bins(grp_dim, bins))
print(f"Number of individual datasets: {len(datasets)}")
if sum([len(d[grp_dim]) for d in datasets]) == len(dsout[grp_dim]):
print("Data is binned correctly.")
# export each chunk of 50-latitudes to a temporary location
with TemporaryDirectory(prefix="output3_nested") as outtmp:
for ii, dds in enumerate(datasets):
dds = dds.chunk(time=-1, lon=50, lat=50)
filename = Path(outtmp).joinpath(f"{ii}.zarr")
with Client(
n_workers=5, threads_per_worker=2, memory_limit="4GB"
) as client:
display(client)
save_to_zarr(
ds=dds,
filename=filename,
mode="o",
)
# clear the output for the next iteration
clear_output(wait=True)
# reassemble pieces and export joined
inzarrs = sorted(list(filename.parent.glob(f"*.zarr")))
# open the files as a combined multi-file dataset
ds = xr.open_mfdataset(inzarrs, engine="zarr", decode_timedelta=False)
# define the final chunking configuration
final_chunks = dict(time=-1, lon=50, lat=50)
# save the final combined dataset
tmpzarr = Path(outtmp).joinpath(outzarr3.name)
with Client(n_workers=10) as c:
display(client)
save_to_zarr(
ds=ds.chunk(final_chunks),
filename=tmpzarr,
mode="o",
)
# move the final combined file to the output location
outzarr3.parent.mkdir(exist_ok=True, parents=True)
shutil.move(tmpzarr, outzarr3)
end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")
Client
Client-d77141bd-34ee-11f1-8101-0242ac13001d
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status |
Cluster Info
LocalCluster
aa5a919e
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Workers: 0 |
| Total threads: 0 | Total memory: 0 B |
| Status: closed | Using processes: True |
Scheduler Info
Scheduler
Scheduler-e519941a-9748-4634-bf35-d225816d3771
| Comm: tcp://127.0.0.1:44891 | Workers: 0 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Total threads: 0 |
| Started: Just now | Total memory: 0 B |
Workers
Total computation time: 440.55 seconds
Alternatively, if the goal is to calculate heatwave indicators over a longer time period but for a smaller spatial extent, memory usage can be managed more efficiently by splitting the data along the time dimension. In this example, I calculate the indicator over a 20-year period so, after performing the heatwave calculations, I split the results into 5-year intervals, save each interval as a smaller Zarr file, and reassemble them into the final dataset in the last step. You can try different interval sizes depending on your period of analysis.
In addition, when initally loading the input dataset, I set the chunk size of the time dimension to 4 years and 1 day. This choice is based on the need to account for leap years, ensuring that each chunk contains a consistent number of days while spanning multiple years. Chunking by 4 years instead of 1 year improves computational efficiency and reduces excessive I/O operations for this specific case, as the heatwave indicator analysis spans multiple years.
# NBVAL_IGNORE_OUTPUT
start_time = time.time()
url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
# set the chunk size of the time dimension to 4 years + 1 day
ds = xr.open_dataset(url, chunks={"time": (365 * 4) + 1, "lat": 50, "lon": 50})
# this time, let's focus our analysis on a smaller region and a longer time period
ds = ds.sel(
time=slice("1981-01-01", "1990-12-31"),
lat=slice(42.1, 65.0),
lon=(slice(-80.0, -73.5)),
)
ds["tasmax"].attrs["cell_methods"] = "time: maximum"
ds["tasmin"].attrs["cell_methods"] = "time: minimum"
dsout = xr.Dataset(attrs=ds.attrs)
out = atmos.heat_wave_total_length(ds=ds, freq="YS")
dsout[out.name] = out
with TemporaryDirectory(prefix="output4") as tempdir4:
outdir4 = Path(tempdir4).joinpath(f"heatwaves_ex3/temporal_ex")
outdir4.mkdir(parents=True, exist_ok=True)
outzarr4 = outdir4.joinpath(f"{out.name}_1981-1990.zarr")
if not outzarr4.exists():
# resample into 5-year intervals
_, datasets = zip(*dsout.resample(time="5YS"))
print(f"Number of individual datasets: {len(datasets)}")
# export each 5-year chunk to a temporary location
with TemporaryDirectory(prefix="output4_nested") as outtmp:
for ii, dds in enumerate(datasets):
dds = dds.chunk(dict(time=-1, lon=50, lat=50))
filename = Path(outtmp).joinpath(f"{ii}.zarr")
with Client(
n_workers=5, threads_per_worker=2, memory_limit="4GB"
) as client:
display(client)
save_to_zarr(
ds=dds,
filename=filename,
mode="o",
)
# clear the output for the next iteration
clear_output(wait=True)
# reassemble the 5-year chunks into a single dataset
inzarrs = sorted(list(filename.parent.glob(f"*.zarr")))
# open the files as a combined multi-file dataset
ds = xr.open_mfdataset(inzarrs, engine="zarr", decode_timedelta=False)
# define the final chunking configuration
final_chunks = dict(time=12 * 50, lon=50, lat=50)
# save the final combined dataset
tmpzarr = Path(outtmp).joinpath(outzarr4.name)
with Client(n_workers=10) as c:
display(client)
save_to_zarr(
ds=ds.chunk(final_chunks),
filename=tmpzarr,
mode="o",
)
# move the final combined file to the output location
outzarr4.parent.mkdir(exist_ok=True, parents=True)
shutil.move(tmpzarr, outzarr4)
end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")
Client
Client-eedc2eef-34ee-11f1-8101-0242ac13001d
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status |
Cluster Info
LocalCluster
00edc20f
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Workers: 0 |
| Total threads: 0 | Total memory: 0 B |
| Status: closed | Using processes: True |
Scheduler Info
Scheduler
Scheduler-9f90d0c4-1b94-4d9f-beb8-6656e8874fc2
| Comm: tcp://127.0.0.1:39383 | Workers: 0 |
| Dashboard: https://pavics.ouranos.ca/jupyter/user/smith/proxy/8787/status | Total threads: 0 |
| Started: Just now | Total memory: 0 B |
Workers
Total computation time: 38.29 seconds
Key takeaways from this tutorial:¶
Dask is a parallel computing library that enables efficient processing of large datasets and complex computations by distributing tasks across multiple cores or machines
When opening a dataset, align in-memory chunks with the dataset’s on-disk chunking to minimize memory usage and reduce unnecessary I/O operations
Use delayed tasks to optimize the execution plan and save outputs in Zarr format, which supports parallel data writing
When dealing with large task graphs, simplify the process by handling tasks sequentially (e.g., process one climate indicator at a time) to keep the computation managable
If the data is too large to write in a single step, split the output into smaller, temporary Zarr files based on spatial or temporal segments relevant to your analysis; after splitting, reassemble these smaller files into the final Zarr file
Monitor memory usage and task progress using the Dask Dashboard, and adjust the number of workers, threads, and memory limits as needed to maintain balanced resource utilization