Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Source.grouped* operators #158

Merged
merged 24 commits into from
Jun 26, 2024
Merged

Add Source.grouped* operators #158

merged 24 commits into from
Jun 26, 2024

Conversation

micossow
Copy link
Contributor

closes #34

@micossow micossow changed the title grouped Source operator Add Source.grouped* operators Jun 24, 2024
@micossow micossow marked this pull request as ready for review June 24, 2024 08:12
core/src/main/scala/ox/channels/SourceOps.scala Outdated Show resolved Hide resolved
core/src/main/scala/ox/channels/SourceOps.scala Outdated Show resolved Hide resolved
core/src/main/scala/ox/channels/SourceOps.scala Outdated Show resolved Hide resolved
core/src/main/scala/ox/channels/SourceOps.scala Outdated Show resolved Hide resolved
if isValue then
buffer = Vector.empty
accumulatedCost = 0
timeoutFork.foreach(_.cancelNow())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: We should cancel even if isValue is false, just start new fork if it's true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I made the fork conditional only - buffer should look empty when channel closes

else true
case Received(t) =>
buffer = buffer :+ t
accumulatedCost += costFn(t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Since costFn is provided by the user, maybe we should handle it with try/catch? Then c2.errorOrClosed with catched NonFatal exception, cancel the fork and gently exit the while loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the same try/catch pattern as in other operators (these catch Throwable)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, I made this mistake again. Indeed we intentionally catch all Throwables, here's a nice explanation why.

receiveOrClosed() match
case ChannelClosed.Done =>
if buffer.nonEmpty then c2.sendOrClosed(buffer).discard
c2.doneOrClosed();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: No need for ;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

else true
case Received(t) =>
buffer = buffer :+ t
accumulatedCost += costFn(t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, I made this mistake again. Indeed we intentionally catch all Throwables, here's a nice explanation why.

c2.doneOrClosed();
false
case ChannelClosed.Error(r) =>
c2.errorOrClosed(r); false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I think this one-line syntax with ; is useful only if there are multiple concise cases one after another, like

          case ChannelClosed.Done     => c2.doneOrClosed(); false
          case ChannelClosed.Error(r) => c2.errorOrClosed(r); false

Here I would just make this multiline without ;
Similarly in try-catch below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

def groupedWithin(n: Int, duration: FiniteDuration)(using Ox, StageCapacity): Source[Seq[T]] = groupedWeightedWithin(n, duration)(_ => 1)

/** Chunks up the elements into groups received within a time window or limited by the cumulative weight being greater or equal to the
* `minWeight`, whatever happens first. Timeout is counted since the last group has been emitted or no timeout is being counted. If this
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: The timeout is reset after a group is emitted. If timeout expires and the buffer is empty, nothing is emitted. As soon as a new element is received, the source will emit it as a single-element group and reset the timer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corrected


/** Chunks up the elements into groups received within a time window or limited by the cumulative weight being greater or equal to the
* `minWeight`, whatever happens first. Timeout is counted since the last group has been emitted or no timeout is being counted. If this
* source is failed then failure is passed to the returned channel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: If this source is failed... => Upstream error or exception thrown by costFn will result in this Source failing with that error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corrected

@micossow micossow merged commit 883f546 into master Jun 26, 2024
5 checks passed
@micossow micossow deleted the 34-grouped-operator branch June 26, 2024 11:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add grouped (with variants) operator
2 participants