Attempt backport oldest first #4776 (Open)

Wants to merge 12 commits into base: ja_2030417_backport_subscription_opt_to_6_4
Support _exportId for bulk exports. (#4781)
* Full implementation, test, changelogs

* Add changelogs

* Add default method

* Update hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/WriteBinaryStep.java

Co-authored-by: James Agnew <[email protected]>

* Code Review Comments

* Compilation failures

---------

Co-authored-by: James Agnew <[email protected]>
tadgh and jamesagnew committed May 1, 2023
commit a06b6225e157164a217b2391c4498c883931d18a
11 changes: 11 additions & 0 deletions hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java
@@ -1141,12 +1141,23 @@ public String toString() {
}

// TODO KHS add the other primitive types
@Deprecated(since = "6.6.0", forRemoval = true)
public IPrimitiveType<Boolean> getPrimitiveBoolean(Boolean theValue) {
return newPrimitiveBoolean(theValue);
}

public IPrimitiveType<Boolean> newPrimitiveBoolean(Boolean theValue) {
IPrimitiveType<Boolean> retval = (IPrimitiveType<Boolean>) getElementDefinition("boolean").newInstance();
retval.setValue(theValue);
return retval;
}

public IPrimitiveType<String> newPrimitiveString(String theValue) {
IPrimitiveType<String> retval = (IPrimitiveType<String>) getElementDefinition("string").newInstance();
retval.setValue(theValue);
return retval;
}

private static boolean tryToInitParser(Runnable run) {
boolean retVal;
try {
@@ -2618,6 +2618,33 @@ public enum Pointcut implements IPointcut {
"ca.uhn.fhir.jpa.util.SqlQueryList"
),

/**
* <b>Binary Blob Prefix Assigning Hook:</b>
* <p>
* Immediately before a binary blob is stored to its eventual data sink, this hook is called.
* This hook allows implementers to provide a prefix to the binary blob's ID.
* This is helpful in cases where you want to identify this blob for later retrieval outside of HAPI-FHIR. Note that allowable characters will depend on the specific storage sink being used.
* <ul>
* <li>
* ca.uhn.fhir.rest.api.server.RequestDetails - A bean containing details about the request that is about to be processed, including details such as the
* resource type and logical ID (if any) and other FHIR-specific aspects of the request which have been
* pulled out of the servlet request. Note that the bean
* properties are not all guaranteed to be populated.
* </li>
* <li>
* org.hl7.fhir.instance.model.api.IBaseBinary - The binary resource that is about to be stored.
* </li>
* </ul>
* <p>
* Hooks should return <code>String</code>, which represents the full prefix to be applied to the blob.
* </p>
*/
STORAGE_BINARY_ASSIGN_BLOB_ID_PREFIX(String.class,
"ca.uhn.fhir.rest.api.server.RequestDetails",
"org.hl7.fhir.instance.model.api.IBaseResource"
),
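// Illustrative only (not part of this change set): a minimal interceptor
// implementing this hook might look like the sketch below. The class name,
// method name, and the fixed "my-app-" prefix are assumptions; any value that
// is legal for the configured storage sink could be returned instead.
@Interceptor
public class BlobIdPrefixInterceptor {

	@Hook(Pointcut.STORAGE_BINARY_ASSIGN_BLOB_ID_PREFIX)
	public String assignBlobIdPrefix(RequestDetails theRequestDetails, IBaseResource theResource) {
		// Group every externalized blob created by this server under one prefix so
		// the blobs can be located later, outside of HAPI-FHIR.
		return "my-app-";
	}
}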


/**
* This pointcut is used only for unit tests. Do not use in production code as it may be changed or
* removed at any time.
@@ -0,0 +1,5 @@
---
type: add
issue: 4774
title: "Bulk Export now supports a new `_exportId` parameter. If provided, any Binary resources generated by this export will have an extension in their `binary.meta` field which identifies this export. This can be used to correlate exported resources with the export job that generated them.
In addition, the `binary.meta` field of Bulk Export-generated binaries will also contain the job ID of the export job that generated them, as well as the resource type of the data contained within the binary."
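As a hedged illustration of the new parameter (the server base URL, export identifier, and class name below are placeholders, not part of this change), a Bulk Export kickoff request supplying `_exportId` could look like this:

import java.io.IOException;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class BulkExportKickoffExample {
	public static void main(String[] args) throws IOException {
		// Placeholder server base; any HAPI FHIR server with Bulk Export enabled.
		String serverBase = "http://localhost:8080/fhir";
		HttpGet kickOff = new HttpGet(serverBase + "/$export?_type=Patient&_exportId=nightly-2023-05-01");
		kickOff.addHeader("Prefer", "respond-async");
		try (CloseableHttpClient client = HttpClients.createDefault();
				CloseableHttpResponse response = client.execute(kickOff)) {
			// The server answers 202 Accepted; the Content-Location header carries the
			// status URL to poll until the export completes.
			System.out.println(response.getFirstHeader("Content-Location").getValue());
		}
	}
}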
@@ -0,0 +1,5 @@
---
type: add
issue: 4774
title: "A new Pointcut called `STORAGE_BINARY_ASSIGN_BLOB_ID_PREFIX` has been added. This pointcut is called when a binary blob is about to be stored,
and allows implementers to attach a prefix to the blob ID before it is stored."
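For context, a sketch of wiring such an interceptor into the interceptor registry; in a running JPA server the registry is normally injected (e.g. via Spring) rather than constructed directly, and the interceptor class here is the hypothetical one sketched above:

import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.executor.InterceptorService;

public class RegisterBlobIdPrefixInterceptorExample {
	public static void main(String[] args) {
		// Standalone form just to show the registration call; in a server you would
		// @Autowire the IInterceptorService instead of constructing one.
		IInterceptorService interceptorService = new InterceptorService();
		interceptorService.registerInterceptor(new BlobIdPrefixInterceptor());
	}
}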
@@ -161,6 +161,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B

public static final String BASE_RESOURCE_NAME = "resource";
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirResourceDao.class);
@Autowired
protected IInterceptorBroadcaster myInterceptorBroadcaster;

@Autowired
protected PlatformTransactionManager myPlatformTransactionManager;
@@ -197,6 +197,11 @@ public class JpaConstants {
* Parameter for the $export operation
*/
public static final String PARAM_EXPORT_TYPE_FILTER = "_typeFilter";

/**
* Parameter for the $export operation to identify binaries with a given identifier.
*/
public static final String PARAM_EXPORT_IDENTIFIER = "_exportId";
/**
* Parameter for the $export operation
*/
@@ -206,6 +211,8 @@ public class JpaConstants {
*/
public static final String PARAM_EXPORT_PATIENT = "patient";



/**
* Parameter for the $import operation
*/
@@ -289,6 +296,9 @@ public class JpaConstants {
* IPS Generation operation URL
*/
public static final String SUMMARY_OPERATION_URL = "http://hl7.org/fhir/uv/ips/OperationDefinition/summary";
public static final String BULK_META_EXTENSION_EXPORT_IDENTIFIER = "https://hapifhir.org/NamingSystem/bulk-export-identifier";
public static final String BULK_META_EXTENSION_JOB_ID = "https://hapifhir.org/NamingSystem/bulk-export-job-id";
public static final String BULK_META_EXTENSION_RESOURCE_TYPE = "https://hapifhir.org/NamingSystem/bulk-export-binary-resource-type";

/**
* Non-instantiable
@@ -15,6 +15,7 @@
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
@@ -30,6 +31,7 @@
import com.google.common.collect.Sets;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -40,6 +42,7 @@
import org.hl7.fhir.r4.model.Coverage;
import org.hl7.fhir.r4.model.Encounter;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.InstantType;
@@ -102,6 +105,49 @@ public class BulkExportUseCaseIT extends BaseResourceProviderR4Test {
@Nested
public class SpecConformanceIT {


@Test
public void testBulkExportJobsAreMetaTaggedWithJobIdAndExportId() throws IOException {
//Given a patient exists
Patient p = new Patient();
p.setId("Pat-1");
myClient.update().resource(p).execute();

//And Given we start a bulk export job with a specific export id
String pollingLocation = submitBulkExportForTypesWithExportId("im-an-export-identifier", "Patient");
String jobId = getJobIdFromPollingLocation(pollingLocation);
myBatch2JobHelper.awaitJobCompletion(jobId);

//Then: When the poll shows as complete, all attributes should be filled.
HttpGet statusGet = new HttpGet(pollingLocation);
String expectedOriginalUrl = myClient.getServerBase() + "/$export?_type=Patient&_exportId=im-an-export-identifier";
try (CloseableHttpResponse status = ourHttpClient.execute(statusGet)) {
assertEquals(200, status.getStatusLine().getStatusCode());
String responseContent = IOUtils.toString(status.getEntity().getContent(), StandardCharsets.UTF_8);
assertTrue(isNotBlank(responseContent), responseContent);

ourLog.info(responseContent);

BulkExportResponseJson result = JsonUtil.deserialize(responseContent, BulkExportResponseJson.class);
assertThat(result.getRequest(), is(equalTo(expectedOriginalUrl)));
assertThat(result.getOutput(), is(not(empty())));
String binary_url = result.getOutput().get(0).getUrl();
Binary binaryResource = myClient.read().resource(Binary.class).withUrl(binary_url).execute();

List<Extension> extension = binaryResource.getMeta().getExtension();
assertThat(extension, hasSize(3));

assertThat(extension.get(0).getUrl(), is(equalTo(JpaConstants.BULK_META_EXTENSION_EXPORT_IDENTIFIER)));
assertThat(extension.get(0).getValue().toString(), is(equalTo("im-an-export-identifier")));

assertThat(extension.get(1).getUrl(), is(equalTo(JpaConstants.BULK_META_EXTENSION_JOB_ID)));
assertThat(extension.get(1).getValue().toString(), is(equalTo(jobId)));

assertThat(extension.get(2).getUrl(), is(equalTo(JpaConstants.BULK_META_EXTENSION_RESOURCE_TYPE)));
assertThat(extension.get(2).getValue().toString(), is(equalTo("Patient")));
}
}

@Test
public void testBatchJobsAreOnlyReusedIfInProgress() throws IOException {
//Given a patient exists
@@ -110,7 +156,7 @@ public void testBatchJobsAreOnlyReusedIfInProgress() throws IOException {
myClient.update().resource(p).execute();

//And Given we start a bulk export job
String pollingLocation = submitBulkExportForTypes("Patient");
String pollingLocation = submitBulkExportForTypesWithExportId("my-export-id-","Patient");
String jobId = getJobIdFromPollingLocation(pollingLocation);
myBatch2JobHelper.awaitJobCompletion(jobId);

@@ -285,8 +331,16 @@ public void export_shouldNotExportBinaryResource_whenTypeParameterOmitted() thro
}

private String submitBulkExportForTypes(String... theTypes) throws IOException {
return submitBulkExportForTypesWithExportId(null, theTypes);
}
private String submitBulkExportForTypesWithExportId(String theExportId, String... theTypes) throws IOException {
String typeString = String.join(",", theTypes);
HttpGet httpGet = new HttpGet(myClient.getServerBase() + "/$export?_type=" + typeString);
String uri = myClient.getServerBase() + "/$export?_type=" + typeString;
if (!StringUtils.isBlank(theExportId)) {
uri += "&_exportId=" + theExportId;
}

HttpGet httpGet = new HttpGet(uri);
httpGet.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
String pollingLocation;
try (CloseableHttpResponse status = ourHttpClient.execute(httpGet)) {
@@ -1,18 +1,27 @@
package ca.uhn.fhir.jpa.provider.r4;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.binary.api.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binary.interceptor.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.rest.client.api.IClientInterceptor;
import ca.uhn.fhir.rest.client.api.IHttpRequest;
import ca.uhn.fhir.rest.client.api.IHttpResponse;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.DocumentReference;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -21,6 +30,10 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.stream.Collectors;

import java.io.IOException;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -70,6 +83,42 @@ public void after() throws Exception {
myInterceptorRegistry.unregisterInterceptor(myBinaryStorageInterceptor);
}

class BinaryFilePrefixingInterceptor{

@Hook(Pointcut.STORAGE_BINARY_ASSIGN_BLOB_ID_PREFIX)
public String provideFilenameForBinary(RequestDetails theRequestDetails, IBaseResource theResource) {
ourLog.info("Received binary for prefixing!" + theResource.getIdElement());
String extensionValues = ((IBaseHasExtensions) theResource.getMeta()).getExtension().stream().map(ext -> ext.getValue().toString()).collect(Collectors.joining("-"));
return "prefix-" + extensionValues + "-";
}
}
@Test
public void testCreatingExternalizedBinaryTriggersPointcut() {
BinaryFilePrefixingInterceptor interceptor = new BinaryFilePrefixingInterceptor();
myInterceptorRegistry.registerInterceptor(interceptor);
// Create a resource with two metadata extensions on the binary
Binary binary = new Binary();
binary.setContentType("application/octet-stream");
Extension ext = binary.getMeta().addExtension();
ext.setUrl("http://foo");
ext.setValue(new StringType("bar"));

Extension ext2 = binary.getMeta().addExtension();
ext2.setUrl("http://foo2");
ext2.setValue(new StringType("bar2"));

binary.setData(SOME_BYTES);
DaoMethodOutcome outcome = myBinaryDao.create(binary, mySrd);

// Make sure it was externalized
IIdType id = outcome.getId().toUnqualifiedVersionless();
String encoded = myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome.getResource());
ourLog.info("Encoded: {}", encoded);
assertThat(encoded, containsString(HapiExtensions.EXT_EXTERNALIZED_BINARY_ID));
assertThat(encoded, (containsString("prefix-bar-bar2-")));
myInterceptorRegistry.unregisterInterceptor(interceptor);
}

@Test
public void testCreateAndRetrieveBinary_ServerAssignedId_ExternalizedBinary() {

@@ -90,7 +139,6 @@ public void testCreateAndRetrieveBinary_ServerAssignedId_ExternalizedBinary() {
Binary output = myBinaryDao.read(id, mySrd);
assertEquals("application/octet-stream", output.getContentType());
assertArrayEquals(SOME_BYTES, output.getData());

}


@@ -1,5 +1,3 @@
package ca.uhn.fhir.jpa.test;

/*-
* #%L
* HAPI FHIR JPA Server Test Utilities
@@ -19,6 +17,7 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.test;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.IValidationSupport;
@@ -33,6 +33,7 @@
// They don't seem to serve any distinct purpose so they should be collapsed into 1
public class BulkDataExportOptions {


public enum ExportStyle {
PATIENT,
GROUP,
@@ -49,6 +50,8 @@ public enum ExportStyle {
private IIdType myGroupId;
private Set<IIdType> myPatientIds;

private String myExportIdentifier;

public void setOutputFormat(String theOutputFormat) {
myOutputFormat = theOutputFormat;
}
@@ -132,4 +135,12 @@ public Set<IIdType> getPatientIds() {
public void setPatientIds(Set<IIdType> thePatientIds) {
myPatientIds = thePatientIds;
}

public String getExportIdentifier() {
return myExportIdentifier;
}

public void setExportIdentifier(String theExportIdentifier) {
myExportIdentifier = theExportIdentifier;
}
}
@@ -23,15 +23,20 @@
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.binary.api.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import org.apache.commons.lang3.StringUtils;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BulkExportJobParametersValidator implements IJobParametersValidator<BulkExportJobParameters> {

@@ -41,6 +46,9 @@ public class BulkExportJobParametersValidator implements IJobParametersValidator
@Autowired
private DaoRegistry myDaoRegistry;

@Autowired
private IBinaryStorageSvc myBinaryStorageSvc;

@Nullable
@Override
public List<String> validate(@Nonnull BulkExportJobParameters theParameters) {
@@ -62,6 +70,13 @@ public List<String> validate(@Nonnull BulkExportJobParameters theParameters) {
if (!Constants.CT_FHIR_NDJSON.equalsIgnoreCase(theParameters.getOutputFormat())) {
errorMsgs.add("The only allowed format for Bulk Export is currently " + Constants.CT_FHIR_NDJSON);
}
// validate the exportId
if (!StringUtils.isBlank(theParameters.getExportIdentifier())) {

if (!myBinaryStorageSvc.isValidBlobId(theParameters.getExportIdentifier())) {
errorMsgs.add("Export ID does not conform to the current blob storage implementation's limitations.");
}
}

// validate for group
BulkDataExportOptions.ExportStyle style = theParameters.getExportStyle();
@@ -84,4 +99,5 @@ public List<String> validate(@Nonnull BulkExportJobParameters theParameters) {

return errorMsgs;
}

}