Skip to content

Commit

Permalink
Subscription sending delete events being skipped (#3888)
Browse files Browse the repository at this point in the history
* fixed bug and added test

* refactor
  • Loading branch information
tadgh committed Aug 8, 2022
1 parent e95a929 commit 686c606
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
type: fix
issue: 3887
jira: SMILE-3975
title: "Removed a bug with subscriptions that supported sending deletes. Previously, if there was a previously-registered subscription which did not support deletes, that would shortcircuit processing and cause subsequent
subscriptions not to fire. This has been corrected."
Original file line number Diff line number Diff line change
Expand Up @@ -122,98 +122,107 @@ private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg
Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();

ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size());
boolean resourceMatched = false;
boolean anySubscriptionsMatchedResource = false;

for (ActiveSubscription nextActiveSubscription : subscriptions) {
// skip if the partitions don't match
CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
if (subscription != null && theMsg.getPartitionId() != null &&
theMsg.getPartitionId().hasPartitionIds() && !subscription.getCrossPartitionEnabled() &&
!theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
continue;
}
String nextSubscriptionId = getId(nextActiveSubscription);

if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
continue;
}
}
anySubscriptionsMatchedResource |= processSubscription(theMsg, resourceId, nextActiveSubscription);
}

if (!resourceTypeIsAppropriateForSubscription(nextActiveSubscription, resourceId)) {
continue;
}
if (!anySubscriptionsMatchedResource) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg);
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
}
}

if (theMsg.getOperationType().equals(DELETE)) {
if (!nextActiveSubscription.getSubscription().getSendDeleteMessages()) {
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
return;
}
}
/**
* Returns true if subscription matched, and processing completed successfully, and the message was sent to the delivery channel. False otherwise.
*
*/
private boolean processSubscription(ResourceModifiedMessage theMsg, IIdType theResourceId, ActiveSubscription theActiveSubscription) {
// skip if the partitions don't match
CanonicalSubscription subscription = theActiveSubscription.getSubscription();
if (subscription != null && theMsg.getPartitionId() != null &&
theMsg.getPartitionId().hasPartitionIds() && !subscription.getCrossPartitionEnabled() &&
!theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
return false;
}
String nextSubscriptionId = getId(theActiveSubscription);

InMemoryMatchResult matchResult;
if (nextActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
ourLog.trace("Subscription {} was not matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
continue;
}
ourLog.debug("Subscription {} was matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
} else {
ourLog.trace("Subscription {} was not matched by resource {} - No search expression",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue());
matchResult = InMemoryMatchResult.successfulMatch();
matchResult.setInMemory(true);
if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
return false;
}
}

if (!resourceTypeIsAppropriateForSubscription(theActiveSubscription, theResourceId)) {
return false;
}

IBaseResource payload = theMsg.getNewPayload(myFhirContext);
if (theMsg.getOperationType().equals(DELETE)) {
if (!theActiveSubscription.getSubscription().getSendDeleteMessages()) {
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
return false;
}
}

EncodingEnum encoding = null;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
InMemoryMatchResult matchResult;
if (theActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(theActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
ourLog.trace("Subscription {} was not matched by resource {} {}",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
return false;
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);
ourLog.debug("Subscription {} was matched by resource {} {}",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
} else {
ourLog.trace("Subscription {} was not matched by resource {} - No search expression",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue());
matchResult = InMemoryMatchResult.successfulMatch();
matchResult.setInMemory(true);
}

ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());
IBaseResource payload = theMsg.getNewPayload(myFhirContext);

if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
EncodingEnum encoding = null;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);

// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, nextActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
return;
}
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());

resourceMatched |= sendToDeliveryChannel(nextActiveSubscription, deliveryMsg);
if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);

if (!resourceMatched) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg);
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
ourLog.info("Interceptor has decided to abort processing of subscription {}", nextSubscriptionId);
return false;
}

return sendToDeliveryChannel(theActiveSubscription, deliveryMsg);
}

private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -402,9 +403,13 @@ public class TestDeleteMessages {
SubscriptionRegistry mySubscriptionRegistry;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ActiveSubscription myActiveSubscription;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ActiveSubscription myNonDeleteSubscription;
@Mock
CanonicalSubscription myCanonicalSubscription;
@Mock
CanonicalSubscription myNonDeleteCanonicalSubscription;
@Mock
SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria;

@Test
Expand Down Expand Up @@ -445,6 +450,31 @@ public void matchActiveSubscriptionsChecksSendDeleteMessagesExtensionFlag() {
verify(myCanonicalSubscription, atLeastOnce()).getSendDeleteMessages();
}

@Test
public void testMultipleSubscriptionsDoNotEarlyReturn() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);

when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
when(message.getPayloadId(null)).thenReturn(new IdDt("Patient", 123L));
when(myNonDeleteCanonicalSubscription.getSendDeleteMessages()).thenReturn(false);
when(mySubscriptionRegistry.getAll()).thenReturn(List.of(myNonDeleteSubscription ,myActiveSubscription));
when(myActiveSubscription.getSubscription()).thenReturn(myCanonicalSubscription);
when(myActiveSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
when(myActiveSubscription.getId()).thenReturn("Patient/123");
when(myNonDeleteSubscription.getSubscription()).thenReturn(myNonDeleteCanonicalSubscription);
when(myNonDeleteSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
when(myNonDeleteSubscription.getId()).thenReturn("Patient/123");
when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION);

subscriber.matchActiveSubscriptionsAndDeliver(message);

verify(myNonDeleteCanonicalSubscription, times(1)).getSendDeleteMessages();
verify(myCanonicalSubscription, times(1)).getSendDeleteMessages();
}

@Test
public void matchActiveSubscriptionsAndDeliverSetsPartitionId() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
Expand Down

0 comments on commit 686c606

Please sign in to comment.