Skip to content

Commit

Permalink
Re-throw errors in CommandAckMonitor and AckRejectionPublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrykuzmin committed Aug 28, 2020
1 parent 16baa51 commit c7a3574
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.spine.core.Status;
import io.spine.server.event.EventBus;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.spine.core.Status.StatusCase.REJECTION;

/**
Expand All @@ -48,6 +49,7 @@ final class AckRejectionPublisher implements StreamObserver<Ack> {

@Override
public void onNext(Ack value) {
checkNotNull(value);
Status status = value.getStatus();
if (status.getStatusCase() == REJECTION) {
eventBus.post(status.getRejection());
Expand All @@ -56,7 +58,7 @@ public void onNext(Ack value) {

@Override
public void onError(Throwable t) {
// NO-OP.
throw new IllegalStateException(t);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void onNext(Ack value) {

@Override
public void onError(Throwable t) {
// NO-OP.
throw new IllegalStateException(t);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2020, TeamDev. All rights reserved.
*
* Redistribution and use in source and/or binary forms, with or without
* modification, must retain the above copyright notice and the following
* disclaimer.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.spine.server.commandbus;

import io.spine.core.Ack;
import io.spine.core.Command;
import io.spine.core.CommandId;
import io.spine.server.BoundedContext;
import io.spine.server.BoundedContextBuilder;
import io.spine.server.bus.Acks;
import io.spine.server.bus.Listener;
import io.spine.server.event.EventBus;
import io.spine.server.event.RejectionEnvelope;
import io.spine.server.type.CommandEnvelope;
import io.spine.server.type.EventEnvelope;
import io.spine.test.commandbus.CmdBusCaffetteriaId;
import io.spine.test.commandbus.command.CmdBusEntryDenied;
import io.spine.testing.client.TestActorRequestFactory;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

@DisplayName("`AckRejectionPublisher` should")
class AckRejectionPublisherTest {

private MemoizingListener listener;
private AckRejectionPublisher publisher;

@BeforeEach
void createPublisher() {
listener = new MemoizingListener();
BoundedContext context = BoundedContextBuilder
.assumingTests()
.addEventListener(listener)
.build();
EventBus eventBus = context.eventBus();
publisher = new AckRejectionPublisher(eventBus);
}

@Test
@DisplayName("publish the passed rejection to event bus")
void publishRejection() {
TestActorRequestFactory requestFactory =
new TestActorRequestFactory(AckRejectionPublisherTest.class);
Command command = requestFactory.generateCommand();
CommandEnvelope origin = CommandEnvelope.of(command);

CmdBusEntryDenied throwableMessage = CmdBusEntryDenied
.newBuilder()
.setId(CmdBusCaffetteriaId.generate())
.setVisitorCount(15)
.setReason("Test command bus rejection.")
.build();
RejectionEnvelope rejection = RejectionEnvelope.from(origin, throwableMessage);
Ack ack = Acks.reject(CommandId.generate(), rejection);
publisher.onNext(ack);
assertThat(listener.lastReceived).isNotNull();
assertThat(listener.lastReceived.message()).isEqualTo(throwableMessage.messageThrown());
}

@Test
@DisplayName("ignore non-rejection `Ack`s")
void ignoreNonRejection() {
Ack ack = Acks.acknowledge(CommandId.generate());
publisher.onNext(ack);
assertThat(listener.lastReceived).isNull();
}

@Test
@DisplayName("re-throw an error passed to `onError` as `IllegalStateException`")
void rethrowError() {
RuntimeException exception = new RuntimeException("Test Ack publisher exception.");
IllegalStateException thrown = assertThrows(IllegalStateException.class,
() -> publisher.onError(exception));
assertThat(thrown.getCause()).isEqualTo(exception);
}

private static class MemoizingListener implements Listener<EventEnvelope> {

private @Nullable EventEnvelope lastReceived;

@Override
public void accept(EventEnvelope envelope) {
lastReceived = envelope;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.testing.NullPointerTester;
import com.google.common.truth.Truth;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import io.spine.base.CommandMessage;
Expand Down Expand Up @@ -187,6 +188,22 @@ void onRejection() {
}
}

@Test
@DisplayName("re-throw errors passed to `onError` as `IllegalStateException`")
void rethrowOnError() {
MemoizingWriteSide writeSide = MemoizingWriteSide.singleTenant();
CommandAckMonitor monitor = CommandAckMonitor
.newBuilder()
.setSystemWriteSide(writeSide)
.setTenantId(TenantId.getDefaultInstance())
.setPostedCommands(ImmutableSet.of(mockCommand))
.build();
RuntimeException error = new RuntimeException("The command Ack monitor test error.");
IllegalStateException exception = assertThrows(IllegalStateException.class,
() -> monitor.onError(error));
Truth.assertThat(exception).hasCauseThat().isEqualTo(error);
}

private static Ack okAck(CommandId commandId) {
return Acks.acknowledge(commandId);
}
Expand Down

0 comments on commit c7a3574

Please sign in to comment.