
[PR]: Add Parallel Computing with Dask Jupyter Notebook #489

Merged 45 commits from doc/485-dask-guide into main on Jun 10, 2024

Conversation

tomvothecoder
Collaborator

@tomvothecoder commented May 23, 2023

Description

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • My changes generate no new warnings
  • Any dependent changes have been merged and published in downstream modules

If applicable:

  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass with my changes (locally and CI/CD build)
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have noted that this is a breaking change for a major release (fix or feature that would cause existing functionality to not work as expected)

@tomvothecoder self-assigned this May 23, 2023
@tomvothecoder added the "type: docs" (Updates to documentation) label May 23, 2023
@tomvothecoder changed the title from "Add Parallel Computing with Dask Jupyrer Notebook" to "Add Parallel Computing with Dask Jupyter Notebook" May 23, 2023
@codecov-commenter

codecov-commenter commented May 23, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 100.00%. Comparing base (e69693d) to head (0d61eb5).

Current head 0d61eb5 differs from the pull request's most recent head b3ed34c.

Please upload reports for commit b3ed34c to get more accurate results.

Additional details and impacted files
@@            Coverage Diff            @@
##              main      #489   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           15        15           
  Lines         1542      1542           
=========================================
  Hits          1542      1542           


@tomvothecoder
Collaborator Author

tomvothecoder commented May 23, 2023

Hi @xCDAT/core-developers, I need some help developing test cases for this Dask notebook.

Do any of you know of datasets in the following size ranges, and can you provide me links to them on ESGF and their paths on the Climate filesystem?

  • 1 GB to 10 GB
  • 10 GB to 25 GB
  • 25 GB to 50 GB
  • 50 GB to 100 GB

I will be comparing the sequential and parallel performance of xCDAT's spatial and temporal APIs against CDAT.

@pochedls
Collaborator

  • 7 GB: /p/css03/esgf_publish/CMIP6/CMIP/NCAR/CESM2/historical/r1i1p1f1/day/tas/gn/v20190308/
  • 17 GB: /p/css03/cmip5_css01/data/cmip5/output1/CNRM-CERFACS/CNRM-CM5/historical/day/atmos/day/r1i1p1/v20120530/ta/
  • 12 GB: /p/css03/esgf_publish/CMIP6/CMIP/MRI/MRI-ESM2-0/amip/r1i1p1f1/3hr/tas/gn/v20190829/
  • 22 GB: /p/css03/esgf_publish/CMIP6/CMIP/MOHC/UKESM1-0-LL/historical/r5i1p1f3/day/ta/gn/v20191115/
  • 50 GB: /p/css03/esgf_publish/CMIP6/CMIP/NCAR/CESM2/historical/r1i1p1f1/day/ta/gn/v20190308/
  • 74 GB: /p/css03/esgf_publish/CMIP6/CMIP/CCCma/CanESM5/historical/r1i1p2f1/CFday/ta/gn/v20190429/
  • 105 GB: /p/css03/esgf_publish/CMIP6/CMIP/MOHC/HadGEM3-GC31-MM/historical/r2i1p1f3/day/ta/gn/v20191218/

@tomvothecoder
Collaborator Author

tomvothecoder commented Jun 6, 2023

105 GB: /p/css03/esgf_publish/CMIP6/CMIP/MOHC/HadGEM3-GC31-MM/historical/r2i1p1f3/day/ta/gn/v20191218/

Hey @pochedls, any chance you can find another 100 GB multi-file dataset with a different calendar type? This multi-file dataset has calendar="360_day", which Xarray can only represent using cftime objects. Dask does not support auto chunking on cftime coordinates because they are dtype=object (source 1, source 2).

UPDATE: Never mind, I opened the dataset using xc.open_mfdataset, which works with this dataset, unlike xr.open_mfdataset. I think this is because xCDAT sets decode_times=False when merging the datasets, chunks them, and then lazily decodes the cftime objects (refer to the comment below). I didn't realize xCDAT had this advantage over Xarray until now.

Example script

import os

import xarray as xr
import xcdat as xc


# 1. Set the directory and gather the absolute filepath of each file in the dataset
dir_path = (
    "/p/css03/esgf_publish/CMIP6/CMIP/MOHC/HadGEM3-GC31-MM/historical/r2i1p1f3"
    "/day/ta/gn/v20191218"
)

filepaths = []
for root, dirs, files in os.walk(os.path.abspath(dir_path)):
    for file in files:
        filepaths.append(os.path.join(root, file))

# 2. Attempt to open all of the files with auto chunking -- this raises:
# NotImplementedError: Can not use auto rechunking with object dtype. We are
# unable to estimate the size in bytes of object data
ds = xr.open_mfdataset(f"{dir_path}/*.nc", chunks="auto")

# 3. Check the coordinate dtypes -- `time` is dtype=object (cftime), which is
# why auto chunking fails
ds = xr.open_dataset(filepaths[0])

print(ds.coords.dtypes)
# Frozen({'time': dtype('O'), 'plev': dtype('float64'), 'lat': dtype('float64'), 'lon': dtype('float64')})
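
For reference, a minimal sketch of the xCDAT call that worked (continuing from the script above, with the same chunks argument that fails in xr.open_mfdataset):

# 4. Opening the same files with xCDAT succeeds because it chunks the data
# before decoding the cftime time coordinates.
ds = xc.open_mfdataset(f"{dir_path}/*.nc", chunks="auto")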

@tomvothecoder
Collaborator Author

tomvothecoder commented Jun 6, 2023

Some notes about Xarray loading cftime objects into memory (because cftime.num2date is not vectorized):

  • The answer to source 1 states that cftime objects are loaded into memory, which I assume happens when they are decoded. Xarray decodes cftime objects using cftime.num2date, which is not vectorized.
  • In xCDAT, we implemented _decode_time and _get_cftime_coords, which should lazily decode cftime objects, unlike cftime.num2date. I recall measuring performance when opening multi-file datasets with these functions and noticing upfront improvements over the eager approach.
  • There is an open Xarray issue to adopt the non-nanosecond datetime support added in Pandas 2.0. This support should allow us to move away from cftime and simplify the internal decoding logic significantly.
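
A rough sketch of the decode-after-chunking pattern described above, using plain Xarray (this approximates the idea rather than xCDAT's actual implementation; the file path is hypothetical):

import xarray as xr

# Open without decoding times so `time` stays numeric, which permits auto
# chunking even for non-standard calendars.
ds = xr.open_mfdataset("path/to/files/*.nc", decode_times=False, chunks="auto")

# Decode afterwards; use_cftime=True yields cftime objects for calendars like
# 360_day that np.datetime64 cannot represent.
ds = xr.decode_cf(ds, decode_times=True, use_cftime=True)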

@tomvothecoder
Collaborator Author

Notes from the 5/24/23 meeting:

  • Peter will open a GH Discussions post on 3 TB data + CMORizing
    • Issue with 3-hourly data bombing after a few years, with an obscure error message
  • Jiwoo has a 40 GB dataset that crashes
    • Workaround: read each year and compute the monthly time series from the daily time series to get the annual cycle
    • Jiwoo is looping over the data, chunking on lat/lon
    • Is the problem related to xCDAT, or to limitations of Xarray/Dask?
    • Groupby is sequential; flox parallelizes it

@tomvothecoder changed the title from "Add Parallel Computing with Dask Jupyter Notebook" to "[PR]: Add Parallel Computing with Dask Jupyter Notebook" Feb 12, 2024
@tomvothecoder marked this pull request as ready for review February 28, 2024 00:12
@tomvothecoder
Collaborator Author

Hey @jasonb5 and @pochedls, this parallel computing notebook is ready for review. I tried to make the notebook easy to read for users with simple code examples. If we want more advanced code examples for things like horizontal regridding, we can either update this notebook or update other notebooks with Dask usage.

@jasonb5, can you make sure everything is accurate? And @pochedls, can you review how well the notebook flows and whether it is understandable? I'm also open to suggestions.

Here's the link: https://xcdat.readthedocs.io/en/doc-485-dask-guide/examples/parallel-computing-with-dask.html

@tomvothecoder
Collaborator Author

tomvothecoder commented Mar 28, 2024

Todo (3/28/24)

  • Add content about requesting interactive jobs and starting a Dask cluster, then connecting to it via the Dask Client in Jupyter
  • Add a call to .visualize() to show the Dask task graph before calling .compute() (see the sketch below)
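
A minimal sketch of what that .visualize() call might look like (tas_avg stands in for any Dask-backed xarray object; rendering the graph requires graphviz to be installed):

import dask

# Render the task graph to an image before triggering computation.
dask.visualize(tas_avg, filename="task_graph.png")

# Then trigger the actual computation.
result = tas_avg.compute()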

@tomvothecoder
Collaborator Author

FYI @pochedls @jasonb5, I'm taking the Dask Training at NERSC and will update the notebook based on my experience.

I'll let you both know when it is ready.

@tomvothecoder marked this pull request as draft March 28, 2024 18:10
@pochedls
Collaborator

@tomvothecoder – as you work on this: I did skim this notebook, which looks like an amazing resource. Some things I was wondering about:

  • Could you have jumped straight to xcdat in creating the monthly climatology? (Also, I'm not sure the first xcdat calculation does anything.)
  • Would it make sense to use a larger dataset and try to illustrate the performance advantages of using dask?
  • Can the warnings for client = Client() be suppressed?

@jasonb5
Collaborator

jasonb5 commented Apr 24, 2024

@tomvothecoder The notebook looks great, lots of good information and resources.

I ran into something on Nimbus that could occur in other similar environments, where memory_limit="auto" sets the per-worker memory too low. I'm assuming this occurs because an environment like Nimbus doesn't correctly report the system memory. I was able to work past it by setting the value to either 4G or 0.5.

Not sure if this should be addressed in the notebook, but maybe the value should be set to a hard minimum requirement for the example, e.g., 4G (see the sketch below).
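
A minimal sketch of setting an explicit per-worker memory limit instead of "auto" (the worker count and 4GiB value are illustrative, not the notebook's exact settings):

from dask.distributed import Client

# An explicit limit avoids relying on the environment to report system memory
# correctly; "auto" can otherwise set the per-worker limit too low.
client = Client(n_workers=4, threads_per_worker=1, memory_limit="4GiB")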

@tomvothecoder
Collaborator Author

Thanks for the review @jasonb5! I'll add your suggestion and make a note that memory_limit="auto" can sometimes set the limit too low for the workers, which can cause bottlenecks. I'll also suggest that users check their available memory so they can maximize resource usage for their workers if needed.

@tomvothecoder marked this pull request as ready for review April 30, 2024 16:59
@tomvothecoder
Collaborator Author

Hey @pochedls, the notebook is ready for review. Here's a link to the current version of the notebook.

Summary of changes:

  • Updated the env setup instructions to follow the same structure as other notebooks
  • Moved resource links to the bottom of the notebook
  • Removed the Xarray content
  • Added information about Dask schedulers and clusters
  • Split the xCDAT examples into parallelizing on a local machine/login node and on an HPC/interactive node

RE: Your comments from above:

  1. Could you have jumped straight to xcdat in creating the monthly climatology? (Also – I'm not sure first xcdat calculation does anything)?
    • I removed the Xarray portion and focused on xCDAT parallelization. I made sure to call .compute() this time around.
  2. Would it make sense to use a larger dataset and try to illustrate the performance advantages in using dask?
    • I added a warning box to let the user know the notebook is intended to demonstrate Dask usage to get up and running quickly (hence the small dataset), rather than Dask performance. I added resources and examples for configuring Dask for optimal performance based on the user's needs and computational resources.
  3. Can warnings for client = Client() be suppressed?
    • The output is now much smaller since I requested only a few Dask workers instead of as many as possible.
    • It is a good idea to keep the Dask output so the user can familiarize themselves with it.

@tomvothecoder
Collaborator Author

Quoting @pochedls's review:

@tomvothecoder – thank you for putting together this notebook with a lot of helpful information. I am good with this PR. I have listed some minor detailed comments at the bottom and a suggestion for companion documentation next.

Companion Notebook: I think there is scope for a second parallel notebook (someday) that aims to show more xcdat-oriented practical examples (without the complications of the dask cluster stuff). I imagine it would do things like this:

  • Download a dataset that is a few GBs to disk (e.g., a piControl file via wget)
  • Show how chunking in time versus space affects performance for a given operation (e.g., spatial averaging probably needs time chunks, but temporal averaging might do fine with space chunks?)
  • Walk through how you might decide on a chunk size (e.g., this is a 4 GB dataset with 100 years or 1200 timepoints, so breaking it into decades [120 months] would give me pretty manageable 400 MB chunks to work on with 5 workers)
  • Maybe show a `dask.delayed` example (see the sketch after this list), e.g.,
    • First download a number of netCDF files (e.g., a CMIP historical tas simulation broken into ~12 files)
    • Do a glob to get the file list
    • Create a function that opens a file, computes the spatial average, and returns it
    • Run `results = dask.delayed(...)`
    • Use `xr.concat` to combine the results into one dataset
    • Compare `dask.delayed` to serial performance
  • Talk about some xCDAT-specific parallelization considerations (e.g., the FAQs in this notebook)
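
A rough sketch of the `dask.delayed` workflow outlined above (the file pattern, variable name, and helper function are hypothetical):

import glob

import dask
import xarray as xr
import xcdat as xc


def spatial_average(path):
    # Open one file and compute the spatial average of `tas`.
    ds = xc.open_dataset(path)
    return ds.spatial.average("tas")


# Hypothetical pattern for a historical tas simulation split across ~12 files.
filepaths = sorted(glob.glob("tas_day_*.nc"))

# Build one lazy task per file, then execute the tasks in parallel.
tasks = [dask.delayed(spatial_average)(path) for path in filepaths]
results = dask.compute(*tasks)

# Combine the per-file results into a single dataset along time.
ds_avg = xr.concat(results, dim="time")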

If you agree a practical guide to parallelization in xcdat would be a nice complement, I can open an issue (maybe copying some of this comment).

I like your idea of having a companion notebook to showcase real-world examples of parallelizing xCDAT analysis code. You can open another issue for this.

I should have emphasized that the current notebook is intended to introduce scientists to the basic concepts of Dask while connecting them to Xarray and xCDAT (at a very high level). There are a ton of great resources cited throughout the notebook, and I was hoping to consolidate the most important info here.


Addressed your list of comments below:

  • This notebook excerpts a large amount of information from other documentation pages (with attribution). To make this more clear, consider noting that you excerpt documentation from X, Y, Z sources (with links) under “Overview” so that this is extremely clear. Alternatively, you could consider distilling this information down further with xCDAT examples and context. -- added links under "Sources" cell near the top
  • Consider re-ordering the topics under Overview so that they appear in the same order as the notebook -- fixed
  • Bold “Orient your chunks” under “Dask Best Practices” -- fixed
  • Some text under “Xarray and Dask” may not be rendering (e.g., “How do I use Dask arrays in an `xarray.Dataset`”) -- fixed
  • Should the questions under “Xarray and Dask" have question marks? -- fixed
  • The attribution to Xarray for the “Xarray and Dask” section is within a quote block (it should be outside the quote block) [This seems to apply to other sections as well] -- fixed
  • “Chunking in Xarray and xCDAT” repeats some of the advice from the “An Overview of Chunking in Dask.” Consider removing the Dask Chunking section and boiling down the key points into the “Chunking in Xarray and xCDAT” section (and link to the dask chunking overview). -- fixed to remove redundant information about chunking in "Chunking in Xarray and xCDAT"
  • Under “How to use chunks” – should you also list an integer chunking example? -- added this example (see the first sketch after this list)
  • Under “1. Setup the Dask Cluster” – consider saying “Note” instead of “Notice” -- fixed
  • Under “threads_per_worker=1” – was there more to this description? -- fixed
  • Did you mean to show a screenshot with the output of client.cluster? -- fixed to show output of cell with cluster info
  • Under “Notice all of the sub-operations…”: Should this be “Dask dashboards UI”? -- removed this cell because it takes forever to run tas_avg.dask_graph()
  • Calling tas_avg.dask_graph() seemed to crash my Jupyter Notebook… -- removed this cell because it takes forever to run
  • Under the description for .compute(): I didn’t fully understand the difference between copy/compute. Is it that, for .compute(), a new dataset is returned to the user (i.e., a copy) instead of simply creating a reference to the original dataset (which sometimes happens in Python) – is this right? -- .compute() returns a new xr.Dataset object, while .load() modifies the original xr.Dataset object in place. I updated the code to tas_avg_res = tas_avg.compute() with a comment that the user can also just run tas_avg.load() instead (see the second sketch after this list)
  • Under FAQs: “followin" to “following” -- fixed
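
A minimal sketch of the dictionary versus integer chunking variants mentioned above (the file name and chunk sizes are illustrative, not the notebook's exact values):

import xarray as xr

# A dict maps dimension names to chunk sizes...
ds_dict = xr.open_dataset("tas_day.nc", chunks={"time": 120})

# ...while a single integer applies the same chunk size to every dimension.
ds_int = xr.open_dataset("tas_day.nc", chunks=100)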
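
And a minimal sketch of the .compute() versus .load() distinction described above (tas_avg stands in for any Dask-backed xarray object):

# .compute() returns a new, evaluated xr.Dataset; tas_avg itself stays lazy.
tas_avg_res = tas_avg.compute()

# .load() evaluates in place, replacing tas_avg's Dask arrays with NumPy arrays.
tas_avg.load()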

@tomvothecoder
Collaborator Author

Need to address #662 before this PR can be merged

@tomvothecoder force-pushed the doc/485-dask-guide branch 4 times, most recently from f3720b5 to 1302acb on June 10, 2024 20:49
@tomvothecoder merged commit 9ad91a0 into main Jun 10, 2024
5 checks passed
@tomvothecoder deleted the doc/485-dask-guide branch June 10, 2024 21:23