Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples of back pressure which can easily happen #726

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

musketyr
Copy link

@musketyr musketyr commented Jun 6, 2016

just for example. no need to merge if you don't want to implement back pressure handling. the test can run very long time or break the build with OOME.

cc @graemerocher

just for example. no need to merge if you don't want to implement back pressure handling.
@lhotari
Copy link
Contributor

lhotari commented Jun 7, 2016

Nice example @musketyr .

I tried using RxJava's support for reactive pull backpressure:

    void "example of using reactive pull"() {
        when:"All Freds are listed"
        AtomicInteger count = new AtomicInteger(0)
        CountDownLatch latch = new CountDownLatch(1)
        Simple.where {
            name ==~ ~/Fred.+/
        }
        .toObservable()
        .doOnNext { println "Now we have $it.name on the stage..."}
        .subscribe(new Subscriber<Simple>() {
            int inflight

            @Override
            void onStart() {
                inflight = 10
                request(10)
            }

            @Override
            void onCompleted() {
                latch.countDown()
            }

            @Override
            void onError(Throwable e) {

            }

            @Override
            void onNext(Simple simple) {
                // emulates slow processing
                Thread.sleep(1)
                count.incrementAndGet()
                if(--inflight == 0) {
                    inflight = 10
                    request(10)
                }
            }
        })

        then:"The results are correct"
        latch.await(60, TimeUnit.SECONDS)
        count.get() == 25000
    }

I think that the "reactive pull backpressure" pattern is meant to be used to prevent the backpressure problems you were describing. It would be nice to make sure that it's fully supported in RxGorm.

@musketyr
Copy link
Author

musketyr commented Jun 7, 2016

Yeah, I can see you have the same problem as me when I run it. The problem with the request(n) is that it must be supported by producer. e.g. see

https://github.com/MetadataRegistry/ModelCataloguePlugin/blob/2.x/ModelCatalogueCorePlugin/src/groovy/org/modelcatalogue/core/rx/DetachedCriteriaOnSubscribe.java#L42

I've tried the batch operator doing the same reactive pull approach as you've did

https://github.com/MetadataRegistry/ModelCataloguePlugin/blob/2.x/ModelCatalogueCorePlugin/src/groovy/org/modelcatalogue/core/rx/BatchOperator.java#L99

but it doesn't work very well either as actually the batches were read one after another in high rate so I get into back pressure problem again. I know the solution lies somewhere here

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java#L110

or here

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/observables/AsyncOnSubscribe.java

but it's way beyond my Rx knowledge.

@graemerocher
Copy link
Member

We support this in cases that we have control over, in the case of the MongoDB implementation support would have to be written in the mongodb-rx drivers.

@lhotari
Copy link
Contributor

lhotari commented Jun 7, 2016

@musketyr @graemerocher It looks like the mongodb-rx drivers support the "reactive pull" backpressure (flow control) since the producer implements the request method. I assume it's a matter of making sure (=writing tests and fixing problems) that "reactive pull" is supported for all operators in the full chain up to the mongodb rx driver. One way to test it would be to inject some wrapper/proxy in the test that intercepts the interaction with the mongodb rx driver to see if demand is signaled correctly via request method calls. I guess it's easy to break "reactive pull" backpressure since supporting reactive backpressure is optional in RxJava v1.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants