Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription sending delete events being skipped #3888

Merged
merged 2 commits into from
Aug 8, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor
  • Loading branch information
tadgh committed Aug 8, 2022
commit c97054a968e9dae45b491a13f96d6940a264c74b
Original file line number Diff line number Diff line change
Expand Up @@ -122,98 +122,107 @@ private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg
Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();

ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size());
boolean resourceMatched = false;
boolean anySubscriptionsMatchedResource = false;

for (ActiveSubscription nextActiveSubscription : subscriptions) {
// skip if the partitions don't match
CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
if (subscription != null && theMsg.getPartitionId() != null &&
theMsg.getPartitionId().hasPartitionIds() && !subscription.getCrossPartitionEnabled() &&
!theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
continue;
}
String nextSubscriptionId = getId(nextActiveSubscription);

if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
continue;
}
}
anySubscriptionsMatchedResource |= processSubscription(theMsg, resourceId, nextActiveSubscription);
}

if (!resourceTypeIsAppropriateForSubscription(nextActiveSubscription, resourceId)) {
continue;
}
if (!anySubscriptionsMatchedResource) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg);
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
}
}

if (theMsg.getOperationType().equals(DELETE)) {
if (!nextActiveSubscription.getSubscription().getSendDeleteMessages()) {
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
continue;
}
}
/**
* Returns true if subscription matched, and processing completed successfully, and the message was sent to the delivery channel. False otherwise.
*
*/
private boolean processSubscription(ResourceModifiedMessage theMsg, IIdType theResourceId, ActiveSubscription theActiveSubscription) {
// skip if the partitions don't match
CanonicalSubscription subscription = theActiveSubscription.getSubscription();
if (subscription != null && theMsg.getPartitionId() != null &&
theMsg.getPartitionId().hasPartitionIds() && !subscription.getCrossPartitionEnabled() &&
!theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
return false;
}
String nextSubscriptionId = getId(theActiveSubscription);

InMemoryMatchResult matchResult;
if (nextActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
ourLog.trace("Subscription {} was not matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
continue;
}
ourLog.debug("Subscription {} was matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
} else {
ourLog.trace("Subscription {} was not matched by resource {} - No search expression",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue());
matchResult = InMemoryMatchResult.successfulMatch();
matchResult.setInMemory(true);
if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
return false;
}
}

if (!resourceTypeIsAppropriateForSubscription(theActiveSubscription, theResourceId)) {
return false;
}

IBaseResource payload = theMsg.getNewPayload(myFhirContext);
if (theMsg.getOperationType().equals(DELETE)) {
if (!theActiveSubscription.getSubscription().getSendDeleteMessages()) {
ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
return false;
}
}

EncodingEnum encoding = null;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
InMemoryMatchResult matchResult;
if (theActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(theActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
ourLog.trace("Subscription {} was not matched by resource {} {}",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
return false;
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);
ourLog.debug("Subscription {} was matched by resource {} {}",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
} else {
ourLog.trace("Subscription {} was not matched by resource {} - No search expression",
theActiveSubscription.getId(),
theResourceId.toUnqualifiedVersionless().getValue());
matchResult = InMemoryMatchResult.successfulMatch();
matchResult.setInMemory(true);
}

ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());
IBaseResource payload = theMsg.getNewPayload(myFhirContext);

if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
EncodingEnum encoding = null;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);

// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, nextActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
return;
}
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());

resourceMatched |= sendToDeliveryChannel(nextActiveSubscription, deliveryMsg);
if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);

if (!resourceMatched) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, theMsg);
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
ourLog.info("Interceptor has decided to abort processing of subscription {}", nextSubscriptionId);
return false;
}

return sendToDeliveryChannel(theActiveSubscription, deliveryMsg);
}

private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
Expand Down