Break apart uneven array-of-int slicing to separate chunks #3648
Conversation
This currently fails pretty hard on the random shuffle case. It would possibly create as many chunks as there are elements in the index.
I would generally be fine with this in many of my use cases when I am dealing with very big datasets. I usually start with data that has singleton chunks in time. Then I want to remove the climatology from each timestep. If the result comes back with one chunk per element in the index, that is how the data started out anyway.
Indeed, I think we get lucky in a lot of dask.array use cases, because it's easy to amortize scheduler overhead by making other dimensions large instead. Full shuffles of individual array elements are pretty rare, because grouped operations usually only go over one dimension (e.g., time).
So there are two cases where we slice an array: (1) with a sorted index, and (2) with an unsorted or randomized index.
Sounds like this doesn't matter for case 1, but makes case 2 much worse. Also, it sounds like how we chunk our data differs from climatology data: namely, we have many frames per chunk (~100 frames/chunk), as frames are smallish. So the penalty in case 2 is a bit more unpleasant for us to deal with. Given this, could you please elaborate on where this will be used and what it is intended to help with? Are there open issues or algorithms planned where this is needed?
Admittedly, we would not rely on slicing by randomized indices if we had a proper path for shuffling. xref: #3409
@jakirkham can you give some representative examples for case 2 that you do now, perhaps using random data? I'm curious to know what your array sizes are and how you index currently.
Actually, this is wrong... still trying to understand existing code...
pydata/xarray#2237 is probably the best description of why the current approach is problematic. Xarray does some out-of-order indexing at the end as part of groupby-transform, which currently doesn't really work with dask because the resulting array is unchunked.
The crux of the existing solution (for non-sorted indices) is these lines: dask/array/slicing.py, lines 590 to 596 at 5fb3854.
If I understand correctly, its approach is:
1. Sort the requested indices.
2. Slice the array with the sorted indices, which preserves the input chunking.
3. Concatenate the results into a single chunk.
4. Reorder the elements within that chunk back to the originally requested order.
With this approach, we would lose the efficiency gains from sorting indices, which keeps the same number of chunks in step (2) as the original array.
Steps (3) and (4) are where reshuffling happens. Here we have two additional concerns:
1. It may not be a *full* reshuffle: some indices are likely in contiguous blocks.
2. If it fits into memory, it is advantageous to reshuffle chunks that are as large as possible (currently, we use a single chunk), because the naive cost of reshuffling is quadratic.
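For intuition, here is a small NumPy sketch of that sort-then-reorder scheme (an illustration of the four steps above, not the actual dask code):

```python
import numpy as np

x = np.arange(100) * 2              # stand-in for the array being sliced
index = np.random.permutation(100)  # unsorted requested indices

# (1) Sort the requested indices.
sorter = np.argsort(index)
sorted_index = index[sorter]

# (2) Slice with the sorted indices -- in dask this step is chunk-friendly.
taken = x[sorted_index]

# (3)+(4) Reshuffle back to the requested order (dask currently does this
# within one big chunk): invert the sort permutation and apply it.
inverse = np.empty_like(sorter)
inverse[sorter] = np.arange(len(sorter))
result = taken[inverse]

assert (result == x[index]).all()
```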
@shoyer do you have thoughts on how the approach taken in this PR will affect XArray's grouping of things? My guess is that it will make the partitioned-by-day, group-by-month-of-year computations much more pleasant. It would be useful to experiment with this though.
Also, @rabernat, it would be good if you could try this out on a semi-realistic problem and see what the performance is like.
I just tried this out on a very realistic problem, the original climatology / anomaly use case that motivated pydata/xarray#2237. It worked exactly as I hoped! I started with a DataArray that looked like this:
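(The DataArray repr did not survive here; as a purely hypothetical stand-in, with made-up shape, coords, and chunking rather than the author's actual data, the layout being described is roughly:)

```python
import numpy as np
import pandas as pd
import xarray as xr

# Two years of daily SST on a coarse grid, one timestep per dask chunk,
# mimicking the "singleton chunks in time" layout described earlier.
time = pd.date_range('2014-01-01', periods=730, freq='D')
sst = xr.DataArray(
    np.random.rand(time.size, 180, 360),
    dims=('time', 'lat', 'lon'),
    coords={'time': time},
    name='sst',
).chunk({'time': 1})
```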
I ran this code:

```python
sst_clim = sst.groupby('time.month').mean(dim='time')
sst_anom = sst.groupby('time.month') - sst_clim
sst_anom
```
The result has the exact chunking structure I hoped for. I can then load a single value of sst_anom. Even better, I can call further computations on the result. So from my perspective, I would be very glad to see this feature merged asap. @jetesdal has been struggling with this in his use case, so maybe he will want to try out this PR as well.
It occurs to me that once we know the "right" output chunks, an algorithmic approach that works in all cases is somewhat straightforward:
1. Split the indexer into one piece per output chunk.
2. For each output chunk, index each input chunk that holds any of the requested elements.
3. Concatenate those pieces into the output chunk.
The total number of tasks becomes, at worst, proportional to the number of input chunks times the number of output chunks. This is basically the algorithm that @crusaderky implements in #3407 for indexing dask arrays by dask arrays, but with one very important opportunity for optimization: if the indexer is a NumPy array, we should avoid creating "indexing tasks" for empty data. (We can efficiently determine the necessary inputs for each output chunk by using binary search on the input array indices.) So for the typical case where input chunks map to ~1 output chunk, the number of tasks is still linear in the number of chunks.

So the final ingredient we need is some heuristics and/or explicit arguments for deciding output chunks. Probably simply copying the typical input chunk size would be fine.
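A minimal sketch of that binary-search idea (a hypothetical helper, not code from this PR): with the cumulative input chunk boundaries in hand, np.searchsorted maps each requested element to the input chunk that owns it, so empty (input chunk, output chunk) pairs can be skipped without a linear scan:

```python
import numpy as np

def contributing_chunks(index, chunks):
    """For each output chunk of `index`, list which input chunks it touches.

    `chunks` are the input chunk sizes along the axis, e.g. (10,) * 10.
    Output chunks here simply copy the typical input chunk size.
    """
    boundaries = np.cumsum(chunks)  # exclusive right edge of each input chunk
    step = chunks[0]
    result = []
    for start in range(0, len(index), step):
        piece = index[start:start + step]
        # Binary search: which input chunk owns each requested element?
        owners = np.searchsorted(boundaries, piece, side='right')
        result.append(sorted(set(owners.tolist())))
    return result

# A sorted index: each output chunk touches only a couple of adjacent input chunks.
print(contributing_chunks(np.arange(0, 100, 2), (10,) * 10))
# A "transposed" index: every output chunk touches all ten input chunks.
print(contributing_chunks(np.arange(100).reshape(10, 10).ravel(order='F'),
                          (10,) * 10))
```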
I'm inclined not to do anything beyond what is necessary and allow the user to rechunk as they see fit. Decreasing the number of output chunks by concatenating after slicing doesn't reduce the administrative load of this operation at all. If we're going to do any sort of automatic rechunking I'd be more inclined to do it on the next operation if it sees fit (though obviously this would be a moderate-future change).
Agreed, but output chunk size also determines how many tasks are created inside the indexing operation itself. If output chunks are too small (e.g., size 1 for unsorted indices with this PR), this could result in a very large number of small tasks, which could be quite slow to compute. I think this is what @jakirkham is concerned about here.
I think that we're going to be hit by that anyway. We're going to want all of the individual getitem tasks regardless. After we have computed all of the individual slices then we might choose to concatenate or not, but we've already lost the "avoid too many tasks" game.
More concretely, we're going to want to do this:

```python
{(y, 0): (getitem, (x, i), ...),
 (y, 1): (getitem, (x, j), ...),
 (y, 2): (getitem, (x, k), ...),
 (z, 0): (np.concatenate, [(y, 0), (y, 1)]),
 (z, 1): (np.concatenate, [(y, 2)])}
```

And not this:

```python
{(z, 0): (np.concatenate, [(getitem, (x, i), ...), (getitem, (x, j), ...)]),
 (z, 1): (np.concatenate, [(getitem, (x, k), ...)])}
```

In the first case, I'm proposing that we just leave off the concatenate tasks. The second case is sub-optimal because it forces the full chunks of x feeding each output chunk to be processed together in a single task.
Consider this example:

```python
array = da.arange(100, chunks=10)
# [0, 10, 20, ... 90, 1, 11, 21, ... 91, ...]
index = np.arange(100).reshape(10, 10).ravel(order='F')
```

We want to compute array[index]. If we group the index into chunks of size 10, we end up with every output chunk taking exactly one element from each of the ten input chunks: 100 single-element slices before any concatenation.

OK, I've convinced myself that this is about the best we can do.
I had a bad mental model. I think @mrocklin is correct and this is about the best we can do in general.
OK, so now that a 0.18.1 bugfix release is out, I'm inclined to merge something like this in and see how it goes. However, first I think that we should establish some behavior to warn users off of pathological behavior. I see two approaches if we detect that the result will have too many chunks:
1. Raise a warning pointing the user at the problem.
2. Silently rechunk the result to something more reasonable.

Any thoughts or preferences?
I've added a warning for when we increase the number of chunks by a factor of ten or more:
dask/array/slicing.py (Outdated)

```python
if len(plan) >= len(chunks[axis]) * 10:
    factor = math.ceil(len(plan) / len(chunks[axis]))
    warnings.warn("Slicing with an out-of-order index is generating %d "
                  "times more chunks" % factor)
```
Two suggestions to make this a little more usable:
- Make a Warning subclass (e.g., dask.array.PerformanceWarning) to issue here instead of the default runtime warning. We could use this in unify_chunks, too. Then at least you can write warnings.simplefilter('ignore', PerformanceWarning) to silence this.
- Include stacklevel (if possible) to point the warning message to the relevant line of indexing code.
Done. I wasn't aware of stacklevel before. That's really nice!
Certainly it's better to warn than to silently rechunk, and this is indeed probably the most user-friendly thing to do. I do wish there was a more convenient way to silence warnings though... two lines and an import statement feel more painful than necessary.
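For reference, the incantation being lamented looks like this (assuming PerformanceWarning remains importable from dask.array.core, where this PR defines it; the exact import path may differ):

```python
import warnings

from dask.array.core import PerformanceWarning

# Suppress only dask's chunking-related performance warnings.
warnings.simplefilter('ignore', PerformanceWarning)
```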
I still suspect there are cases (e.g., indexing to randomize the order of an array with 1e6 elements) where doing the indexing in multiple passes could make a big difference. This sort of reshuffling has a very similar feel to a shuffle operation.
Yeah, there are still definitely situations where multi-pass shuffling would improve things. Probably the right way to do this is to construct a few indexes to pass in sequence. So a shuffle algorithm might be posed as a function that transforms an index into a list of indexes (see the sketch below).
This is far-future work though.
+1
Yes, agreed!
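As a toy illustration of the index-into-list-of-indexes idea (my own sketch, not anything in this PR): factor one shuffling index into two passes, where the first pass reads each source chunk with a sorted, chunk-friendly index and the second pass only permutes within destination blocks. Composing the passes reproduces the original index:

```python
import numpy as np

def split_index(index, block):
    """Factor a permutation `index` into [i1, i2] with x[i1][i2] == x[index].

    i1 gathers each destination block's elements in sorted (chunk-friendly)
    order; i2 then reorders purely within each destination block.
    """
    n = len(index)
    # Pass 1: per destination block, pull the needed elements in sorted order.
    i1 = np.concatenate([np.sort(index[b:b + block])
                         for b in range(0, n, block)])
    # Pass 2: invert i1 and compose, yielding a block-local permutation.
    inverse = np.empty(n, dtype=np.intp)
    inverse[i1] = np.arange(n)  # assumes `index` is a permutation
    i2 = inverse[index]
    return [i1, i2]

x = np.arange(100)
index = np.random.permutation(100)
i1, i2 = split_index(index, block=10)
assert (x[i1][i2] == x[index]).all()
```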
Looks great!
dask/array/utils.py (Outdated)

```diff
@@ -125,3 +125,7 @@ def safe_wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS):
         return functools.wraps(wrapped, assigned=assigned)
     else:
         return lambda x: x
+
+
+class PerformanceWarning(Warning):
```
You also defined this in dask/array/core.py :)
dask/array/core.py (Outdated)

```diff
@@ -70,6 +70,11 @@ def register_sparse():
     tensordot_lookup.register(sparse.COO, sparse.tensordot)
+
+
+class PerformanceWarning(Warning):
+    """ A warning given when bad chunking may cause poor performance """
+    pass
```
nit: there's no need for pass if you supply a docstring.
Sorry for the long delay here. Took a bit to pull my thoughts together and edit them down to digestible pieces.

Thanks for the preface, @shoyer, that really helped bring it into context. While we are doing different things with different end goals in mind, I did note there were some places where we overlapped and/or ran into the same issue(s). In particular, this comment stuck out to me, as we ran into the exact same problem. We've also needed to work around this somehow. Would agree it needs a proper fix.

So I don't have a great example or solution currently, @mrocklin, partially because we are changing some stuff after the recent sprint. That said, this comment does a pretty good job outlining it. Should add we don't select all indices at once, so it ends up being several chunks we combine. Expect there are better algorithms for randomizing large data, but I have not had the opportunity to look into it more closely yet. Suggestions welcome.

Right now this is small, as we are exploring this procedure with small datasets to see how it performs and slowly increasing size, but I would expect this strategy falls apart as the size increases. That said, we don't need all of the randomized data all at once. In fact, we will only need a chunk's worth at a time. So I expect there is room for optimization by feeding in pieces of randomized data at a time.

Based on recent experiments, it seems that the randomizing (despite how suboptimal it is) is not actually the worst step performance-wise. So having a bunch of singleton chunks is probably fine for now (and certainly better than the current behavior).
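A hedged sketch of that batch-at-a-time idea (hypothetical array and batch sizes; sorting within each batch is an optional chunk-friendliness trade-off that only works if within-batch order doesn't matter):

```python
import numpy as np
import dask.array as da

x = da.arange(1000000, chunks=10000)  # stand-in for the real dataset
rng = np.random.RandomState(0)
index = rng.permutation(len(x))
batch = 10000

# Only materialize one batch's worth of the randomized data at a time,
# rather than the whole shuffled array at once.
for start in range(0, len(index), batch):
    piece_idx = np.sort(index[start:start + batch])  # chunk-friendly access
    piece = x[piece_idx].compute()
    # ... process `piece` (e.g., feed it to the next pipeline stage) ...
    break  # demo: stop after the first batch
```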
Python 2 stops warnings after they have been raised once
I now think that this is probably a good move, and is probably on the path towards a full shuffle operation for dask.array. I plan to merge this in 24 hours if there are no further comments.
This is in. Thanks all.
Awesome! Any idea when we can expect the next dask release?
We released this last weekend, so probably not immediately.
You can always install this from development.
Previously, when slicing into a dask array with a list/array of integers, we would do one of two things:

Sorted, respect chunking: If the index was sorted, then it would return an array that was chunked similarly to the input.

Unsorted, single chunk: If the index wasn't sorted, however, then it would put everything into one big chunk.

Now: We pick out different pieces of the index and use them to slice into the dask array.
This is good because it allows more granular chunking when slicing with a semi-sorted index, but is bad in that it can easily create very many chunks in pathological situations, like indexing with a random index.
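A quick sketch of the difference (exact chunk tuples depend on the index and the dask version; shown only to illustrate the idea):

```python
import numpy as np
import dask.array as da

x = da.arange(100, chunks=10)

# Sorted index: output chunking stays aligned with the input, as before.
print(x[np.arange(0, 100, 2)].chunks)

# Semi-sorted index (two sorted runs): previously collapsed into one big
# chunk; now each contiguous run maps to its own set of chunks.
idx = np.concatenate([np.arange(50, 100), np.arange(0, 50)])
print(x[idx].chunks)
```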
cc @shoyer @rabernat @jakirkham
Follows on discussion from pydata/xarray#2237