Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal to Introduce generate Method and Enhance create for Concurrency in the FiberInterop #99

Open
denyshorman opened this issue Jun 13, 2024 · 0 comments

Comments

@denyshorman
Copy link

The FiberInterop library allows creating an RxJava Flowable stream easily, as demonstrated in the following example:

var executor = Executors.newVirtualThreadPerTaskExecutor();

var flow = FiberInterop.create(emitter -> {
    for (var i = 0; i < 10; i++) {
        Thread.sleep(500);
        emitter.emit(i);
    }
}, executor);

flow.blockingForEach(value -> {
    System.out.println(value);
});

This example works perfectly for simple sequential flow generation.
However, issues arise when concurrency is introduced into the flow generation process. For example:

var executor = Executors.newVirtualThreadPerTaskExecutor();
var taskCount = 10;

while (true) {
    var flow = FiberInterop.create(emitter -> {
        try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
            var latch = new CountDownLatch(1);

            for (var i = 0; i < taskCount; i++) {
                final var taskId = i;

                scope.submit(() -> {
                    try {
                        latch.await();
                        emitter.emit(taskId);
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                });
            }

            latch.countDown();
        }
    }, executor);

    var size = flow.toList().blockingGet().size();

    if (size < taskCount) {
        System.out.println(STR."Expected \{taskCount} but got \{size}");
        break;
    }
}

As can be seen from the example above, the create method does not handle concurrency, leading to a situation where the resulting flow size can be less than taskCount. This behavior is not documented, which can confuse users who might expect concurrent generation to work seamlessly.

To address this issue, it would be beneficial to differentiate between methods designed for simple non-concurrent workflows and those that handle concurrency by default. Specifically:

  1. Introduce a generate method. This method would be intended for simple, non-concurrent workflows. It would clearly indicate to users that it is not designed for concurrent flow generation.
  2. Enhance the create method. Modify the create method to handle concurrency properly by default. This would involve ensuring that concurrent tasks are managed and their emissions are correctly handled to prevent missing items.

By introducing the generate method and enhancing the create method to handle concurrency, users would have the opportunity to select an appropriate approach for handling their tasks.
This would provide functionality similar to the distinction between flow and channelFlow in Kotlin.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant