Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Fix groupBy not canceling upstream due to group abandonment #6642

Merged
merged 4 commits into from
Aug 29, 2019

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Aug 29, 2019

This PR fixes the issue when a group is not subscribed to, the upstream may never cancel due to seemingly open groups.

The fix is a tradeoff with group abandonment and possible excessive group re-creation so that elements are not lost in case the groups do get subscribed to a bit later. Therefore, the groups should be subscribed to immediately and synchronously:

Observable.range(1, 1000)
.groupBy(v -> v % 10)
.flatMap(v -> {
    System.out.println("New group: " + v.getKey());
    return v;
})
.subscribe();

Observable.range(1, 1000)
.groupBy(v -> v % 10)
.flatMap(v -> {
    System.out.println("New group: " + v.getKey());
    return v.observeOn(Schedulers.io()); //  <-------------------------------- OK
})
.blockingSubscribe();

Consequently, the following setups will result in constant group recreations:

Observable.range(1, 1000)
.groupBy(v -> v % 10)
.observeOn(Schedulers.io()) // <------------------------------------- TROUBLE
.flatMap(v -> {
    System.out.println("New group: " + v.getKey());
    return v;
})
.blockingSubscribe();

Observable.range(1, 1000)
.groupBy(v -> v % 10)
.flatMap(v -> {
    System.out.println("New group: " + v.getKey());
    return v.subscribeOn(Schedulers.io()); // <----------------------- TROUBLE
})
.blockingSubscribe();

For the subscribeOn "trouble", since groups were essentially unicast subjects/processors, subscribeOn had no practical use on them and instead observeOn should be used to move the observation of the group's items to the desired thread.

Resolves #6596

@akarnokd akarnokd added this to the 3.0 milestone Aug 29, 2019
@codecov
Copy link

codecov bot commented Aug 29, 2019

Codecov Report

❗ No coverage uploaded for pull request base (3.x@c9204b5). Click here to learn what that means.
The diff coverage is 89.58%.

Impacted file tree graph

@@          Coverage Diff           @@
##             3.x    #6642   +/-   ##
======================================
  Coverage       ?   98.11%           
  Complexity     ?     6192           
======================================
  Files          ?      678           
  Lines          ?    44721           
  Branches       ?     6176           
======================================
  Hits           ?    43879           
  Misses         ?      298           
  Partials       ?      544
Impacted Files Coverage Δ Complexity Δ
.../main/java/io/reactivex/rxjava3/core/Flowable.java 100% <ø> (ø) 561 <0> (?)
...ain/java/io/reactivex/rxjava3/core/Observable.java 100% <ø> (ø) 537 <0> (?)
...3/internal/operators/flowable/FlowableGroupBy.java 95.2% <88.46%> (ø) 3 <0> (?)
...ternal/operators/observable/ObservableGroupBy.java 95.5% <90.9%> (ø) 2 <0> (?)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c9204b5...a532b6e. Read the comment docs.

@akarnokd akarnokd merged commit 9a36930 into ReactiveX:3.x Aug 29, 2019
@akarnokd akarnokd deleted the GroupByFix3x branch August 29, 2019 14:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3.x: groupBy may never cancel the upstream if the group is dropped
2 participants