
Speed up Bucket get_min and get_max #368

Merged: 28 commits from zxdawn:speedup_bucket into pytroll:main on May 6, 2022
Conversation

@zxdawn (Member) commented Jun 5, 2021

  • Tests added
  • Tests passed
  • Passes git diff origin/main **/*py | flake8 --diff
  • Fully documented

The pandas method is quite slow. I came up with a faster method by combining numpy.ufunc.reduceat() and xr.apply_ufunc().

@zxdawn (Member, Author) commented Jun 5, 2021

The method sounds reasonable to me, but the result is wrong:

(image)

It seems that the get_min and get_max functions are applied repeatedly over some parts of the data ...
Do you have any ideas? @pnuu

@zxdawn (Member, Author) commented Jun 6, 2021

@pnuu I fixed the bug and the result looks good now:

(image)

However, the speed is slow again ...

I created a simple example below to test what causes the slow speed.
The pure numpy version is much quicker (about 4300x) than the dask method!
Any ideas? @djhoese @mraspaud

To reproduce this example, copy the cells below into a Jupyter notebook and execute them.
Here's an illustration to help you understand what's going on:

(illustration: bin_min)

Cell 1: Create sample data

import random
import numpy as np
import xarray as xr
import pandas as pd
import dask.array as da

# create example data
out_size = int(1e5)
bins = np.arange(3, out_size-3)
idxs = np.arange(0, out_size)
random.shuffle(idxs)

weights = idxs
slices = np.digitize(idxs, bins)

Cell 2: Calculate bin_min using pure numpy

%%timeit
# get index and weights in bins
valid = (bins.max() >= idxs) & (idxs >= bins.min())
valid_slices = slices[valid]
valid_weights = weights[valid]

# sort slice and weights
sort_index = valid_slices.argsort()
valid_slices_sort = valid_slices[sort_index]
valid_weights_sort = valid_weights[sort_index]

unique_slices, unique_index = np.unique(valid_slices_sort, return_index=True)
res_sub = np.minimum.reduceat(valid_weights_sort, unique_index)

# initialize result
res = xr.DataArray(np.full((out_size), np.nan), dims=['x'])
res.loc[unique_slices-1] = res_sub
9.6 ms ± 34.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Cell 3: Calculate bin_min using dask and xarray

def numpy_reduceat(data, bins, statistic_method):
    '''Calculate the bin_statistic using numpy.ufunc.reduceat'''
    if statistic_method == 'min':
        return np.minimum.reduceat(data, bins)
    elif statistic_method == 'max':
        return np.maximum.reduceat(data, bins)

# convert weights to DataArray using idxs as coords
weights = xr.DataArray(da.from_array(weights), coords=[idxs], dims=['x'])
slices = xr.DataArray(da.from_array(np.digitize(idxs, bins)), dims=['x'])

%%timeit
# convert weights to DataArray using idxs as coords
mask = xr.DataArray((idxs >= bins.min()) & (idxs <= bins.max()), dims=['x'])
valid_weights = weights.where(mask, drop=True)
valid_slices = slices.where(mask, drop=True)

# sort the slices
sort_index = da.map_blocks(np.argsort, valid_slices.data)
valid_slices = valid_slices[sort_index]
valid_weights = valid_weights[sort_index]

# get the unique slices (for assignment later) and bins (for numpy_reduceat)
unique_slices, unique_bins = da.unique(valid_slices.data, return_index=True)
statistics_sub = xr.apply_ufunc(numpy_reduceat,
                                valid_weights,
                                unique_bins.compute_chunk_sizes(),
                                kwargs={'statistic_method': 'min'},
                                input_core_dims=[['x'], ['new_x']],
                                exclude_dims=set(('x',)),
                                output_core_dims=[['new_x'],],
                                dask="parallelized",
                                output_dtypes=[weights.dtype],
                                dask_gufunc_kwargs={'allow_rechunk': True},
                            )

# initialize the output DataArray with np.nan
statistics = xr.DataArray(da.from_array(np.full((out_size), np.nan)), dims=['x'])
# assign the binned statistics
statistics.loc[unique_slices.astype('int')-1] = statistics_sub
statistics.compute()
41.3 s ± 91.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Cell 4: Calculate bin_min using pandas

%%timeit
xr_data = xr.DataArray(idxs, dims=['x'])
df = pd.DataFrame({'values': idxs})
groups = df.groupby(slices)
bin_min = groups['values'].min()
bin_min = (bin_min + pd.Series(np.zeros(out_size)))#.fillna(0)
6.88 s ± 236 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@codecov bot commented Jun 15, 2021

Codecov Report

Merging #368 (8106676) into main (1928289) will increase coverage by 0.06%.
The diff coverage is 97.43%.

@@            Coverage Diff             @@
##             main     #368      +/-   ##
==========================================
+ Coverage   93.89%   93.95%   +0.06%     
==========================================
  Files          65       65              
  Lines       11130    11269     +139     
==========================================
+ Hits        10450    10588     +138     
- Misses        680      681       +1     
Flag: unittests, coverage 93.95% <97.43%> (+0.06%) ⬆️


Impacted Files Coverage Δ
pyresample/bucket/__init__.py 97.85% <97.29%> (+0.29%) ⬆️
pyresample/test/test_bucket.py 100.00% <100.00%> (ø)
pyresample/test/utils.py 74.76% <0.00%> (-0.94%) ⬇️
pyresample/ewa/dask_ewa.py 90.16% <0.00%> (-0.82%) ⬇️
pyresample/area_config.py 90.55% <0.00%> (ø)
pyresample/test/test_utils.py 100.00% <0.00%> (ø)
pyresample/test/test_bilinear.py 100.00% <0.00%> (ø)
pyresample/test/test_spherical.py 100.00% <0.00%> (ø)
pyresample/test/test_geometry.py 99.47% <0.00%> (+0.01%) ⬆️
pyresample/test/test_dask_ewa.py 98.94% <0.00%> (+0.02%) ⬆️
... and 5 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@coveralls commented Jun 15, 2021

Coverage Status

Coverage increased (+0.07%) to 93.781% when pulling 8106676 on zxdawn:speedup_bucket into 1928289 on pytroll:main.

@djhoese (Member) left a review:

The algorithm needs to be broken up into different parts. It seems there are parts that are chunk-friendly and parts that aren't. It also seems like you've tried converting the SO answer into dask by switching some np. calls to da. calls. In some cases this may be unnecessary, as numpy will dispatch to dask's methods properly; in other cases it has no effect or is actually worse. For example, da.empty will create an empty dask array, but then you need to do assignments on it, which will generally not perform well with dask.

You may also want to look at dask's blockwise if your algorithm ends up going from one chunked array shape to another, which is typical for resampling. For example, if the input arrays to an algorithm were (bins, y, x) and need to go to (bins, y2, x2), then blockwise would be a good fit. https://docs.dask.org/en/latest/array-api.html#dask.array.blockwise
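For reference, here is a tiny, generic da.blockwise sketch (plain elementwise addition, not the bucket algorithm) just to show the index-string style the docs describe; the arrays are made up:

import operator

import dask.array as da

x = da.ones((4, 4), chunks=2)
y = da.full((4, 4), 2.0, chunks=2)
# The 'ij' after each argument maps every output block to the matching
# input blocks of x and y, so operator.add runs once per block pair.
z = da.blockwise(operator.add, 'ij', x, 'ij', y, 'ij', dtype=x.dtype)
print(z.compute())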

If you can separate the part of the algorithm that deals with multiple chunks at a time (sorting, assigning to random areas of the output array, etc.) into its own function and then wrap that in delayed, you can do whatever you want with numpy functions inside that function.
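As an illustration of that suggestion, here is a minimal sketch (not pyresample's actual code) that wraps the numpy-heavy part in one delayed function and turns the result back into a dask array; the helper name _min_per_bin and the toy inputs are invented for the example:

import dask
import dask.array as da
import numpy as np

@dask.delayed(pure=True)
def _min_per_bin(weights, slices, out_size):
    # Pure numpy inside the delayed function: sort by bin, then reduce
    # each bin with minimum.reduceat.
    order = slices.argsort()
    slices_sorted = slices[order]
    weights_sorted = weights[order]
    unique_slices, first_idx = np.unique(slices_sorted, return_index=True)
    out = np.full(out_size, np.nan)
    out[unique_slices] = np.minimum.reduceat(weights_sorted, first_idx)
    return out

out_size = 100
weights = da.random.random(1000, chunks=100)
slices = da.random.randint(0, out_size, size=1000, chunks=100)
# The dask arrays become dependencies of the delayed call and are computed
# before the function runs; from_delayed gives back an array of known shape.
result = da.from_delayed(_min_per_bin(weights, slices, out_size),
                         shape=(out_size,), dtype=float)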

The algorithm you were provided may be the fastest numpy algorithm, and it will still perform well in a delayed function, but there is probably another algorithm (or a version of this algorithm) that would use dask chunks in a better way. Even then it might not perform as well as the pure numpy version, but it would at least be able to operate per-chunk and never load more data than it needs. How this dask-friendly algorithm would be coded is not obvious and would probably not be worth the effort unless you want to use the opportunity to learn how dask graphs work (really not that bad).

This would be much easier to talk about in person with a whiteboard in front of us.

@zxdawn (Member, Author) commented Jun 15, 2021

Thank you very much, @djhoese. The explanation is quite clear and I will try my best to improve this PR for dask tomorrow ;)

@zxdawn (Member, Author) commented Jun 16, 2021

@djhoese Following your suggestions, I modified the creation of the empty array and switched to to_delayed() and da.from_delayed(). But I ran into two problems.

@zxdawn (Member, Author) commented Jun 18, 2021

@djhoese The purpose of iterating over the blocks was to deal with the memory error problem.

Now I have moved that into a delayed function and get a memory error like this:

    mask_buffer = (bins[:, None] <= idxs_sorted[None, :-1]) &\
numpy.core._exceptions.MemoryError: Unable to allocate 12.8 TiB for an array with shape (3750000, 3749999) and data type bool
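For context, the reported size follows directly from the mask shape: a numpy boolean array uses one byte per element, so

nbytes = 3750000 * 3749999   # one byte per bool element
print(nbytes / 2**40)        # ~12.79, i.e. the "12.8 TiB" in the traceback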

@djhoese (Member) commented Jun 18, 2021

What size data are you running this with (source size, target area size)? If the algorithm really is trying to take this much memory then it needs to be rewritten for full dask chunking. I'm sorry I don't have the time to help with this more. It sounds interesting and there is likely something that can be done...I just don't have the time.

@zxdawn (Member, Author) commented Jun 18, 2021

@djhoese The source and target areas have the same size (ABI CONUS 2 km): 1500 * 2500 = 3,750,000 pixels.
I'm using the following code to resample the corrected GLM lon/lat to the ABI area and get the min/max of a channel over the ABI area:

resampler_glm = BucketResampler(abi_area,
                                lon_glm_corr.chunk(chunks={'y': 100, 'x': 100}).data,
                                lat_glm_corr.chunk(chunks={'y': 100, 'x': 100}).data)

corr_data = xr.DataArray(resampler_glm.get_min(scn_glm[channel].isel(time=i)),
                         dims=['y', 'x'],
                         coords=[abi_y, abi_x])

@zxdawn (Member, Author) commented Jun 21, 2021

@djhoese @pnuu Finally, I found the quickest method!!! Combining np.digitize() and np.unique() avoids the memory error caused by the large 2D mask array.
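As a rough illustration of the idea (a simplified sketch with made-up toy data, not the exact PR code): instead of building a (bins x samples) boolean mask, bin each sample with np.digitize and use np.unique to find where each occupied bin starts in the sorted data, so everything stays 1D:

import numpy as np

rng = np.random.default_rng(0)
out_size = 1000
n_samples = 10000

# target-grid index and weight of each sample, already sorted by target index
target_idx = np.sort(rng.integers(0, out_size, size=n_samples))
weights = rng.random(n_samples)

# 1D pass: which output bin each sample falls into (no 2D broadcast mask)
bin_of_sample = np.digitize(target_idx, np.arange(out_size))
# first position of every occupied bin -> segment boundaries for reduceat
occupied_bins, start_pos = np.unique(bin_of_sample, return_index=True)

result = np.full(out_size, np.nan)
result[occupied_bins - 1] = np.minimum.reduceat(weights, start_pos)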

@zxdawn (Member, Author) commented Jun 21, 2021

Hmm ... it seems that the test error AttributeError: 'tuple' object has no attribute 'dtype' isn't caused by my changes.

@gerritholl requested a review from @pnuu on May 5, 2022
@djhoese (Member) commented May 5, 2022

@gerritholl @zxdawn What is the current state of this PR as far as performance? Are there still some things you can't explain, or that you expect or want to be faster? Or do you think you've caught all the edge cases and it is as fast as it can reasonably be?

@gerritholl (Collaborator) commented May 5, 2022

I can't promise we've caught all edge cases and it's as fast as it can possibly be, but as far as I'm concerned we've made very good progress. I am satisfied with this PR.

@djhoese (Member) left a review:

Looks pretty nice. Good job.

I had a couple small suggestions and then I have one big one that I would like tested out either in this PR or a promise that it will be done in a future PR. Right now the general flow of the dask arrays goes like this:

data -> delayed(_sort_weights) -> indexing and linspace -> delayed(_get_bin_statistics) -> reshape

To me there isn't much benefit to having that middle indexing-and-linspace section of code. The only benefit I see is if the indexing is much faster when done in parallel with dask, but since the indexes come from a delayed function (no real chunking) and are then fed into the second delayed function right away, I think all of this logic should be one giant delayed function. It'd be nice if it could be done with map_blocks, but I don't see a way to do that. The only reason not to do this would be if combining all of these resulted in large memory usage because of all the temporary arrays (del idxs_sorted might help with that).

Lastly, while looking at how self.idxs is generated, I think that entire function (_get_indices) should be turned into a map_blocks call. There are a lot of little intermediate operations there that just add more overhead to the dask graph. However, now that I say that, I see that y_idxs, x_idxs, and idxs are all assigned to self in this method, so maybe it isn't as pretty as I had hoped. Wait! No, y_idxs and x_idxs are never used outside this method, right? Having a map_blocks function that takes lons and lats as inputs and returns the equivalent of self.idxs should clean up the dask graph a bit and, I would hope, improve memory usage and maybe execution time. At bare minimum, the dask graph would look better.
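For illustration, a minimal sketch of that kind of map_blocks refactor; the helper _lonlat_to_flat_index and its simplified lon/lat-to-bucket arithmetic are invented for the example and are not the real AreaDefinition logic:

import dask.array as da
import numpy as np

def _lonlat_to_flat_index(lons, lats, x_res=0.1, y_res=0.1,
                          lon_min=-180.0, lat_max=90.0, width=3600):
    # Subtract, divide, and floor all happen inside one task per chunk
    # instead of appearing as separate nodes in the dask graph.
    x_idx = np.floor((lons - lon_min) / x_res).astype(np.int64)
    y_idx = np.floor((lat_max - lats) / y_res).astype(np.int64)
    return y_idx * width + x_idx

lons = da.random.uniform(-180, 180, size=(1500, 2500), chunks=500)
lats = da.random.uniform(-90, 90, size=(1500, 2500), chunks=500)
idxs = da.map_blocks(_lonlat_to_flat_index, lons, lats, dtype=np.int64)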

@gerritholl (Collaborator) commented:

This is currently the dask graph:

(image: br-max-dask-graph)

Commit: In the bucket resampler, use the direct numpy dtype rather than the detour via a string.
@gerritholl (Collaborator) commented May 6, 2022

> No, y_idxs and x_idxs are never used outside this method, right?

They're not used within this class, but they're exposed as public API, so removing them would affect backward compatibility. This behaviour is currently tested in test_get_bucket_indices. @pnuu, are x_idxs and y_idxs needed as public attributes?

@pnuu (Member) commented May 6, 2022

No, I don't think x_idxs and y_idxs are needed outside of the resampler.

Commit: In the bucket resampler, simplify the implementation of get_max to use only one rather than two dask.delayed functions.
@gerritholl (Collaborator) commented:

Simplifying to only one dask.delayed function, as suggested by @djhoese, speeds up a test get_max() call from 9 seconds with a peak of 9 GB RAM to 5 seconds with a peak of 4.5 GB RAM. See 57fa302.


Inline review comment (Contributor) on pyresample/bucket/__init__.py, at:

return weights_sorted[weight_idx]  # last value of weights_sorted always nan

@dask.delayed(pure=True)

E302 expected 2 blank lines, found 1

Inline review comment (Contributor) on pyresample/bucket/__init__.py, at:

# set bin without data to fill value
statistics = da.where(counts == 0, fill_value, statistics)
statistics = da.from_delayed(
    _get_statistics(statistic_method, data, self.idxs, out_size),

E126 continuation line over-indented for hanging indent

@gerritholl (Collaborator) commented:

However, there still seems to be significant room for improvement. A single task, reshape(get_statistics(...)), is taking more than half the time and is not parallelised:

(image)

@gerritholl (Collaborator) commented:

I tried whether bringing the .reshape(...) call inside the @dask.delayed-decorated function would resolve this, but it just causes another task (_get_statistics) to take over as the task dominating the calculation time. I suppose I don't quite understand what I'm doing here.

Commit: For the bucket resampler get_max/get_min, another (slight) speedup by reshaping inside rather than outside the dask delayed function. My test runs about 0.8 seconds faster on my system, but the dask graph is still dominated by a single task taking 60% of the time.
@djhoese (Member) commented May 6, 2022

So the entire top portion (or most of it) of that dask graph is the .idxs creation. If you look at the names of the nodes they all map to the operations in that idxs method (subtract, divide, floor). I'm not saying there would necessarily be any performance improvements, but the dask graph and debugging might be easier if all of that was in a map_blocks function. You can see in the graph that there are single paths/lines going into this huge chunk of tasks and then a single line comes out. That says "this needs map_blocks" to me.

@djhoese (Member) commented May 6, 2022

...and yeah until we come up with a better algorithm I'm not sure we can get away from the single task doing most of the work. That said, there is that .rechunk right before the delayed function is called. That probably isn't needed anymore since the result goes straight into the delayed function.

@gerritholl (Collaborator) commented:

In the spirit of incremental PRs that do one thing, I'm a bit hesitant to break open the idxs, x_idxs, and y_idxs attributes and _get_indices(). self.idxs is used not only in get_max() and get_min(), but also in get_count() and get_sum(), and therefore by get_average(). It would broaden the scope of the PR from speeding up get_max() and get_min() to refactoring the whole BucketResampler class. Therefore, I would prefer to defer changing _get_indices() to another PR.

@djhoese (Member) commented May 6, 2022

I'm fine with that.
