Skip to content

Commit

Permalink
[1205] FhirEngine Sync Download API Re-design (google#1206)
Browse files Browse the repository at this point in the history
* FhirEngine Sync Download API Redesign

See: https://docs.google.com/document/d/1UZ7Ndb8JRmB2UdXpfIkPT-VAra6X2aLSzdGI_a8KJkg/edit#

TESTED=all unit tests passed. Added one more unit test as well

Co-authored-by: omarismail <[email protected]>
  • Loading branch information
omarismail94 and omarismail94 committed Mar 15, 2022
1 parent 62f5674 commit 6dae038
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.demo.data

import com.google.android.fhir.sync.DownloadManager
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.ListResource
import org.hl7.fhir.r4.model.Reference
import org.hl7.fhir.r4.model.Resource

class DownloadManagerImpl : DownloadManager {
override fun getInitialUrl(): String {
return "Patient?address-city=NAIROBI"
}

override fun createDownloadUrl(preProcessUrl: String, lastUpdate: String?): String {
var downloadUrl = preProcessUrl

// Affix lastUpdate to a $everything query using _since as per:
// https://hl7.org/fhir/operation-patient-everything.html
if (lastUpdate != null && downloadUrl.contains("\$everything")) {
downloadUrl = "$downloadUrl?_since=$lastUpdate"
}

// Affix lastUpdate to non-$everything queries as per:
// https://hl7.org/fhir/operation-patient-everything.html
if (lastUpdate != null && !downloadUrl.contains("\$everything")) {
downloadUrl = "$downloadUrl&_lastUpdated=gt$lastUpdate"
}

// Do not modify any URL set by a server that specifies the token of the page to return.
if (downloadUrl.contains("&page_token")) {
downloadUrl = preProcessUrl
}

return downloadUrl
}

override fun extractResourcesFromResponse(resourceResponse: Resource): Collection<Resource> {
var bundleCollection: Collection<Resource> = mutableListOf()

if (resourceResponse is Bundle && resourceResponse.type == Bundle.BundleType.SEARCHSET) {
bundleCollection = resourceResponse.entry.map { it.resource }
}
return bundleCollection
}

override fun extractNextUrlsFromResource(resourceResponse: Resource): Collection<String> {
val queueWork = mutableListOf<String>()

// If the resource returned is a List, extract Patient references and fetch all resources
// related to the patient using the $everything operation.
if (resourceResponse is ListResource) {
for (entry in resourceResponse.entry) {
val reference = Reference(entry.item.reference)
if (reference.referenceElement.resourceType.equals("Patient")) {
val patientUrl = "${entry.item.reference}/\$everything"
queueWork.add(patientUrl)
}
}
}

// If the resource returned is a Bundle, check to see if there is a "next" relation referenced
// in the Bundle.link component, if so, append the URL referenced to list of URLs to download.
if (resourceResponse is Bundle) {
val nextUrl =
resourceResponse.link.firstOrNull { component -> component.relation == "next" }?.url
if (nextUrl != null) {
queueWork.add(nextUrl)
}
}
return queueWork
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import ca.uhn.fhir.context.FhirContext
import ca.uhn.fhir.context.FhirVersionEnum
import com.google.android.fhir.demo.FhirApplication
import com.google.android.fhir.demo.api.HapiFhirService
import com.google.android.fhir.sync.DownloadManager
import com.google.android.fhir.sync.FhirSyncWorker
import org.hl7.fhir.r4.model.ResourceType

class FhirPeriodicSyncWorker(appContext: Context, workerParams: WorkerParameters) :
FhirSyncWorker(appContext, workerParams) {

override fun getSyncData() = mapOf(ResourceType.Patient to mapOf("address-city" to "NAIROBI"))

override fun getDataSource() =
HapiFhirResourceDataSource(
HapiFhirService.create(FhirContext.forCached(FhirVersionEnum.R4).newJsonParser())
)

override fun getDownloadManager(): DownloadManager {
return DownloadManagerImpl()
}

override fun getFhirEngine() = FhirApplication.fhirEngine(applicationContext)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ object SyncDataParams {
const val SORT_KEY = "_sort"
const val LAST_UPDATED_KEY = "_lastUpdated"
const val ADDRESS_COUNTRY_KEY = "address-country"
const val LAST_UPDATED_ASC_VALUE = "_lastUpdated"
}

/** Configuration for synchronization. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync

import org.hl7.fhir.r4.model.Resource

/** Module for downloading changes from a [DataSource]. */
interface DownloadManager {

/**
* Implement this method to set the initial resource to fetch. Can be a search query, such as
* "Patient?address-city=NAIROBI or a resource Id, such as "List/123"
*/
fun getInitialUrl(): String

/**
* Implement this method to affix the last update timestamp to the URL.
* @preProcessUrl is the entire URL
* @param lastUpdate retrieved from implementation of
* [com.google.android.fhir.SyncDownloadContext.getLatestTimestampFor]
*/
fun createDownloadUrl(preProcessUrl: String, lastUpdate: String?): String

/**
* Implement this method to take the FHIR resource returned by the server, and return the
* resources that need to be saved into the local database.
*/
fun extractResourcesFromResponse(resourceResponse: Resource): Collection<Resource>

/**
* Implement this method to parse the FHIR resource returned by the server to extract the next
* URLs to download.
*/
fun extractNextUrlsFromResource(resourceResponse: Resource): Collection<String>
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
CoroutineWorker(appContext, workerParams) {
abstract fun getFhirEngine(): FhirEngine
abstract fun getDataSource(): DataSource
abstract fun getSyncData(): ResourceSyncParams
abstract fun getDownloadManager(): DownloadManager

private val gson =
GsonBuilder()
.registerTypeAdapter(OffsetDateTime::class.java, OffsetDateTimeTypeAdapter().nullSafe())
.setExclusionStrategies(StateExclusionStrategy())
.create()
private var fhirSynchronizer: FhirSynchronizer =
FhirSynchronizer(appContext, getFhirEngine(), getDataSource(), getSyncData())
FhirSynchronizer(appContext, getFhirEngine(), getDataSource(), getDownloadManager())

override suspend fun doWork(): Result {
val flow = MutableSharedFlow<State>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import com.google.android.fhir.FhirEngine
import com.google.android.fhir.sync.bundle.BundleUploader
import com.google.android.fhir.sync.bundle.TransactionBundleGenerator
import java.time.OffsetDateTime
import java.util.LinkedList
import java.util.Queue
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType

sealed class Result {
Expand All @@ -52,12 +54,13 @@ internal class FhirSynchronizer(
context: Context,
private val fhirEngine: FhirEngine,
private val dataSource: DataSource,
private val resourceSyncParams: ResourceSyncParams,
private val downloadManager: DownloadManager,
private val uploader: Uploader =
BundleUploader(dataSource, TransactionBundleGenerator.getDefault())
) {
private var flow: MutableSharedFlow<State>? = null
private val datastoreUtil = DatastoreUtil(context)
private val resourceTypeList = ResourceType.values().map { it.name }

private fun isSubscribed(): Boolean {
return flow != null
Expand Down Expand Up @@ -102,14 +105,34 @@ internal class FhirSynchronizer(
}

private suspend fun download(): Result {
var resourceTypeToDownload: ResourceType = ResourceType.Bundle
val exceptions = mutableListOf<ResourceSyncException>()
resourceSyncParams.forEach {
emit(State.InProgress(it.key))
emit(State.InProgress(resourceTypeToDownload))

try {
downloadResourceType(it.key, it.value)
} catch (exception: Exception) {
exceptions.add(ResourceSyncException(it.key, exception))
fhirEngine.syncDownload {
flow {
val listOfUrls: Queue<String> = LinkedList()
listOfUrls.add(downloadManager.getInitialUrl())
while (listOfUrls.isNotEmpty()) {
try {
val preprocessedUrl = listOfUrls.remove()
resourceTypeToDownload =
ResourceType.fromCode(
preprocessedUrl.findAnyOf(resourceTypeList, ignoreCase = true)!!.second
)

val lastUpdate = it.getLatestTimestampFor(resourceTypeToDownload)
val downloadUrl = downloadManager.createDownloadUrl(preprocessedUrl, lastUpdate)
val resourceReturned: Resource = dataSource.loadData(downloadUrl)
val resourceCollection = downloadManager.extractResourcesFromResponse(resourceReturned)
val additionalUrls = downloadManager.extractNextUrlsFromResource(resourceReturned)

emit(resourceCollection.toList())
listOfUrls.addAll(additionalUrls)
} catch (exception: Exception) {
exceptions.add(ResourceSyncException(resourceTypeToDownload, exception))
}
}
}
}
return if (exceptions.isEmpty()) {
Expand All @@ -121,36 +144,6 @@ internal class FhirSynchronizer(
}
}

private suspend fun downloadResourceType(resourceType: ResourceType, params: ParamMap) {
fhirEngine.syncDownload {
flow {
var nextUrl = getInitialUrl(resourceType, params, it.getLatestTimestampFor(resourceType))
while (nextUrl != null) {
val bundle = dataSource.loadData(nextUrl)
nextUrl = bundle.link.firstOrNull { component -> component.relation == "next" }?.url
if (bundle.type == Bundle.BundleType.SEARCHSET) {
emit(bundle.entry.map { it.resource })
}
}
}
}
}

private fun getInitialUrl(
resourceType: ResourceType,
params: ParamMap,
lastUpdate: String?
): String? {
val newParams = params.toMutableMap()
if (!params.containsKey(SyncDataParams.SORT_KEY)) {
newParams[SyncDataParams.SORT_KEY] = SyncDataParams.LAST_UPDATED_ASC_VALUE
}
if (lastUpdate != null) {
newParams[SyncDataParams.LAST_UPDATED_KEY] = "gt$lastUpdate"
}
return "${resourceType.name}?${newParams.concatParams()}"
}

private suspend fun upload(): Result {
val exceptions = mutableListOf<ResourceSyncException>()
fhirEngine.syncUpload { list ->
Expand Down
4 changes: 2 additions & 2 deletions engine/src/main/java/com/google/android/fhir/sync/Sync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ object Sync {
context: Context,
fhirEngine: FhirEngine,
dataSource: DataSource,
resourceSyncParams: ResourceSyncParams
downloadManager: DownloadManager
): Result {
return FhirSynchronizer(context, fhirEngine, dataSource, resourceSyncParams).synchronize()
return FhirSynchronizer(context, fhirEngine, dataSource, downloadManager).synchronize()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ interface SyncJob {
suspend fun run(
fhirEngine: FhirEngine,
dataSource: DataSource,
resourceSyncParams: ResourceSyncParams,
downloadManager: DownloadManager,
subscribeTo: MutableSharedFlow<State>?
): Result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ class SyncJobImpl(private val context: Context) : SyncJob {
override suspend fun run(
fhirEngine: FhirEngine,
dataSource: DataSource,
resourceSyncParams: ResourceSyncParams,
downloadManager: DownloadManager,
subscribeTo: MutableSharedFlow<State>?
): Result {
val fhirSynchronizer = FhirSynchronizer(context, fhirEngine, dataSource, resourceSyncParams)
val fhirSynchronizer = FhirSynchronizer(context, fhirEngine, dataSource, downloadManager)

if (subscribeTo != null) fhirSynchronizer.subscribe(subscribeTo)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import com.google.android.fhir.db.impl.dao.LocalChangeToken
import com.google.android.fhir.db.impl.dao.SquashedLocalChange
import com.google.android.fhir.search.Search
import com.google.android.fhir.sync.DataSource
import com.google.android.fhir.sync.DownloadManager
import com.google.common.truth.Truth.assertThat
import java.time.OffsetDateTime
import java.util.Date
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Meta
import org.hl7.fhir.r4.model.Observation
import org.hl7.fhir.r4.model.OperationOutcome
import org.hl7.fhir.r4.model.Patient
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
import org.json.JSONArray
Expand Down Expand Up @@ -109,6 +113,44 @@ class TestingUtils constructor(private val iParser: IParser) {
}
}

object TestDownloadManagerImpl : DownloadManager {

override fun getInitialUrl(): String {
return "Patient?address-city=NAIROBI"
}

override fun createDownloadUrl(preProcessUrl: String, lastUpdate: String?): String {
return preProcessUrl
}

override fun extractResourcesFromResponse(resourceResponse: Resource): Collection<Resource> {
val patient = Patient().setMeta(Meta().setLastUpdated(Date()))
return listOf(patient)
}

override fun extractNextUrlsFromResource(resourceResponse: Resource): Collection<String> {
return mutableListOf()
}
}

class TestDownloadManagerImplWithQueue : DownloadManager {
private val queueWork = mutableListOf("Patient/bob", "Encounter/doc")

override fun getInitialUrl(): String = TestDownloadManagerImpl.getInitialUrl()

override fun createDownloadUrl(preProcessUrl: String, lastUpdate: String?): String =
TestDownloadManagerImpl.createDownloadUrl(preProcessUrl, lastUpdate)

override fun extractResourcesFromResponse(resourceResponse: Resource): Collection<Resource> =
TestDownloadManagerImpl.extractResourcesFromResponse(resourceResponse)

override fun extractNextUrlsFromResource(resourceResponse: Resource): Collection<String> {
val returnQueueWork = ArrayList(queueWork)
queueWork.clear()
return returnQueueWork
}
}

object TestFhirEngineImpl : FhirEngine {
override suspend fun <R : Resource> save(vararg resource: R) {}

Expand Down
Loading

0 comments on commit 6dae038

Please sign in to comment.