Skip to content

Commit

Permalink
refactor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiningRush committed Jan 15, 2018
1 parent 0958875 commit faf458d
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 14 deletions.
45 changes: 45 additions & 0 deletions src/ServiceAnt/IServiceBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ public enum LogLevel
/// <param name="ex">if happended exception, it will return it</param>
public delegate void LogBusMessage(LogLevel msgLevel, string message, Exception ex);

/// <summary>
/// TriggerOpion
/// </summary>
public class TriggerOption
{
/// <summary>
/// this value indicate whether ignore exception when triggering
/// </summary>
public bool IsIgnoreException { get; set; }

/// <summary>
/// ctor
/// </summary>
public TriggerOption(bool isIgnoreException = true)
{
IsIgnoreException = isIgnoreException;
}
}

/// <summary>
/// It used to publish event or send a request
/// </summary>
Expand All @@ -51,6 +70,14 @@ public interface IServiceBus : IAddSubscription, IAddRequestHandler
/// <returns></returns>
Task Publish(IEventTrigger @event);

/// <summary>
/// Publish a event
/// </summary>
/// <param name="event"></param>
/// <param name="triggerOption"></param>
/// <returns></returns>
Task Publish(IEventTrigger @event, TriggerOption triggerOption);

/// <summary>
/// Send a request sync
/// </summary>
Expand All @@ -59,12 +86,30 @@ public interface IServiceBus : IAddSubscription, IAddRequestHandler
/// <returns></returns>
T Send<T>(IRequestTrigger @event);

/// <summary>
/// Send a request sync
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <param name="triggerOption"></param>
/// <returns></returns>
T Send<T>(IRequestTrigger @event, TriggerOption triggerOption);

/// <summary>
/// Send a request async
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <returns></returns>
Task<T> SendAsync<T>(IRequestTrigger @event);

/// <summary>
/// Send a request async
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <param name="triggerOption"></param>
/// <returns></returns>
Task<T> SendAsync<T>(IRequestTrigger @event, TriggerOption triggerOption);
}
}
59 changes: 47 additions & 12 deletions src/ServiceAnt/InProcessServiceBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,18 @@ public void RemoveDynamicSubscription(string eventName, Func<dynamic, Task> acti
/// <returns></returns>
public Task Publish(IEventTrigger @event)
{
var asyncTask = ProcessEvent(_subcriptionManager.GetEventName(@event.GetType()), JsonConvert.SerializeObject(@event));
return asyncTask;
return Publish(@event, new TriggerOption());
}

/// <summary>
/// Publish a event
/// </summary>
/// <param name="event"></param>
/// <param name="triggerOption"></param>
/// <returns></returns>
public Task Publish(IEventTrigger @event, TriggerOption triggerOption)
{
return ProcessEvent(_subcriptionManager.GetEventName(@event.GetType()), JsonConvert.SerializeObject(@event), triggerOption);
}

/// <summary>
Expand All @@ -132,11 +142,11 @@ public Task Publish(IEventTrigger @event)
/// <param name="event"></param>
public void PublishSync(IEventTrigger @event)
{
var asyncTask = ProcessEvent(_subcriptionManager.GetEventName(@event.GetType()), JsonConvert.SerializeObject(@event));
var asyncTask = Publish(@event, new TriggerOption());
asyncTask.Wait();
}

private async Task ProcessEvent(string eventName, string message)
private async Task ProcessEvent(string eventName, string message, TriggerOption triggerOption)
{
var handlerFactories = _subcriptionManager.GetHandlerFactoriesForEvent(eventName);
foreach (var aHandlerFactory in handlerFactories)
Expand Down Expand Up @@ -165,6 +175,8 @@ private async Task ProcessEvent(string eventName, string message)
catch (Exception ex)
{
LogMessage( LogLevel.ERROR, "There has caught a error when publishing event.", ex);
if (!triggerOption.IsIgnoreException)
throw ex;
}
}
}
Expand Down Expand Up @@ -244,7 +256,19 @@ public void RemoveDynamicRequestHandler(string eventName, Func<dynamic, IRequest
/// <returns></returns>
public T Send<T>(IRequestTrigger @event)
{
var asyncTask = ProcessRequest<T>(_requestHandlerManager.GetRequestName(@event.GetType()), JsonConvert.SerializeObject(@event));
return Send<T>(@event, new TriggerOption());
}

/// <summary>
/// Send a request sync
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <param name="triggerOption"></param>
/// <returns></returns>
public T Send<T>(IRequestTrigger @event, TriggerOption triggerOption)
{
var asyncTask = SendAsync<T>(@event, triggerOption);
asyncTask.ConfigureAwait(false);
return asyncTask.Result;
}
Expand All @@ -255,12 +279,24 @@ public T Send<T>(IRequestTrigger @event)
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <returns></returns>
public async Task<T> SendAsync<T>(IRequestTrigger @event)
public Task<T> SendAsync<T>(IRequestTrigger @event)
{
return SendAsync<T>(@event, new TriggerOption(false));
}

/// <summary>
/// Send a request async
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <param name="triggerOption"></param>
/// <returns></returns>
public Task<T> SendAsync<T>(IRequestTrigger @event, TriggerOption triggerOption)
{
return await ProcessRequest<T>(_requestHandlerManager.GetRequestName(@event.GetType()), JsonConvert.SerializeObject(@event));
return ProcessRequest<T>(_requestHandlerManager.GetRequestName(@event.GetType()), JsonConvert.SerializeObject(@event), triggerOption);
}

private async Task<T> ProcessRequest<T>(string eventName, string message)
private async Task<T> ProcessRequest<T>(string eventName, string message, TriggerOption triggerOption)
{
var handlerFactories = _requestHandlerManager.GetHandlerFactoriesForRequest(eventName);
var requestContext = new RequestHandlerContext();
Expand Down Expand Up @@ -296,6 +332,8 @@ private async Task<T> ProcessRequest<T>(string eventName, string message)
catch (Exception ex)
{
LogMessage(LogLevel.ERROR, "There has raised a error when send request.", ex);
if (!triggerOption.IsIgnoreException)
throw ex;
}
}

Expand All @@ -306,10 +344,7 @@ private async Task<T> ProcessRequest<T>(string eventName, string message)

private void LogMessage(LogLevel type, string value, Exception ex)
{
if (OnLogBusMessage != null)
OnLogBusMessage(type, value, ex);
else
throw ex;
OnLogBusMessage?.Invoke(type, value, ex);
}
}
}
22 changes: 20 additions & 2 deletions test/ServiceAnt.test/InProcessEventBus_Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void MutipleGenericRequestHandler_ByDifferentNameSpace_ShouldResponse()

[TestMethod]
[ExpectedException(typeof(Exception))]
public async Task ShouldRaiseException_WhenNoLogDelate()
public async Task ShouldRaiseException_WhenPublishSetOption()
{
var eventBus = new InProcessServiceBus();

Expand All @@ -476,7 +476,25 @@ public async Task ShouldRaiseException_WhenNoLogDelate()
});

var testEventData = new TestEventData() { Msg = "success" };
await eventBus.Publish(testEventData);
await eventBus.Publish(testEventData, new TriggerOption(false));
}

[TestMethod]
[ExpectedException(typeof(Exception))]
public async Task ShouldRaiseException_WhenSendSetOption()
{
var eventBus = new InProcessServiceBus();

eventBus.AddDynamicRequestHandler(typeof(TestRequestData).Name, (requestData, requetContext) =>
{
return Task.Run(() =>
{
throw new Exception("Test Exception");
});
});

var testRequestData = new TestRequestData() { Msg = "success" };
await eventBus.SendAsync<string>(testRequestData, new TriggerOption(false));
}

[TestMethod]
Expand Down

0 comments on commit faf458d

Please sign in to comment.