Skip to content

Commit

Permalink
thread safe broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
lukemurray committed May 24, 2024
1 parent 15bf22a commit 7d72f43
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class PersonArgConstructor
## Fixes

- Fix issue where a subscription execution had access to a disposed `IServiceProvider`
- `Broadcaster` is thread safe when removing observers

# 5.3.0

Expand Down
27 changes: 19 additions & 8 deletions src/EntityGraphQL/Subscriptions/Broadcaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ namespace EntityGraphQL.Subscriptions;
public class Broadcaster<TType> : IObservable<TType>, IDisposable
{
public List<IObserver<TType>> Subscribers { get; } = [];

public Action<IObserver<TType>>? OnUnsubscribe { get; set; }

/// <summary>
Expand All @@ -42,7 +41,10 @@ public class Broadcaster<TType> : IObservable<TType>, IDisposable
/// <returns></returns>
public virtual IDisposable Subscribe(IObserver<TType> observer)
{
Subscribers.Add(observer);
lock (Subscribers)
{
Subscribers.Add(observer);
}
return new GraphQLSubscription<TType>(this, observer);
}

Expand All @@ -61,26 +63,35 @@ public virtual void Unsubscribe(IObserver<TType> observer)
/// <param name="value"></param>
public virtual void OnNext(TType value)
{
foreach (var observer in Subscribers)
lock (Subscribers)
{
observer.OnNext(value);
foreach (var observer in Subscribers)
{
observer.OnNext(value);
}
}
}

public virtual void OnError(Exception ex)
{
foreach (var observer in Subscribers)
lock (Subscribers)
{
observer.OnError(ex);
foreach (var observer in Subscribers)
{
observer.OnError(ex);
}
}
}

public virtual void Dispose()
{
GC.SuppressFinalize(this);
foreach (var observer in Subscribers)
lock (Subscribers)
{
observer.OnCompleted();
foreach (var observer in Subscribers)
{
observer.OnCompleted();
}
}
}
}

0 comments on commit 7d72f43

Please sign in to comment.