Skip to content

Commit

Permalink
Merge pull request apache#11151 from [BEAM-9468] Hl7v2 io
Browse files Browse the repository at this point in the history
* add HL7v2IO and FhirIO, test dependency issue

* add to changes

* minor updates

* staging before splitting PRs

* remove FhirIO

* simplify HL7v2IO structure

* add write result

* refactor to HL7v2IO.Read.Result

* add Coders and HL7v2IOTest

* spotless

* encode more of the Message

* fix read result getPipeline

* add adaptive throttler to write

* remove unused import

* improve test readability

* nvm spotless apply didn't like that

* scaffolding of Read integration test

scaffolding of Read integration test

* scaffolding for Write IT

* small integration test

* clean up

* fix integration tests

* add labels to Message coder

* add parsed data coder

* Refactor to use HL7v2Message wrapper class

* Expose schematized data as JSON string to support motivating use case of
HL7v2 -> FHIR via HCLS DataHamonization Mapping Engine.
* The healthcare model Message class has fields
(namely ParsedData and PatientIds) that are difficult to deal with in the Coder
and provide little value for users.

* set scope and user agent

* add schematized data to coder

* refactor list fn to avoid double get, improve tests, extract json from schematized data

* add IT for new ListHL7v2Messages PTransform

* migration to latest beta client library

* beta api migration

* add RunWith

* remove adaptive throttling

* revert AdaptiveThrottler to package private

* fix javadoc

* add HealthcareIOErrorToTableRow convenience transform

* spotless

* Add convenience function for writing deadletter to bigquery

* Address review feedback

* Fix FetchHL7v2Message javadoc
* Add observedTime to HealthcareIOErrorCoder

* Improve API surface for users

* ephemeral HL7v2 stores in integration tests

* prepare integration tests for apache-beam-testing

* fix docstring typos

* update google_clients_version to 1.30.9

* fix merge issue w/ duplicate google_oauth_clients_version

* Use Jackson

* spotlessApply

* add TODO to add support for batch API in the future once available

* add reshuffle to increase downstream paralelism

* add setCoder before reshuffle
  • Loading branch information
Jacob Ferriero committed Apr 16, 2020
1 parent 1d7f640 commit 0b025e3
Show file tree
Hide file tree
Showing 19 changed files with 2,528 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss

* Java SDK: Adds support for Thrift encoded data via ThriftIO. ([BEAM-8561](https://issues.apache.org/jira/browse/BEAM-8561))
* Java SDK: KafkaIO supports schema resolution using Confluent Schema Registry. ([BEAM-7310](https://issues.apache.org/jira/browse/BEAM-7310))
* Java SDK: Add Google Cloud Healthcare IO connectors: HL7v2IO and FhirIO ([BEAM-9468](https://issues.apache.org/jira/browse/BEAM-9468))
* Python SDK: Support for Google Cloud Spanner. This is an experimental module for reading and writing data from Google Cloud Spanner ([BEAM-7246](https://issues.apache.org/jira/browse/BEAM-7246)).
* Python SDK: Adds support for standard HDFS URLs (with server name). ([#10223](https://github.com/apache/beam/pull/10223)).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,9 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_common : "com.google.api:api-common:1.8.1",
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20191211-$google_clients_version",
google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200313-$google_clients_version",
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200210-$google_clients_version",
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200311-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200305-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1beta1-rev20200307-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20200312-$google_clients_version",
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20200226-$google_clients_version",
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials:$google_auth_version",
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
compile library.java.gax_grpc
compile library.java.google_api_client
compile library.java.google_api_services_bigquery
compile library.java.google_api_services_healthcare
compile library.java.google_api_services_pubsub
compile library.java.google_auth_library_credentials
compile library.java.google_auth_library_oauth2_http
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.healthcare;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.healthcare.v1beta1.model.Message;
import java.util.Map;
import javax.annotation.Nullable;

/** The type HL7v2 message to wrap the {@link Message} model. */
public class HL7v2Message {
private static final String schematizedDataKey = "schematizedData";
private static final String schematizedDataPrefix = "{data=";
private final String name;
private final String messageType;
private final String sendTime;
private final String createTime;
private final String data;
private final String sendFacility;
private String schematizedData;
private final Map<String, String> labels;

private static String extractDataJson(String schematizedData) {
String jsonData;
final ObjectMapper mapper = new ObjectMapper();
if (schematizedData != null
&& schematizedData.startsWith(schematizedDataPrefix)
&& schematizedData.endsWith("}}")) {
jsonData =
schematizedData.substring(schematizedDataPrefix.length(), schematizedData.length() - 1);
try {
mapper.readTree(jsonData);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(
String.format("Could not validate inner schematizedData JSON: %s", e.getMessage()));
}
return jsonData;
} else {
throw new IllegalArgumentException(
"expected schematized data string to be of the format '{data=<actual_valid_json>}'");
}
}

/**
* From model {@link Message} to hl7v2 message.
*
* @param msg the model msg
* @return the hl7v2 message
*/
public static HL7v2Message fromModel(Message msg) {
final String schematizedData;
if (msg.get(schematizedDataKey) != null) {
schematizedData = extractDataJson(msg.get(schematizedDataKey).toString());
} else {
schematizedData = null;
}
return new HL7v2Message(
msg.getName(),
msg.getMessageType(),
msg.getSendTime(),
msg.getCreateTime(),
msg.getData(),
msg.getSendFacility(),
schematizedData,
msg.getLabels());
}

public static HL7v2Message fromMap(Map msg) {
final String schematizedData;
if (msg.get(schematizedDataKey) != null) {
schematizedData = extractDataJson(msg.get(schematizedDataKey).toString());
} else {
schematizedData = null;
}

return new HL7v2Message(
msg.get("name").toString(),
msg.get("messageType").toString(),
null,
msg.get("createTime").toString(),
msg.get("data").toString(),
msg.get("sendFacility").toString(),
schematizedData,
null);
}
/**
* To model message.
*
* @return the message
*/
public Message toModel() {
Message out = new Message();
out.setName(this.getName());
out.setMessageType(this.getMessageType());
out.setSendTime(this.getSendTime());
out.setCreateTime(this.getCreateTime());
out.setData(this.getData());
out.setSendFacility(this.getSendFacility());
out.set(schematizedDataKey, this.getSchematizedData());
out.setLabels(this.labels);
return out;
}

private HL7v2Message(
String name,
String messageType,
String sendTime,
String createTime,
String data,
String sendFacility,
@Nullable String schematizedData,
@Nullable Map<String, String> labels) {
this.name = name;
this.messageType = messageType;
this.sendTime = sendTime;
this.createTime = createTime;
this.data = data;
this.sendFacility = sendFacility;
this.schematizedData = schematizedData;
this.labels = labels;
}

/**
* Gets name.
*
* @return the name
*/
public String getName() {
return name;
}

/**
* Gets message type.
*
* @return the message type
*/
public String getMessageType() {
return messageType;
}

/**
* Gets send time.
*
* @return the send time
*/
public String getSendTime() {
return sendTime;
}

/**
* Gets create time.
*
* @return the create time
*/
public String getCreateTime() {
return createTime;
}

/**
* Gets data.
*
* @return the data
*/
public String getData() {
return data;
}

/**
* Gets send facility.
*
* @return the send facility
*/
public String getSendFacility() {
return sendFacility;
}

/**
* Gets schematized data.
*
* @return the schematized data
*/
public String getSchematizedData() {
return schematizedData;
}

public void setSchematizedData(String schematizedData) {
this.schematizedData = schematizedData;
}
/**
* Gets labels.
*
* @return the labels
*/
public Map<String, String> getLabels() {
return labels;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.healthcare;

import com.google.api.services.healthcare.v1beta1.model.Message;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class HL7v2MessageCoder extends CustomCoder<HL7v2Message> {
HL7v2MessageCoder() {}

private static final NullableCoder<String> STRING_CODER = NullableCoder.of(StringUtf8Coder.of());
private static final NullableCoder<Map<String, String>> MAP_CODER =
NullableCoder.of(MapCoder.of(STRING_CODER, STRING_CODER));

@Override
public void encode(HL7v2Message value, OutputStream outStream)
throws CoderException, IOException {
STRING_CODER.encode(value.getName(), outStream);
STRING_CODER.encode(value.getMessageType(), outStream);
STRING_CODER.encode(value.getCreateTime(), outStream);
STRING_CODER.encode(value.getSendTime(), outStream);
STRING_CODER.encode(value.getData(), outStream);
STRING_CODER.encode(value.getSendFacility(), outStream);
MAP_CODER.encode(value.getLabels(), outStream);
STRING_CODER.encode(value.getSchematizedData(), outStream);
}

@Override
public HL7v2Message decode(InputStream inStream) throws CoderException, IOException {
Message msg = new Message();
msg.setName(STRING_CODER.decode(inStream));
msg.setMessageType(STRING_CODER.decode(inStream));
msg.setCreateTime(STRING_CODER.decode(inStream));
msg.setSendTime(STRING_CODER.decode(inStream));
msg.setData(STRING_CODER.decode(inStream));
msg.setSendFacility(STRING_CODER.decode(inStream));
msg.setLabels(MAP_CODER.decode(inStream));
HL7v2Message out = HL7v2Message.fromModel(msg);
out.setSchematizedData(STRING_CODER.decode(inStream));
return out;
}
}
Loading

0 comments on commit 0b025e3

Please sign in to comment.