Skip to content

Commit

Permalink
feat: Added support for WriteOnly PipeServer.
Browse files Browse the repository at this point in the history
  • Loading branch information
HavenDV committed Dec 8, 2023
1 parent 598067f commit b54523c
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/libs/H.Pipes/IO/PipeStreamWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public sealed class PipeStreamWrapper : IDisposable
/// </returns>
public bool CanWrite => BaseStream.CanWrite;

private PipeStream BaseStream { get; }
internal PipeStream BaseStream { get; }
private PipeStreamReader Reader { get; }
private PipeStreamWriter Writer { get; }

Expand Down
5 changes: 5 additions & 0 deletions src/libs/H.Pipes/PipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public void Start()

ReadWorker = new TaskWorker(async cancellationToken =>
{
if (!PipeStreamWrapper.BaseStream.CanRead)
{
return;
}
while (!cancellationToken.IsCancellationRequested && IsConnected)
{
try
Expand Down
112 changes: 112 additions & 0 deletions src/tests/H.Pipes.Tests/Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,117 @@ public async Task Interrupted()

isConnected.Should().BeTrue();
}

[TestMethod]
public async Task WriteOnlyServer()
{
using var source = new CancellationTokenSource(TimeSpan.FromSeconds(11));
var cancellationToken = source.Token;
var isConnected = false;

var exceptions = new ConcurrentBag<Exception>();
const string pipeName = "wos";
try
{

Console.WriteLine($"PipeName: {pipeName}");

await using var server = new PipeServer<byte[]>(pipeName);
server.CreatePipeStreamFunc = static pipeName => new NamedPipeServerStream(
pipeName,
PipeDirection.Out,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous | PipeOptions.WriteThrough);

server.ClientConnected += async (_, args) =>
{
Console.WriteLine($"Client {args.Connection.PipeName} is now connected!");
try
{
await args.Connection.WriteAsync(new byte[]
{
1, 2, 3, 4, 5,
}, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
exceptions.Add(exception);
}
};
server.ClientDisconnected += (_, args) =>
{
Console.WriteLine($"Client {args.Connection.PipeName} disconnected");
};
server.MessageReceived += (_, args) =>
{
Console.WriteLine($"Client {args.Connection.PipeName} says: {args.Message}");
};
server.ExceptionOccurred += (_, args) => exceptions.Add(args.Exception);

_ = Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
Console.WriteLine($"Sent to {server.ConnectedClients.Count} clients");
await server.WriteAsync(new byte[]
{
1, 2, 3, 4, 5,
}, cancellationToken).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
exceptions.Add(exception);
}
}
}, cancellationToken);

Console.WriteLine("Server starting...");

await server.StartAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

Console.WriteLine("Server is started!");

var isClientReceivedMessage = new TaskCompletionSource<bool>();
await using var client = new PipeClient<byte[]>(pipeName);
client.MessageReceived += (_, _) =>
{
_ = isClientReceivedMessage.TrySetResult(true);
};
await client.ConnectAsync(cancellationToken);

await isClientReceivedMessage.Task;

isClientReceivedMessage.Task.Result.Should().BeTrue();

isConnected = true;
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
exceptions.Add(exception);
}

if (!exceptions.IsEmpty)
{
throw new AggregateException(exceptions);
}

isConnected.Should().BeTrue();
}
}
#endif

0 comments on commit b54523c

Please sign in to comment.