-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Source.grouped* operators #158
Conversation
…ter group is emitted
…n timed-out state
if isValue then | ||
buffer = Vector.empty | ||
accumulatedCost = 0 | ||
timeoutFork.foreach(_.cancelNow()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: We should cancel even if isValue
is false, just start new fork if it's true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, I made the fork conditional only - buffer should look empty when channel closes
else true | ||
case Received(t) => | ||
buffer = buffer :+ t | ||
accumulatedCost += costFn(t) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: Since costFn
is provided by the user, maybe we should handle it with try/catch
? Then c2.errorOrClosed
with catched NonFatal
exception, cancel the fork and gently exit the while loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used the same try/catch pattern as in other operators (these catch Throwable)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry, I made this mistake again. Indeed we intentionally catch all Throwables, here's a nice explanation why.
receiveOrClosed() match | ||
case ChannelClosed.Done => | ||
if buffer.nonEmpty then c2.sendOrClosed(buffer).discard | ||
c2.doneOrClosed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: No need for ;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
else true | ||
case Received(t) => | ||
buffer = buffer :+ t | ||
accumulatedCost += costFn(t) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry, I made this mistake again. Indeed we intentionally catch all Throwables, here's a nice explanation why.
c2.doneOrClosed(); | ||
false | ||
case ChannelClosed.Error(r) => | ||
c2.errorOrClosed(r); false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: I think this one-line syntax with ;
is useful only if there are multiple concise cases one after another, like
case ChannelClosed.Done => c2.doneOrClosed(); false
case ChannelClosed.Error(r) => c2.errorOrClosed(r); false
Here I would just make this multiline without ;
Similarly in try-catch below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
def groupedWithin(n: Int, duration: FiniteDuration)(using Ox, StageCapacity): Source[Seq[T]] = groupedWeightedWithin(n, duration)(_ => 1) | ||
|
||
/** Chunks up the elements into groups received within a time window or limited by the cumulative weight being greater or equal to the | ||
* `minWeight`, whatever happens first. Timeout is counted since the last group has been emitted or no timeout is being counted. If this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: The timeout is reset after a group is emitted. If timeout expires and the buffer is empty, nothing is emitted. As soon as a new element is received, the source will emit it as a single-element group and reset the timer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
corrected
|
||
/** Chunks up the elements into groups received within a time window or limited by the cumulative weight being greater or equal to the | ||
* `minWeight`, whatever happens first. Timeout is counted since the last group has been emitted or no timeout is being counted. If this | ||
* source is failed then failure is passed to the returned channel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: If this source is failed...
=> Upstream error or exception thrown by costFn
will result in this Source
failing with that error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
corrected
closes #34