-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1295 from SpineEventEngine/improve-command-filter…
…-rejections-support Improve support of rejections thrown from the `CommandBus` filters
- Loading branch information
Showing
42 changed files
with
1,414 additions
and
221 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright 2020, TeamDev. All rights reserved. | ||
* | ||
* Redistribution and use in source and/or binary forms, with or without | ||
* modification, must retain the above copyright notice and the following | ||
* disclaimer. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
|
||
package io.spine.grpc; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import io.grpc.stub.StreamObserver; | ||
|
||
import static com.google.common.base.Preconditions.checkNotNull; | ||
|
||
/** | ||
* A stream observer which delegates calls to multiple other observers with the same target type. | ||
* | ||
* <p>The observers are called in the exact same order in which they are specified on the object | ||
* construction. | ||
* | ||
* @param <T> | ||
* the observed type | ||
*/ | ||
public final class CompositeObserver<T> implements StreamObserver<T> { | ||
|
||
private final ImmutableList<StreamObserver<? super T>> observers; | ||
|
||
public CompositeObserver(Iterable<StreamObserver<? super T>> observers) { | ||
checkNotNull(observers); | ||
this.observers = ImmutableList.copyOf(observers); | ||
} | ||
|
||
@Override | ||
public void onNext(T value) { | ||
observers.forEach(o -> o.onNext(value)); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
observers.forEach(o -> o.onError(t)); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
observers.forEach(StreamObserver::onCompleted); | ||
} | ||
} |
84 changes: 84 additions & 0 deletions
84
client/src/test/java/io/spine/grpc/CompositeObserverTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Copyright 2020, TeamDev. All rights reserved. | ||
* | ||
* Redistribution and use in source and/or binary forms, with or without | ||
* modification, must retain the above copyright notice and the following | ||
* disclaimer. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
|
||
package io.spine.grpc; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.testing.NullPointerTester; | ||
import io.grpc.stub.StreamObserver; | ||
import org.junit.jupiter.api.DisplayName; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import static com.google.common.truth.Truth.assertThat; | ||
|
||
@DisplayName("`CompositeObserver` should") | ||
class CompositeObserverTest { | ||
|
||
@Test | ||
@DisplayName("not accept `null` observer list on construction") | ||
void passNullToleranceCheck() { | ||
new NullPointerTester() | ||
.testAllPublicConstructors(CompositeObserver.class); | ||
} | ||
|
||
@Test | ||
@DisplayName("delegate `onNext` calls to wrapped observers") | ||
void delegateOnNext() { | ||
MemoizingObserver<String> observer1 = StreamObservers.memoizingObserver(); | ||
MemoizingObserver<String> observer2 = StreamObservers.memoizingObserver(); | ||
StreamObserver<String> observer = | ||
new CompositeObserver<>(ImmutableList.of(observer1, observer2)); | ||
|
||
String value = "String value."; | ||
observer.onNext(value); | ||
|
||
assertThat(observer1.firstResponse()).isEqualTo(value); | ||
assertThat(observer2.firstResponse()).isEqualTo(value); | ||
} | ||
|
||
@Test | ||
@DisplayName("delegate `onError` calls to wrapped observers") | ||
void delegateOnError() { | ||
MemoizingObserver<String> observer1 = StreamObservers.memoizingObserver(); | ||
MemoizingObserver<String> observer2 = StreamObservers.memoizingObserver(); | ||
StreamObserver<String> observer = | ||
new CompositeObserver<>(ImmutableList.of(observer1, observer2)); | ||
|
||
RuntimeException error = new RuntimeException("An error."); | ||
observer.onError(error); | ||
|
||
assertThat(observer1.getError()).isEqualTo(error); | ||
assertThat(observer2.getError()).isEqualTo(error); | ||
} | ||
|
||
@Test | ||
@DisplayName("delegate `onCompleted` calls to wrapped observers") | ||
void delegateOnCompleted() { | ||
MemoizingObserver<String> observer1 = StreamObservers.memoizingObserver(); | ||
MemoizingObserver<String> observer2 = StreamObservers.memoizingObserver(); | ||
StreamObserver<String> observer = | ||
new CompositeObserver<>(ImmutableList.of(observer1, observer2)); | ||
|
||
observer.onCompleted(); | ||
|
||
assertThat(observer1.isCompleted()).isTrue(); | ||
assertThat(observer2.isCompleted()).isTrue(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.