Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-generic pipes #19

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/libs/H.Formatters.Inferno/PipeClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static class PipeClientExtensions
Action<Exception>? exceptionAction = null)
{
client = client ?? throw new ArgumentNullException(nameof(client));
client.Connected += async (o, args) =>
client.Connected += async (_, args) =>
{
try
{
Expand All @@ -36,24 +36,26 @@ public static class PipeClientExtensions
using var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = source.Token;

var client = new SingleConnectionPipeClient<byte[]>(pipeName, args.Connection.ServerName, formatter: args.Connection.Formatter);
var client = new SingleConnectionPipeClient(pipeName, args.Connection.ServerName, formatter: args.Connection.Formatter);

client.ExceptionOccurred += (_, args) =>
{
Debug.WriteLine($"{nameof(EnableEncryption)} client returns exception: {args.Exception}");

exceptionAction?.Invoke(args.Exception);
};

await using (client.ConfigureAwait(false))
{
using var _keyPair = new KeyPair();
await client.WriteAsync(_keyPair.PublicKey, cancellationToken).ConfigureAwait(false);
using var keyPair = new KeyPair();
await client.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false);

var response = await client.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
var response = await client.WaitMessageAsync<byte[]>(cancellationToken: cancellationToken).ConfigureAwait(false);
var serverPublicKey = response.Message;

args.Connection.Formatter = new InfernoFormatter(
args.Connection.Formatter,
_keyPair.GenerateSharedKey(serverPublicKey));
keyPair.GenerateSharedKey(serverPublicKey));
}
}
catch (Exception exception)
Expand All @@ -65,7 +67,7 @@ await using (client.ConfigureAwait(false))
exceptionAction?.Invoke(exception);
}
};
client.Disconnected += (o, args) =>
client.Disconnected += (_, args) =>
{
if (args.Connection.Formatter is not InfernoFormatter infernoFormatter)
{
Expand Down
5 changes: 2 additions & 3 deletions src/libs/H.Formatters.Inferno/PipeConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ public static class PipeConnectionExtensions
/// <summary>
/// Waits key exchange.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="connection"></param>
/// <param name="cancellationToken"></param>
/// <exception cref="ArgumentNullException"></exception>
public static async Task WaitExchangeAsync<T>(
this PipeConnection<T> connection,
public static async Task WaitExchangeAsync(
this IPipeConnection connection,
CancellationToken cancellationToken = default)
{
connection = connection ?? throw new ArgumentNullException(nameof(connection));
Expand Down
6 changes: 4 additions & 2 deletions src/libs/H.Formatters.Inferno/PipeServerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,20 @@ public static class PipeServerExtensions
var cancellationToken = source.Token;

var pipeName = $"{args.Connection.PipeName}_Inferno";
var server = new SingleConnectionPipeServer<byte[]>(pipeName, args.Connection.Formatter);
var server = new SingleConnectionPipeServer(pipeName, args.Connection.Formatter);

server.ExceptionOccurred += (_, args) =>
{
Debug.WriteLine($"{nameof(EnableEncryption)} server returns exception: {args.Exception}");

exceptionAction?.Invoke(args.Exception);
};

await using (server.ConfigureAwait(false))
{
await server.StartAsync(cancellationToken).ConfigureAwait(false);

var response = await server.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
var response = await server.WaitMessageAsync<byte[]>(cancellationToken: cancellationToken).ConfigureAwait(false);
var clientPublicKey = response.Message;

using var keyPair = new KeyPair();
Expand Down
2 changes: 1 addition & 1 deletion src/libs/H.Pipes.AccessControl/PipeServerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static class PipeServerExtensions
{
/// <summary>
/// Sets <see cref="PipeSecurity"/>'s for each <see cref="NamedPipeServerStream"/> that will be created by <see cref="PipeServer{T}"/> <br/>
/// Overrides <see cref="PipeServer{T}.CreatePipeStreamFunc"/>
/// Overrides <see cref="PipeServer.CreatePipeStreamFunc"/>
/// </summary>
/// <param name="server"></param>
/// <param name="pipeSecurity"></param>
Expand Down
7 changes: 3 additions & 4 deletions src/libs/H.Pipes/Args/ConnectionEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@
/// <summary>
/// Handles new connections.
/// </summary>
/// <typeparam name="T">Reference type</typeparam>
public class ConnectionEventArgs<T> : EventArgs
public class ConnectionEventArgs : EventArgs
{
/// <summary>
/// Connection
/// </summary>
public PipeConnection<T> Connection { get; }
public PipeConnection Connection { get; }

/// <summary>
///
/// </summary>
/// <param name="connection"></param>
public ConnectionEventArgs(PipeConnection<T> connection)
public ConnectionEventArgs(PipeConnection connection)
{
Connection = connection ?? throw new ArgumentNullException(nameof(connection));
}
Expand Down
5 changes: 2 additions & 3 deletions src/libs/H.Pipes/Args/ConnectionExceptionEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
/// <summary>
/// Handles exceptions thrown during read/write operations.
/// </summary>
/// <typeparam name="T">Reference type</typeparam>
public class ConnectionExceptionEventArgs<T> : ConnectionEventArgs<T>
public class ConnectionExceptionEventArgs : ConnectionEventArgs
{
/// <summary>
/// The exception that was thrown
Expand All @@ -16,7 +15,7 @@ public class ConnectionExceptionEventArgs<T> : ConnectionEventArgs<T>
/// </summary>
/// <param name="connection"></param>
/// <param name="exception"></param>
public ConnectionExceptionEventArgs(PipeConnection<T> connection, Exception exception) : base(connection)
public ConnectionExceptionEventArgs(PipeConnection connection, Exception exception) : base(connection)
{
Exception = exception ?? throw new ArgumentNullException(nameof(exception));
}
Expand Down
10 changes: 7 additions & 3 deletions src/libs/H.Pipes/Args/ConnectionMessageEventArgs.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
namespace H.Pipes.Args;
using H.Formatters;
using H.Pipes.Extensions;

namespace H.Pipes.Args;


/// <summary>
/// Handles messages received from a named pipe.
/// </summary>
/// <typeparam name="T">Reference type</typeparam>
public class ConnectionMessageEventArgs<T> : ConnectionEventArgs<T>
public class ConnectionMessageEventArgs<T> : ConnectionEventArgs
{
/// <summary>
/// Message sent by the other end of the pipe
Expand All @@ -16,7 +20,7 @@ public class ConnectionMessageEventArgs<T> : ConnectionEventArgs<T>
/// </summary>
/// <param name="connection"></param>
/// <param name="message"></param>
public ConnectionMessageEventArgs(PipeConnection<T> connection, T message) : base(connection)
public ConnectionMessageEventArgs(PipeConnection connection, T message) : base(connection)
{
Message = message;
}
Expand Down
10 changes: 5 additions & 5 deletions src/libs/H.Pipes/Extensions/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ public static class ConnectionExtensions
/// <param name="cancellationToken"></param>
/// <exception cref="OperationCanceledException"></exception>
/// <returns></returns>
public static async Task<ConnectionMessageEventArgs<T>> WaitMessageAsync<T>(this IPipeConnection<T> connection, Func<CancellationToken, Task>? func = null, CancellationToken cancellationToken = default)
public static async Task<ConnectionMessageEventArgs<T>> WaitMessageAsync<T>(this IPipe connection, Func<CancellationToken, Task>? func = null, CancellationToken cancellationToken = default)
{
return await connection.WaitEventAsync<ConnectionMessageEventArgs<T>>(
func ?? (token => Task.Delay(TimeSpan.Zero, cancellationToken)),
nameof(connection.MessageReceived),
func ?? (_ => Task.Delay(TimeSpan.Zero, cancellationToken)),
nameof(IPipe.MessageReceived),
cancellationToken).ConfigureAwait(false);
}

Expand All @@ -35,10 +35,10 @@ public static async Task<ConnectionMessageEventArgs<T>> WaitMessageAsync<T>(this
/// <param name="func"></param>
/// <exception cref="OperationCanceledException"></exception>
/// <returns></returns>
public static async Task<ConnectionMessageEventArgs<T>> WaitMessageAsync<T>(this IPipeConnection<T> connection, TimeSpan timeout, Func<CancellationToken, Task>? func = null)
public static async Task<ConnectionMessageEventArgs<T>> WaitMessageAsync<T>(this IPipe connection, TimeSpan timeout, Func<CancellationToken, Task>? func = null)
{
using var tokenSource = new CancellationTokenSource(timeout);

return await connection.WaitMessageAsync(func, tokenSource.Token).ConfigureAwait(false);
return await connection.WaitMessageAsync<T>(func, tokenSource.Token).ConfigureAwait(false);
}
}
60 changes: 60 additions & 0 deletions src/libs/H.Pipes/Extensions/FormatterExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using H.Formatters;

namespace H.Pipes.Extensions;

/// <summary>
/// Class FormatterExtensions.
/// </summary>
public static class FormatterExtensions
{
#region Methods

/// <summary>
/// Uses the <paramref name="formatter" /> to serialize the given object into a byte
/// array.
/// </summary>
/// <typeparam name="T">Object type</typeparam>
/// <param name="value">The object instance.</param>
/// <param name="formatter">The formatter</param>
/// <param name="cancellationToken">
/// The cancellation token that can be used by other objects or
/// threads to receive notice of cancellation.
/// </param>
/// <returns>Serialized object.</returns>
public static async Task<byte[]> SerializeAsync<T>(
this IFormatter formatter,
T value,
CancellationToken cancellationToken)
{
if (formatter == null)
throw new ArgumentNullException(nameof(formatter));

return formatter is IAsyncFormatter asyncFormatter
? await asyncFormatter.SerializeAsync(value, cancellationToken).ConfigureAwait(false)
: formatter.Serialize(value);
}

/// <summary>Deserializes the bytes into the specified type using <paramref name="formatter" />.</summary>
/// <typeparam name="T"></typeparam>
/// <param name="bytes">The bytes.</param>
/// <param name="formatter">The formatter.</param>
/// <param name="cancellationToken">
/// The cancellation token that can be used by other objects or
/// threads to receive notice of cancellation.
/// </param>
/// <returns>System.Nullable&lt;T&gt;.</returns>
public static async Task<T?> DeserializeAsync<T>(
this IFormatter formatter,
byte[] bytes,
CancellationToken cancellationToken)
{
if (formatter == null)
throw new ArgumentNullException(nameof(formatter));

return formatter is IAsyncFormatter asyncFormatter
? await asyncFormatter.DeserializeAsync<T>(bytes, cancellationToken).ConfigureAwait(false)
: formatter.Deserialize<T>(bytes);
}

#endregion
}
53 changes: 53 additions & 0 deletions src/libs/H.Pipes/IPipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using H.Formatters;
using H.Pipes.Args;

namespace H.Pipes;

/// <summary>
/// Base class of all connections
/// </summary>
public interface IPipe : IAsyncDisposable
{
#region Properties

/// <summary>
/// Used formatter
/// </summary>
public IFormatter Formatter { get; }

#endregion

#region Events

/// <summary>
/// Invoked whenever a message is received.
/// </summary>
event EventHandler<ConnectionMessageEventArgs<byte[]?>>? MessageReceived;

/// <summary>
/// Invoked whenever an exception is thrown during a read or write operation on the named pipe.
/// </summary>
event EventHandler<ExceptionEventArgs>? ExceptionOccurred;

#endregion

#region Methods

/// <summary>
/// Sends a message over a named pipe. <br/>
/// </summary>
/// <param name="value">Message to send</param>
/// <param name="cancellationToken"></param>
/// <exception cref="InvalidOperationException"></exception>
Task WriteAsync(byte[] value, CancellationToken cancellationToken = default);

/// <summary>
/// Sends a message to all connected clients asynchronously.
/// This method returns immediately, possibly before the message has been sent to all clients.
/// </summary>
/// <param name="value"></param>
/// <param name="cancellationToken"></param>
Task WriteAsync<T>(T value, CancellationToken cancellationToken = default);

#endregion
}
40 changes: 35 additions & 5 deletions src/libs/H.Pipes/IPipeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,40 @@
namespace H.Pipes;

/// <summary>
/// Wraps a <see cref="NamedPipeClientStream"/>.
/// Specialized version of <see cref="IPipeClient"/> for communications based
/// on a single type
/// </summary>
/// <typeparam name="T">Reference type to read/write from the named pipe</typeparam>
public interface IPipeClient<T> : IPipeConnection<T>
/// <seealso cref="H.Pipes.IPipeClient" />
public interface IPipeClient<T> : IPipeClient
{

#region Events

/// <summary>
/// Invoked whenever a message is received.
/// </summary>
new event EventHandler<ConnectionMessageEventArgs<T?>>? MessageReceived;

#endregion

#region Methods

/// <summary>
/// Sends a message over a named pipe. <br/>
/// </summary>
/// <param name="value">Message to send</param>
/// <param name="cancellationToken"></param>
/// <exception cref="InvalidOperationException"></exception>
Task WriteAsync(T value, CancellationToken cancellationToken = default);

#endregion
}

/// <summary>
/// Wraps a <see cref="NamedPipeClientStream"/>.
/// </summary>
public interface IPipeClient : IPipe
{
#region Properties

Expand Down Expand Up @@ -46,7 +76,7 @@ public interface IPipeClient<T> : IPipeConnection<T>
/// <summary>
/// Active connection.
/// </summary>
public PipeConnection<T>? Connection { get; }
public PipeConnection? Connection { get; }

#endregion

Expand All @@ -55,12 +85,12 @@ public interface IPipeClient<T> : IPipeConnection<T>
/// <summary>
/// Invoked after each the client connect to the server (include reconnects).
/// </summary>
event EventHandler<ConnectionEventArgs<T>>? Connected;
event EventHandler<ConnectionEventArgs>? Connected;

/// <summary>
/// Invoked when the client disconnects from the server (e.g., the pipe is closed or broken).
/// </summary>
event EventHandler<ConnectionEventArgs<T>>? Disconnected;
event EventHandler<ConnectionEventArgs>? Disconnected;

#endregion

Expand Down
Loading