
Break apart uneven array-of-int slicing to separate chunks #3648

Merged: mrocklin merged 6 commits into dask:master from slicing-semi-sorted on Jun 25, 2018

Conversation

mrocklin
Member

Previously when slicing into a dask array with a list/array of integers we would do one of two things:

Sorted, respect chunking

If the array was sorted then it would return an array that was chunked similarly to the input

import dask.array as da
x = da.ones((20, 20), chunks=(4, 5))
x.chunks
# ((4, 4, 4, 4, 4), (5, 5, 5, 5))

import numpy as np

x[np.arange(20), :].chunks
# ((4, 4, 4, 4, 4), (5, 5, 5, 5))

x[np.arange(20) // 2, :].chunks
# ((8, 8, 4), (5, 5, 5, 5))

Unsorted, single chunk

However if the index wasn't sorted then it would put everything into one big chunk

index = np.arange(20) % 10
# array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

x[index, :].chunks
# ((20,), (5, 5, 5, 5))

Now

Now, we pick out different pieces of the index and use them to slice into the dask array.

x = da.ones((20, 20), chunks=(4, 5))
index = np.arange(20) % 10
# array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

x[index].chunks
# Before: ((20,), (5, 5, 5, 5))
# Now: ((4, 4, 2, 4, 4, 2), (5, 5, 5, 5))

This is good because it allows more granular chunking when slicing with a semi-sorted index, but bad in that it can easily create a very large number of chunks in pathological situations, such as indexing with a random index.
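For a quick illustration of that pathological case (a sketch; the exact output chunks depend on the permutation):

import numpy as np
import dask.array as da

x = da.ones((20, 20), chunks=(4, 5))
rng = np.random.RandomState(0)
index = rng.permutation(20)  # fully random order

# Each run of indices that falls within a single input chunk becomes its
# own output chunk, so a fully random index can yield up to len(index)
# chunks along the indexed axis.
print(x[index].chunks)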

cc @shoyer @rabernat @jakirkham

Follows on discussion from pydata/xarray#2237

@mrocklin
Member Author

This currently fails pretty hard on the random shuffle case. It would possibly create as many chunks as there are elements in the index.

@rabernat
Contributor

It would possibly create as many chunks as there are elements in the index.

I would generally be fine with this in many of my use cases when I am dealing with very big datasets. I usually start with data that has singleton chunks in time. Then I want to remove the climatology from each timestep. If the result comes back with one chunk per element in the index, that is how the data started out anyway.

@shoyer
Member

shoyer commented Jun 20, 2018

It would possibly create as many chunks as there are elements in the index.
I would generally be fine with this in many of my use cases when I am dealing with very big datasets. I usually start with data that has singleton chunks in time. Then I want to remove the climatology from each timestep. If the result comes back with one chunk per element in the index, that is how the data started out anyway.

Indeed, I think we get lucky in a lot of dask.array use-cases, because it's easy to amortize scheduler overhead by making other dimensions large instead. Full shuffles of individual array elements are pretty rare, because grouped operations usually only go over one dimension (e.g., time).

@jakirkham
Member

So there are two cases where we slice an array:

  1. Slicing a single frame or range of neighboring frames for visualization
  2. To shuffle the array into random order for feeding into a machine learning algorithm

Sounds like this doesn't matter for case 1, but makes case 2 much worse.

It also sounds like how we chunk our data differs from climatology data. Namely, we have many frames per chunk (~100 frames/chunk), as frames are smallish. So the penalty in case 2 is a bit more unpleasant for us to deal with.

Given this, could you please elaborate on where this will be used and what it is intended to help with? Are there open issues or algorithms planned where this is needed?

@jakirkham
Member

Admittedly we would not rely on slicing by randomized indices if we have a proper path for shuffling.

xref: #3409

@mrocklin
Member Author

@jakirkham can you give some representative examples for 2 that you do now, perhaps using random data? I'm curious to know what your array sizes are and how you index currently.

@shoyer
Member

shoyer commented Jun 21, 2018

  2. To shuffle the array into random order for feeding into a machine learning algorithm

Sounds like this doesn't matter for case 1, but makes case 2 much worse.

The current algorithm already needs to make many small getitem tasks for full shuffles, but automatically concatenates them all together afterwards into a single chunk -- look for np.concatenate for the old implementation in the diff. For your use case, I think it would be equivalent to use the solution in this PR, but immediately add rechunk() afterwards to consolidate chunks.
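Roughly like this (a sketch, assuming the behavior in this PR; the target chunk size is illustrative):

import numpy as np
import dask.array as da

x = da.ones((20, 20), chunks=(4, 5))
index = np.random.permutation(20)

# Index with the unsorted array, then consolidate the many small output
# chunks back into chunks of roughly the original size.
y = x[index].rechunk({0: 4})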

This could be a good use case for automatic rechunking along the indexed dimension, per #3587. The main challenge would be figuring out how to expose control of auto rechunking, given that there are no keyword arguments for __getitem__.

We could also reduce the scheduler overhead of full reshuffles on individual elements by adding intermediate steps, similar to what we currently do in dask.array.rechunk. But this would be a follow-on improvement: full reshuffles are already very expensive with dask.array (O(m*n) tasks, where m=len(array) and n=len(index)).

Actually, this is wrong... still trying to understand existing code...

Given this, could you please elaborate on where this will be used and what this is intended to help with? Are there open issues or algorithms planned where this is needed?

pydata/xarray#2237 is probably the best description of why the current approach is problematic. Xarray does some out-of-order indexing at the end as part of groupby-transform, which currently doesn't really work with dask because the resulting array ends up in a single chunk.

@shoyer
Member

shoyer commented Jun 21, 2018

The crux of the existing solution (for non-sorted indices) is these lines:

dask/dask/array/slicing.py

Lines 590 to 596 in 5fb3854

rev_index = np.searchsorted(sorted_idx, index)
vals = [(getitem, (np.concatenate,
                   [(getitem, ((inname,) + d[:axis] + (i,) + d[axis + 1:]),
                     ((colon,) * axis + (IL,) + (colon,) * (n - axis - 1)))
                    for i, IL in enumerate(index_lists) if len(IL)], axis),
         ((colon,) * axis + (rev_index,) + (colon,) * (n - axis - 1)))
        for d in product(*dims)]

If I understand correctly, its approach is:

  1. Sort indices
  2. Index each chunk with the relevant indices
  3. Concatenate chunks together along the indexed dimension
  4. Reorder (with rev_index) along the indexed dimension

With this approach, we lose the efficiency gains from sorting the indices, which keeps the number of chunks in step (2) the same as in the original array.
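A plain-NumPy sketch of those four steps (take_unsorted is a hypothetical helper, not the actual dask implementation):

import numpy as np

def take_unsorted(chunks, index):
    """Emulate steps (1)-(4) on a list of 1-D NumPy chunks."""
    offsets = np.cumsum([0] + [len(c) for c in chunks])
    sorted_idx = np.sort(index)                      # step 1: sort indices
    pieces = []
    for i, c in enumerate(chunks):                   # step 2: index each chunk
        lo, hi = offsets[i], offsets[i + 1]
        local = sorted_idx[(sorted_idx >= lo) & (sorted_idx < hi)] - lo
        if len(local):
            pieces.append(c[local])
    combined = np.concatenate(pieces)                # step 3: concatenate
    rev_index = np.searchsorted(sorted_idx, index)
    return combined[rev_index]                       # step 4: reorder

chunks = [np.array([10, 11, 12]), np.array([13, 14])]
assert (take_unsorted(chunks, np.array([4, 0, 3])) == [14, 10, 13]).all()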

@shoyer
Member

shoyer commented Jun 21, 2018

Steps (3) and (4) are where the reshuffling happens. Here we have two additional concerns:

  1. It may not be a full reshuffle: some indices are likely in contiguous blocks.
  2. If it fits into memory, it is advantageous to reshuffle chunks that are as large as possible (currently we use a single chunk), because the naive cost of reshuffling is quadratic.

@mrocklin
Member Author

mrocklin commented Jun 21, 2018 via email

@mrocklin
Member Author

Also @rabernat it would be good if you could try this out on a semi-realistic problem and see what the performance is like.

@rabernat
Contributor

I just tried this out on a very realistic problem, the original climatology / anomaly use case that motivated pydata/xarray#2237.

It worked exactly as I hoped!

I started with a DataArray that looked like this:

<xarray.DataArray 'SST' (time: 1095, nlat: 2400, nlon: 3600)>
dask.array<shape=(1095, 2400, 3600), dtype=float32, chunksize=(1, 2400, 3600)>
Coordinates:
  * time     (time) datetime64[ns] 2016-01-01 2016-01-02 2016-01-03 ...
Dimensions without coordinates: nlat, nlon

I ran this code:

sst_clim = sst.groupby('time.month').mean(dim='time')
sst_anom = sst.groupby('time.month') - sst_clim
sst_anom
<xarray.DataArray 'SST' (time: 1095, nlat: 2400, nlon: 3600)>
dask.array<shape=(1095, 2400, 3600), dtype=float32, chunksize=(1, 2400, 3600)>
Coordinates:
  * time     (time) datetime64[ns] 2016-01-01 2016-01-02 2016-01-03 ...
    month    (time) int64 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 ...
Dimensions without coordinates: nlat, nlon

The result has the exact chunking structure I hoped for. I can then load a single value of sst_anom, i.e. sst_anom[0].load(), and only have the necessary computations triggered.

Even better, I can call sst_clim.persist(), which drastically speeds up the calculation of specific sst_anom anom values (since the expensive aggregation is already done).

So from my perspective, I would be very glad to see this feature merged asap.

@jetesdal has been struggling with this in his use case, so maybe he will want to try out this PR as well.

@shoyer
Member

shoyer commented Jun 21, 2018

It occurs to me that once we know the "right" output chunks, an algorithmic approach that works in all cases is somewhat straightforward:

  1. Create an "indexing task" for transferring data from chunk i of the input to chunk j of the output, for all relevant chunk index pairs (i, j).
  2. Create a "concatenate task" for joining together and reordering the indexing tasks for each output chunk.

The total number of tasks becomes at worst O(num_input_chunks * num_output_chunks), which is quadratic in the number of chunks but certainly better than O(num_input_chunks * output_size) (which I think is the worst-case behavior we have here).

This is basically the algorithm that @crusaderky implements in #3407 for indexing dask arrays by dask arrays, but with one very important opportunity for optimization: if the indexer is a NumPy array, we should avoid creating "indexing tasks" for empty data. (We can efficiently determine the necessary inputs for each output chunk by using binary search on the input array indices.) So for the typical case where input chunks map to ~1 output chunk, the number of tasks is still linear in the number of chunks.
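A small sketch of that binary search (the output chunking here is hypothetical):

import numpy as np

input_chunks = (4, 4, 4, 4, 4)         # chunk sizes along the indexed axis
boundaries = np.cumsum(input_chunks)   # [4, 8, 12, 16, 20]
index = np.arange(20) % 10

# For each output chunk, find which input chunks it needs by binary
# searching the index values against the input chunk boundaries.
for j, out in enumerate(np.array_split(index, 4)):
    needed = np.unique(np.searchsorted(boundaries, out, side='right'))
    print("output chunk", j, "reads input chunks", list(needed))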

So the final ingredient we need is some heuristics and/or explicit arguments for deciding output chunks. Probably simply copying the typical input chunk size would be fine.

@mrocklin
Member Author

So the final ingredient we need is some heuristics and/or explicit arguments for deciding output chunks. Probably simply copying the typical input chunk size would be fine.

I'm inclined not to do anything beyond what is necessary and allow the user to rechunk as they see fit. Decreasing the number of output chunks by concatenating after slicing doesn't reduce the administrative load of this operation at all. If we're going to do any sort of automatic rechunking I'd be more inclined to do it on the next operation if it sees fit (though obviously this would be a moderate-future change).

@shoyer
Member

shoyer commented Jun 21, 2018

I'm inclined not to do anything beyond what is necessary and allow the user to rechunk as they see fit. Decreasing the number of output chunks by concatenating after slicing doesn't reduce the administrative load of this operation at all.

Agreed, but output chunk size also determines how many tasks are created inside indexing operation itself. If output chunks are too small (e.g., size 1 for unsorted indices with this PR), this could result in a very large number of small tasks, which could be quite slow to compute. I think this is what @jakirkham is concerned about here.

@mrocklin
Member Author

I think that we're going to be hit by that anyway. We're going to want all of the individual getitem calls to be separate tasks. Otherwise we'll collect a bunch of input chunks onto one machine before slicing into them.

After we have computed all of the individual slices then we might choose to concatenate or not, but we've already lost the "avoid too many tasks" game.

@mrocklin
Member Author

More concretely, we're going to want to do this:

{(y, 0): (getitem, (x, i), ...),
 (y, 1): (getitem, (x, j), ...),
 (y, 2): (getitem, (x, k), ...),
 (z, 0): (np.concatenate, [(y, 0), (y, 1)]),
 (z, 1): (np.concatenate, [(y, 2)]),
}

And not this

{
 (z, 0): (np.concatenate, [(getitem, (x, i), ...), (getitem, (x, j), ...)]),
 (z, 1): (np.concatenate, [(getitem, (x, k), ...)]),
}

In the first case, I'm proposing that we just leave off the concatenate tasks.

The second case is sub-optimal because it forces the full chunks of (x, i) and (x, j) to be on the same machine before we slice into them.

@shoyer
Member

shoyer commented Jun 21, 2018

I think that we're going to be hit by that anyway. We're going to want all of the individual getitem calls to be separate tasks. Otherwise we'll collect a bunch of input chunks onto one machine before slicing into them.

Consider this example:

array = da.arange(100, chunks=10)

# [0, 10, 20, ... 90, 1, 11, 21, ... 91, ...]
index = np.arange(100).reshape(10, 10).ravel(order='F')

We want to compute array[index], say with a chunksize of 10.

With this PR, we end up with 100 * 10 = 1000 tasks for indexing. (Edit: this is wrong; we only get 100 tasks, because we index only the necessary chunk for each task.)

If we group the index into chunks of size 10, we end up with 10*10 = 100 tasks for indexing.
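A quick check of that arithmetic (purely illustrative, reusing binary search against the chunk boundaries):

import numpy as np

index = np.arange(100).reshape(10, 10).ravel(order='F')
boundaries = np.arange(10, 101, 10)  # input chunk boundaries

# Each output chunk of 10 touches all 10 input chunks, so grouping the
# index into tens gives 10 * 10 = 100 getitem tasks.
n_tasks = sum(len(np.unique(np.searchsorted(boundaries, out, side='right')))
              for out in np.array_split(index, 10))
print(n_tasks)  # 100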

OK, I've convinced myself that this is about the best we can do.

@shoyer
Member

shoyer commented Jun 21, 2018

I had a bad mental model. I think @mrocklin is correct and this is about the best we can do in general.

@mrocklin
Member Author

OK, so now that a 0.18.1 bugfix release is out, I'm inclined to merge something like this and see how it goes. However, first I think we should establish some behavior to warn users away from pathological cases. I see two approaches if we detect that the result will have too many chunks:

  1. Warn
  2. Rechunk the input array into a single chunk (old behavior)

Any thoughts or preferences?

@mrocklin
Member Author

I've added a warning if we increase the number of chunks by a factor of ten or more:

if len(plan) >= len(chunks[axis]) * 10:
    factor = math.ceil(len(plan) / len(chunks[axis]))
    warnings.warn("Slicing with an out-of-order index is generating %d "
                  "times more chunks" % factor)
Member

Two suggestions to make this a little more usable:

  1. Make a Warning subclass (e.g., dask.array.PerformanceWarning) to issue here instead of the default runtime warning. We could use this in unify_chunks, too. Then at least you can write warnings.simplefilter('ignore', PerformanceWarning) to silence this.
  2. Include stacklevel (if possible) to point the warning message to the relevant line of indexing code.
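A minimal sketch of suggestion (1), with a hypothetical maybe_warn helper standing in for the inline check above:

import math
import warnings

class PerformanceWarning(Warning):
    """A warning given when bad chunking may cause poor performance."""

def maybe_warn(n_output_chunks, n_input_chunks):
    # Warn only on a 10x (or worse) blow-up in chunk count; stacklevel
    # (illustrative here) points the message at the caller's indexing line.
    if n_output_chunks >= n_input_chunks * 10:
        factor = math.ceil(n_output_chunks / n_input_chunks)
        warnings.warn("Slicing with an out-of-order index is generating "
                      "%d times more chunks" % factor,
                      PerformanceWarning, stacklevel=3)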

Member Author

Done. I wasn't aware of stacklevel before. That's really nice!

@shoyer
Member

shoyer commented Jun 22, 2018

I've added a warning if we increase the number of chunks by a factor of ten or more

Certainly it's better to warn than to silently rechunk, and this is indeed probably the most user friendly thing to do. I do wish there was a more convenient way to silence warnings though... two lines and an import statement feel more painful than necessary.

import warnings
with warnings.catch_warnings():
    warnings.simplefilter("ignore", da.PerformanceWarning)
    ...

@shoyer
Member

shoyer commented Jun 22, 2018

I still suspect there are cases (e.g., indexing to randomize the order of an array with 1e6 elements) where doing the indexing in multiple passes could make a big difference. This sort of reshuffling has a very similar feel to rechunk().

@mrocklin
Member Author

mrocklin commented Jun 22, 2018

Yeah, there are still definitely situations where multi-pass shuffling would improve things. Probably the right way to do this is to construct a few indexes to pass in sequence:

x[index] -> x[index1][index2][index3]

So a shuffle algorithm might be posed as a function that transforms an index into a list of indexes.
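A tiny NumPy check of that idea, using the trivial two-pass factoring (sort, then reorder): x[index] equals x[index1][index2] whenever index1[index2] == index.

import numpy as np

x = np.arange(100) * 10
index = np.random.permutation(100)[:50]

index1 = np.sort(index)                  # pass 1: a cheap, sorted gather
index2 = np.searchsorted(index1, index)  # pass 2: reorder the result
assert (x[index1][index2] == x[index]).all()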

@mrocklin
Member Author

This is far-future work though.

@shoyer
Member

shoyer commented Jun 22, 2018

So a shuffle algorithm might be posed as a function that transforms an index into a list of indexes.

+1

This is far-future work though.

Yes, agreed!

@shoyer
Member

Looks great!

@@ -125,3 +125,7 @@ def safe_wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS):
        return functools.wraps(wrapped, assigned=assigned)
    else:
        return lambda x: x


class PerformanceWarning(Warning):
Member

You also defined this in dask/array/core.py :)

@@ -70,6 +70,11 @@ def register_sparse():
    tensordot_lookup.register(sparse.COO, sparse.tensordot)


class PerformanceWarning(Warning):
    """ A warning given when bad chunking may cause poor performance """
    pass
Member

nit: there's no need for pass if you supply a docstring.

@jakirkham
Member

Sorry for the long delay here. Took a bit to pull my thoughts together and edit them down to digestible pieces.

Thanks for the preface, @shoyer; that really helped bring it into context. While we are doing different things with different end goals in mind, I did note there were some places where we overlapped and/or ran into the same issue(s). In particular, this comment stuck out to me, as we ran into the exact same problem. We've also needed to work around this somehow. I would agree it needs a proper fix.

So I don't have a great example or solution currently, @mrocklin, partially because we are changing some things after the recent sprint. That said, this comment does a pretty good job outlining it. I should add that we don't select all indices at once, so it ends up being several chunks that we combine. I expect there are better algorithms for randomizing large data, but have not had the opportunity to look into it more closely yet. Suggestions welcome.

Right now this is small, as we are exploring this procedure with small datasets to see how it performs and slowly increasing size, but I would expect this strategy to fall apart as the size increases. That said, we don't need all of the randomized data at once. In fact, we will only need a chunk's worth at a time, so I expect there is room for optimization by feeding in pieces of randomized data a bit at a time. Based on recent experiments, the randomizing (despite how suboptimal it is) is not actually the worst step performance-wise. So having a bunch of singleton chunks is probably fine for now (and certainly better than the current behavior).
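A loose sketch of that "chunk's worth at a time" idea (array shape, batch size, and loop structure are all hypothetical):

import numpy as np
import dask.array as da

x = da.ones((10000, 64, 64), chunks=(100, 64, 64))  # ~100 frames/chunk
rng = np.random.RandomState(0)

for step in range(5):                      # e.g., five training steps
    batch = np.sort(rng.choice(len(x), size=100, replace=False))
    data = x[batch].compute()              # sorted gather respects chunking
    rng.shuffle(data)                      # shuffle in memory, per batch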

Python 2 stops warnings after they have been raised once
@mrocklin
Member Author

I now think that this is probably a good move, and is probably on the path towards a full shuffle operation for dask.array.

I plan to merge this in 24 hours if there are no further comments.

@mrocklin mrocklin merged commit 704470a into dask:master Jun 25, 2018
@mrocklin mrocklin deleted the slicing-semi-sorted branch June 25, 2018 13:11
@mrocklin
Member Author

This is in. Thanks, all!

@rabernat
Contributor

rabernat commented Jun 25, 2018 via email

@mrocklin
Member Author

mrocklin commented Jun 25, 2018 via email

@jakirkham
Member

Can always install this from development.

convexset added a commit to convexset/dask that referenced this pull request on Jul 1, 2018.