-
Notifications
You must be signed in to change notification settings - Fork 446
Fix #324 by adding streaming support to the server #461
Conversation
{ | ||
public class Streaming : Hub | ||
{ | ||
[return: Streaming] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remind me why we need the attribute [Streaming]
? Either way I think we can just get rid of it and just always stream IO<T>
and ReadableChannel<T>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally had that thought but when we talked last we decided to use an attribute to make it clear. It's a little less clear around something like IAsyncEnumerable<T>
. Do you want to load up all the data before returning a result or stream it out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But if you think it's clear that IAsyncEnumerable<T>
should be streamed, we can get rid of the attribute.
@@ -295,6 +307,68 @@ public class HubEndPoint<THub, TClient> : EndPoint, IInvocationBinder where THub | |||
} | |||
} | |||
|
|||
private bool IsChannel(Type type, out Type payloadType) | |||
{ | |||
var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ReadableChannel<>)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't look very efficient 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'll go away when we have IAsyncEnumerable<T>
, because we can just cast and call GetEnumerator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we get C# 8? 😄 We'll need to make this better in the mean time, file an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be fair, the type doesn't have to wait for C# 8, and wouldn't require a new netstandard version.
else | ||
{ | ||
// This type cannot be streamed | ||
throw new NotSupportedException($"Cannot stream results of type: {result.GetType().FullName}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Observable of 1 item?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we keep the attribute, sure. But if we remove the attribute then single results just won't be streamed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes let's nuke it
while (!_output.TryWrite(value)) | ||
{ | ||
// Wait for a spot | ||
if (!_output.WaitToWriteAsync(_cancellationToken).Result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Task queue would be better...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we still have to block this thread to create backpressure. Otherwise the caller will just keep invoking OnNext and the tasks will queue up. That's why IObservable
is less ideal. If we're OK overrunning the Task Queue, then we can just queue things up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes but the you can block after some threshold.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't that threshold just defined by the channel we create? Right now it's unbounded, but we can create a bounded channel. Also, how does the blocking work with the Task queue? Do we need to track how many outstanding tasks we have in the Adapter and then block when it hits a certain threshold?
Using the channel for this seems simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, that makes sense.
We need either the C# client or the typescript client to be part of this PR. Otherwise I'm not sure we have anything real. |
Running the ChannelCounter fails in the sample. You need more tests:
I did this hackery to make it work: public static IAsyncEnumerator<object> FromChannel(object channel, Type itemType)
{
return (IAsyncEnumerator<object>)_fromChannelMethod
.MakeGenericMethod(itemType)
.Invoke(null, new[] { channel });
}
public static IAsyncEnumerator<object> FromChannel<T>(ReadableChannel<T> readableChannel)
{
return FromObservable(readableChannel.AsObservable());
} The co-variance doesn't work for value types so |
We need to review the client API before we can go very far there. Does the client have to call a different method? |
|
I'm going to drop the TS client and spike out a C# client API to unblock the PR. The TS client requires building a whole primitive for streaming, whereas the C# one does not. |
Isn't "out" - co-variant and "in" contravariant? |
That's totally what I said. Check again. |
@@ -9,6 +9,9 @@ | |||
</PropertyGroup> | |||
|
|||
<ItemGroup> | |||
</ItemGroup> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clean up
@@ -18,5 +18,11 @@ | |||
<li><a href="hubs.html?transport=ServerSentEvents">Server Sent Events</a></li> | |||
<li><a href="hubs.html?transport=WebSockets">Web Sockets</a></li> | |||
</ul> | |||
<h1>ASP.NET SignalR (Streaming)</h1> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add Core
to all these
@@ -0,0 +1,87 @@ | |||
using System; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copyright
Small incomplete update to fix simple comments (Channel streaming co-variance, some JavaScript bugs). I think I'm going to add automated tests for the TypeScript client and then look in to C#. Still to come:
|
@@ -276,17 +281,24 @@ public class HubEndPoint<THub, TClient> : EndPoint, IInvocationBinder where THub | |||
result = methodExecutor.Execute(hub, invocationMessage.Arguments); | |||
} | |||
|
|||
return CompletionMessage.WithResult(invocationMessage.InvocationId, result); | |||
if (hasResult && IsStreamed(methodExecutor, result, out var channel)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think hasResult
will be true here
for void
methods. Not sure if this is harmful but it will be surprising if someone needs to investigate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I don't know what the value of result
here is but if it is null than we will possibly fail in a weird way (like NRE) if someone puts the attribute on a void method.
} | ||
|
||
public next(item: T): void { | ||
for (let observer of this.observers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: these loops could be just forEach
invocationCounter += 1; | ||
addLine('resultsList', `running ${method}(${id}) ...`); | ||
connection.stream(method, 10, (Math.random() * 5) * 200).subscribe({ | ||
closed: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed at all? or should be just null because we don't maintain it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's part of the interface, but I believe it's optional.
var channel = Channel.CreateUnbounded<object>(); | ||
var cancellationTokenSource = new CancellationTokenSource(); | ||
|
||
var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Out, cancellationTokenSource.Token)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just CancellationToken.None
instead of cts.Token
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because at one point I planned to cancel it, but it turns out there's nowhere to do so. Will fix :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You actually do need to make cancellation work here. I was just thinking about this a bit. The token here needs to represent the connection lifetime. If you're streaming and the connection dies, we need to notify something. There are some challenges though:
- If the particular stream was created as part of that invocation, then completing the stream makes sense if the connection goes away.
- If the stream is shared, you can't assume that ending a single connection should kill the stream.
This means that we need to expose a cancellation token on the connection so that the hub can choose what to do when a connection goes away. We can also think about ways to make things a bit more automatic (something declarative).
if (!_output.WaitToWriteAsync(_cancellationToken).Result) | ||
{ | ||
// Channel was closed. | ||
throw new InvalidOperationException("Output channel was closed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this crash the process?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will crash the thing that drives the Observer, which depends on the underlying implementation of IObservable. We could suppress the exception and just drop items I guess.
@@ -518,6 +600,63 @@ private IServiceProvider CreateServiceProvider(Action<ServiceCollection> addServ | |||
return services.BuildServiceProvider(); | |||
} | |||
|
|||
public class StreamingHub : TestHub | |||
{ | |||
[return: Streaming] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
e9660f4
to
2f6790b
Compare
🆙 📅 and rebase. I got the C# client support added and tests in place for it and the TS client. |
build/dependencies.props
Outdated
@@ -12,6 +12,7 @@ | |||
<RedisVersion>1.2.3</RedisVersion> | |||
<TestSdkVersion>15.3.0-*</TestSdkVersion> | |||
<XunitVersion>2.3.0-beta2-*</XunitVersion> | |||
<RxVersion>3.1.1</RxVersion> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need permission to use this?
/cc @Eilon
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shoot, I meant to wait to push this until I got confirmation from @Eilon, but I've already started the process with him.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got approval from @Eilon
public CancellationToken CancellationToken { get; } | ||
public string InvocationId { get; } | ||
|
||
public Task<object> Task => _completionSource?.Task; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A null Task is pretty crappy. Make it return a completed task if there's nothing.
public InvocationRequest(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, bool streaming) | ||
{ | ||
_logger = loggerFactory.CreateLogger<InvocationRequest>(); | ||
_cancellationTokenRegistration = cancellationToken.Register(() => _completionSource.TrySetCanceled()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove allocation.
|
||
public void StreamItem(object item) | ||
{ | ||
if(_streaming) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: space
_completionSource.TrySetCanceled(); | ||
} | ||
|
||
_cancellationTokenRegistration.Dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to make sure things like this don't dead lock (because dispose waits on cancel callbacks to finish).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TCS is set to run continuations asynchronously, and firing that CTS is the only cancel callback.
|
||
private async Task StreamResultsAsync(string invocationId, Connection connection, IHubProtocol protocol, IAsyncEnumerator<object> enumerator) | ||
{ | ||
// TODO: Cancellation? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I like changing the client representation to Channel for now (on both the server and client). I think we should commit what you have in this PR and keep iterating. |
The server representation is actually |
479e832
to
16b03e9
Compare
Rebased and reworked the client to use Channel as the primitive for Streaming. Will merge when I have green builds (as per @davidfowl 's comment), and we can iterate on it moving forward. |
_subject.TryOnNext(item); | ||
while(!_channel.Out.TryWrite(item)) | ||
{ | ||
if(!await _channel.Out.WaitToWriteAsync()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: spaces (and on ln. 99)
@@ -115,7 +116,7 @@ public void On(string methodName, Type[] parameterTypes, Action<object[]> handle | |||
_handlers.AddOrUpdate(methodName, invocationHandler, (_, __) => invocationHandler); | |||
} | |||
|
|||
public IObservable<object> Stream(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args) | |||
public ReadableChannel<object> Stream(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args) | |||
{ | |||
var irq = InvocationRequest.Stream(cancellationToken, returnType, GetNextId(), _loggerFactory, out var observable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename observable to channel
@@ -203,7 +204,7 @@ private void OnDataReceived(byte[] data, MessageType messageType) | |||
_logger.LogWarning("Dropped unsolicited Stream Item message for invocation '{invocationId}'", streamItem.InvocationId); | |||
return; | |||
} | |||
DispatchInvocationStreamItem(streamItem, irq); | |||
DispatchInvocationStreamItemAsync(streamItem, irq); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this introduces a race which can result in items not being reported in order. We might need a queue for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to consider ordering of streamed items, that's a whole new design concept as the frames could get reordered in transit (so it doesn't even matter how we queue on the client). It's not necessarily required, as the user can stream items tagged with an order and do buffering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what circumstances would frames be reordered in transit?
{ | ||
_logger.LogTrace("Received StreamItem for Invocation #{invocationId}", streamItem.InvocationId); | ||
|
||
if (irq.CancellationToken.IsCancellationRequested) | ||
{ | ||
_logger.LogTrace("Canceling dispatch of StreamItem message for Invocation {invocationId}. The invocation was cancelled.", irq.InvocationId); | ||
} | ||
else | ||
else if (!await irq.StreamItem(streamItem.Item)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this throw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only in the Streaming case, if someone else closes the write end of the channel (which only we control). I'll add some guard code though since this runs in a background thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks - I was afraid of an unhandled exception on a background thread which would crash the process and which could not be caught/handled in any way.
_completionSource.TrySetException(new InvalidOperationException("Streaming methods must be invoked using HubConnection.Stream")); | ||
|
||
// We "delivered" the stream item successfully as far as the caller cares | ||
return Task.FromResult(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't cached
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Weird MSBuild hang:
|
I have seen this happen twice yesterday. I don't know how to get this file (or if it is even possible): |
We've been running on the container-based infrastructure across the platform for a while now and problems haven't been that consistent. Plus, MSBuild should work in containers :). |
I agree in general. I was just wondering if because it's beta it still may have some issues where a process can be randomly killed which would affect the build. |
There are a few small WIP items:
StreamingAttribute
to target the method, not the return value. The return value may be the technically correct place to annotate this but the method is the intuitive placeWe only support
ReadableChannel<T>
andIObservable<T>
right now.Stream
is a little trickier so I haven't done it yet.