Skip to content

Commit

Permalink
fixed the issue of meta.source field inconsistently populated in subs…
Browse files Browse the repository at this point in the history
…cription messages for different requests (#4524)

* fix + test

* minor fix

* Addressing suggestion

* Minor changes
  • Loading branch information
Qingyixia committed Feb 9, 2023
1 parent 3d9a318 commit 53252b8
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: fix
issue: 4523
title: "Previously, meta source field in subscription messages was inconsistently populated regarding different requests.
Now, this has been fixed and meta source will be included in all subscription messages."
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.BundleBuilder;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.text.StringSubstitutor;
Expand Down Expand Up @@ -139,6 +141,14 @@ protected IBaseBundle createDeliveryBundleForPayloadSearchCriteria(CanonicalSubs
return builder.getBundle();
}

protected IBaseResource updateDeliveryResourceWithMetaSource(IBaseResource thePayloadResource) {
String resType = thePayloadResource.fhirType();
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resType);
IBaseResource resourceWithMetaSource = dao.read(thePayloadResource.getIdElement(), new SystemRequestDetails());

return resourceWithMetaSource;
}

@VisibleForTesting
public void setFhirContextForUnitTest(FhirContext theCtx) {
myFhirContext = theCtx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -59,8 +60,14 @@ public SubscriptionDeliveringMessageSubscriber(IChannelFactory theChannelFactory

protected void doDelivery(ResourceDeliveryMessage theSourceMessage, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theWrappedMessageToSend) {
String payloadId = theSourceMessage.getPayloadId();
IBaseResource payloadResource = null;
if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria(theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
payloadResource = createDeliveryBundleForPayloadSearchCriteria(theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
} else if (! theWrappedMessageToSend.getPayload().getPayloadString().contains(HapiExtensions.EXT_META_SOURCE)){
payloadResource = updateDeliveryResourceWithMetaSource(theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
}

if (payloadResource != null) {
ResourceModifiedJsonMessage newWrappedMessageToSend = convertDeliveryMessageToResourceModifiedMessage(theSourceMessage, payloadResource);
theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload());
payloadId = payloadResource.getIdElement().toUnqualifiedVersionless().getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Patient;
Expand Down Expand Up @@ -59,6 +58,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -208,6 +208,7 @@ public void testMessageSubscriber_PermitsInterceptorsToModifyOutgoingEnvelope()
});
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY), any())).thenReturn(false);
when(myChannelFactory.getOrCreateProducer(any(), any(), any())).thenReturn(myChannelProducer);
when(myDaoRegistry.getResourceDao(anyString())).thenReturn(myResourceDao);

CanonicalSubscription subscription = generateSubscription();
Patient patient = generatePatient();
Expand Down Expand Up @@ -345,6 +346,7 @@ public void testDeliveryMessageWithPartition() throws URISyntaxException {
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY), any())).thenReturn(true);
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY), any())).thenReturn(false);
when(myChannelFactory.getOrCreateProducer(any(), any(), any())).thenReturn(myChannelProducer);
when(myDaoRegistry.getResourceDao(anyString())).thenReturn(myResourceDao);

CanonicalSubscription subscription = generateSubscription();
Patient patient = generatePatient();
Expand Down Expand Up @@ -401,6 +403,40 @@ public void testRestHookDeliveryFails_raisedExceptionShouldNotIncludeSubmittedRe
}
}

@Test
public void testSubscriptionMessageContainsMetaSourceField() throws URISyntaxException {
//Given: we have a subscription message that contains a patient resource
Patient p1 = generatePatient();
p1.addName().setFamily("p1-family");

CanonicalSubscription subscription = generateSubscription();
subscription.setCriteriaString("[*]");

ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setSubscription(subscription);
payload.setPayload(myCtx, p1, EncodingEnum.JSON);
payload.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);

//When
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY), ArgumentMatchers.any(HookParams.class))).thenReturn(true);
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY), any())).thenReturn(false);
when(myChannelFactory.getOrCreateProducer(any(), any(), any())).thenReturn(myChannelProducer);
when(myDaoRegistry.getResourceDao(anyString())).thenReturn(myResourceDao);

p1.getMeta().setSource("#example-source");
when(myResourceDao.read(any(), any())).thenReturn(p1);

//Then: meta.source field will be included in the message
myMessageSubscriber.handleMessage(payload);

ArgumentCaptor<ResourceModifiedJsonMessage> captor = ArgumentCaptor.forClass(ResourceModifiedJsonMessage.class);
verify(myChannelProducer).send(captor.capture());
List<ResourceModifiedJsonMessage> messages = captor.getAllValues();

ResourceModifiedMessage receivedMessage = messages.get(0).getPayload();
assertTrue(receivedMessage.getPayloadString().contains("\"source\":\"#example-source\""));
}

@Nonnull
private Patient generatePatient() {
Patient patient = new Patient();
Expand Down

0 comments on commit 53252b8

Please sign in to comment.