Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription sending delete events being skipped #3888

Merged
merged 2 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
type: fix
issue: 3887
jira: SMILE-3975
title: "Removed a bug with subscriptions that supported sending deletes. Previously, if there was a previously-registered subscription which did not support deletes, that would shortcircuit processing and cause subsequent
subscriptions not to fire. This has been corrected."
Original file line number Diff line number Diff line change
Expand Up @@ -122,98 +122,107 @@ private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg
Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();

ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size());
boolean resourceMatched = false;
boolean anySubscriptionsMatchedResource = false;

for (ActiveSubscription nextActiveSubscription : subscriptions) {
// skip if the partitions don't match
CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
if (subscription != null && theMsg.getPartitionId() != null &&
theMsg.getPartitionId().hasPartitionIds() && !subscription.getCrossPartitionEnabled() &&
!theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
continue;
}
String nextSubscriptionId = getId(nextActiveSubscription);

if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
continue;
}
}
anySubscriptionsMatchedResource |= processSubscription(theMsg, resourceId, nextActiveSubscription);
}

if (!resourceTypeIsAppropriateForSubscription(nextActiveSubscription, resourceId)) {
continue;
}
if (!anySubscriptionsMatchedResource) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg);
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
}
}

if (theMsg.getOperationType().equals(DELETE)) {
if (!nextActiveSubscription.getSubscription().getSendDeleteMessages()) {
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
return;
}
}
/**
* Returns true if subscription matched, and processing completed successfully, and the message was sent to the delivery channel. False otherwise.
*
*/
private boolean processSubscription(ResourceModifiedMessage theMsg, IIdType theResourceId, ActiveSubscription theActiveSubscription) {
// skip if the partitions don't match
CanonicalSubscription subscription = theActiveSubscription.getSubscription();
if (subscription != null && theMsg.getPartitionId() != null &&
theMsg.getPartitionId().hasPartitionIds() && !subscription.getCrossPartitionEnabled() &&
!theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
return false;
}
String nextSubscriptionId = getId(theActiveSubscription);

InMemoryMatchResult matchResult;
if (nextActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
ourLog.trace("Subscription {} was not matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
continue;
}
ourLog.debug("Subscription {} was matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
} else {
ourLog.trace("Subscription {} was not matched by resource {} - No search expression",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue());
matchResult = InMemoryMatchResult.successfulMatch();
matchResult.setInMemory(true);
if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
return false;
}
}

if (!resourceTypeIsAppropriateForSubscription(theActiveSubscription, theResourceId)) {
return false;
}

IBaseResource payload = theMsg.getNewPayload(myFhirContext);
if (theMsg.getOperationType().equals(DELETE)) {
if (!theActiveSubscription.getSubscription().getSendDeleteMessages()) {
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
return false;
}
}

EncodingEnum encoding = null;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
InMemoryMatchResult matchResult;
if (theActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(theActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
ourLog.trace("Subscription {} was not matched by resource {} {}",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
return false;
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);
ourLog.debug("Subscription {} was matched by resource {} {}",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
} else {
ourLog.trace("Subscription {} was not matched by resource {} - No search expression",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue());
matchResult = InMemoryMatchResult.successfulMatch();
matchResult.setInMemory(true);
}

ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());
IBaseResource payload = theMsg.getNewPayload(myFhirContext);

if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
EncodingEnum encoding = null;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);

// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, nextActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
return;
}
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());

resourceMatched |= sendToDeliveryChannel(nextActiveSubscription, deliveryMsg);
if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);

if (!resourceMatched) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg);
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
ourLog.info("Interceptor has decided to abort processing of subscription {}", nextSubscriptionId);
return false;
}

return sendToDeliveryChannel(theActiveSubscription, deliveryMsg);
}

private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -402,9 +403,13 @@ public class TestDeleteMessages {
SubscriptionRegistry mySubscriptionRegistry;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ActiveSubscription myActiveSubscription;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ActiveSubscription myNonDeleteSubscription;
@Mock
CanonicalSubscription myCanonicalSubscription;
@Mock
CanonicalSubscription myNonDeleteCanonicalSubscription;
@Mock
SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria;

@Test
Expand Down Expand Up @@ -445,6 +450,31 @@ public void matchActiveSubscriptionsChecksSendDeleteMessagesExtensionFlag() {
verify(myCanonicalSubscription, atLeastOnce()).getSendDeleteMessages();
}

@Test
public void testMultipleSubscriptionsDoNotEarlyReturn() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);

when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
when(message.getPayloadId(null)).thenReturn(new IdDt("Patient", 123L));
when(myNonDeleteCanonicalSubscription.getSendDeleteMessages()).thenReturn(false);
when(mySubscriptionRegistry.getAll()).thenReturn(List.of(myNonDeleteSubscription ,myActiveSubscription));
when(myActiveSubscription.getSubscription()).thenReturn(myCanonicalSubscription);
when(myActiveSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
when(myActiveSubscription.getId()).thenReturn("Patient/123");
when(myNonDeleteSubscription.getSubscription()).thenReturn(myNonDeleteCanonicalSubscription);
when(myNonDeleteSubscription.getCriteria()).thenReturn(mySubscriptionCriteria);
when(myNonDeleteSubscription.getId()).thenReturn("Patient/123");
when(mySubscriptionCriteria.getType()).thenReturn(STARTYPE_EXPRESSION);

subscriber.matchActiveSubscriptionsAndDeliver(message);

verify(myNonDeleteCanonicalSubscription, times(1)).getSendDeleteMessages();
verify(myCanonicalSubscription, times(1)).getSendDeleteMessages();
}

@Test
public void matchActiveSubscriptionsAndDeliverSetsPartitionId() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
Expand Down