Skip to content

Commit

Permalink
Updated PerResourcePatchGenerator to return ordered PatchMapping to a…
Browse files Browse the repository at this point in the history
…void referential integrity issues. (#2442)

* Updated PerResourcePatchGenerator  to return ordererd PatchMapping to avoid referential integrity issues.

* Fixed the failing test by fixing the palyload json format

* Review Comments: Refactored code to find references using LocalChangeResourceReferenceEntity

* Review comments: Refactored code

* Added files

* Review comments: Changes to cache PerResourcePatchGenerator instance

* spotless

* Review comments: Simplified tests and added kdocs
  • Loading branch information
aditya-07 committed Mar 11, 2024
1 parent ee810d6 commit 9697a8a
Show file tree
Hide file tree
Showing 14 changed files with 713 additions and 79 deletions.
1 change: 1 addition & 0 deletions engine/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ dependencies {
testImplementation(Dependencies.AndroidxTest.core)
testImplementation(Dependencies.AndroidxTest.workTestingRuntimeKtx)
testImplementation(Dependencies.Kotlin.kotlinCoroutinesTest)
testImplementation(Dependencies.Kotlin.kotlinTestJunit)
testImplementation(Dependencies.junit)
testImplementation(Dependencies.jsonAssert)
testImplementation(Dependencies.mockitoInline)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.google.android.fhir

import android.content.Context
import com.google.android.fhir.DatabaseErrorStrategy.UNSPECIFIED
import com.google.android.fhir.db.Database
import com.google.android.fhir.sync.DataSource
import com.google.android.fhir.sync.FhirDataStore
import com.google.android.fhir.sync.HttpAuthenticator
Expand Down Expand Up @@ -73,6 +74,11 @@ object FhirEngineProvider {
return getOrCreateFhirService(context).fhirDataStore
}

@Synchronized
internal fun getFhirDatabase(context: Context): Database {
return getOrCreateFhirService(context).database
}

@Synchronized
private fun getOrCreateFhirService(context: Context): FhirServices {
if (fhirServices == null) {
Expand Down
15 changes: 15 additions & 0 deletions engine/src/main/java/com/google/android/fhir/db/Database.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,24 @@ internal interface Database {
* will delete resource entry from LocalChangeEntity table.
*/
suspend fun purge(type: ResourceType, id: String, forcePurge: Boolean = false)

/**
* @return List of [LocalChangeResourceReference] associated with the [LocalChangeEntity.id]s. A
* single [LocalChangeEntity] may have one or more [LocalChangeResourceReference] associated
* with it.
*/
suspend fun getLocalChangeResourceReferences(
localChangeIds: List<Long>,
): List<LocalChangeResourceReference>
}

data class ResourceWithUUID<R>(
val uuid: UUID,
val resource: R,
)

data class LocalChangeResourceReference(
val localChangeId: Long,
val resourceReferenceValue: String,
val resourceReferencePath: String?,
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import ca.uhn.fhir.util.FhirTerser
import com.google.android.fhir.DatabaseErrorStrategy
import com.google.android.fhir.LocalChange
import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.db.LocalChangeResourceReference
import com.google.android.fhir.db.ResourceNotFoundException
import com.google.android.fhir.db.ResourceWithUUID
import com.google.android.fhir.db.impl.DatabaseImpl.Companion.UNENCRYPTED_DATABASE_NAME
Expand Down Expand Up @@ -408,6 +409,18 @@ internal class DatabaseImpl(
}
}

override suspend fun getLocalChangeResourceReferences(
localChangeIds: List<Long>,
): List<LocalChangeResourceReference> {
return localChangeDao.getReferencesForLocalChanges(localChangeIds).map {
LocalChangeResourceReference(
it.localChangeId,
it.resourceReferenceValue,
it.resourceReferencePath,
)
}
}

companion object {
/**
* The name for unencrypted database.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -348,6 +348,17 @@ internal abstract class LocalChangeDao {
localChangeId: Long,
): List<LocalChangeResourceReferenceEntity>

@Query(
"""
SELECT *
FROM LocalChangeResourceReferenceEntity
WHERE localChangeId = (:localChangeId)
""",
)
abstract suspend fun getReferencesForLocalChanges(
localChangeId: List<Long>,
): List<LocalChangeResourceReferenceEntity>

@Insert(onConflict = OnConflictStrategy.REPLACE)
abstract suspend fun insertLocalChangeResourceReferences(
resourceReferences: List<LocalChangeResourceReferenceEntity>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
UploadConfiguration(
Uploader(
dataSource = dataSource,
patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode),
patchGenerator =
PatchGeneratorFactory.byMode(
getUploadStrategy().patchGeneratorMode,
FhirEngineProvider.getFhirDatabase(applicationContext),
),
requestGenerator =
UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode),
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package com.google.android.fhir.sync.upload.patch

import com.google.android.fhir.LocalChange
import com.google.android.fhir.db.Database

/**
* Generates [Patch]es from [LocalChange]s and output [List<[PatchMapping]>] to keep a mapping of
Expand All @@ -34,16 +35,17 @@ internal interface PatchGenerator {
* NOTE: different implementations may have requirements on the size of [localChanges] and output
* certain numbers of [Patch]es.
*/
fun generate(localChanges: List<LocalChange>): List<PatchMapping>
suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping>
}

internal object PatchGeneratorFactory {
fun byMode(
mode: PatchGeneratorMode,
database: Database,
): PatchGenerator =
when (mode) {
is PatchGeneratorMode.PerChange -> PerChangePatchGenerator
is PatchGeneratorMode.PerResource -> PerResourcePatchGenerator
is PatchGeneratorMode.PerResource -> PerResourcePatchGenerator.with(database)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync.upload.patch

import androidx.annotation.VisibleForTesting
import com.google.android.fhir.db.Database
import com.google.android.fhir.db.LocalChangeResourceReference

private typealias Node = String

/**
* Orders the [PatchMapping]s to maintain referential integrity during upload.
*
* ```
* Encounter().apply {
* id = "encounter-1"
* subject = Reference("Patient/patient-1")
* }
*
* Observation().apply {
* id = "observation-1"
* subject = Reference("Patient/patient-1")
* encounter = Reference("Encounter/encounter-1")
* }
* ```
* * The Encounter has an outgoing reference to Patient and the Observation has outgoing references
* to Patient and the Encounter.
* * Now, to maintain the referential integrity of the resources during the upload,
* `Encounter/encounter-1` must go before the `Observation/observation-1`, irrespective of the
* order in which the Encounter and Observation were added to the database.
*/
internal object PatchOrdering {

private val PatchMapping.resourceTypeAndId: String
get() = "${generatedPatch.resourceType}/${generatedPatch.resourceId}"

/**
* Order the [PatchMapping] so that if the resource A has outgoing references {B,C} (CREATE) and
* {D} (UPDATE), then B,C needs to go before the resource A so that referential integrity is
* retained. Order of D shouldn't matter for the purpose of referential integrity.
*
* @return - A ordered list of the [PatchMapping]s based on the references to other [PatchMapping]
* if the mappings are acyclic
* - throws [IllegalStateException] otherwise
*/
suspend fun List<PatchMapping>.orderByReferences(
database: Database,
): List<PatchMapping> {
val resourceIdToPatchMapping = associateBy { patchMapping -> patchMapping.resourceTypeAndId }

/* Get LocalChangeResourceReferences for all the local changes. A single LocalChange may have
multiple LocalChangeResourceReference, one for each resource reference in the
LocalChange.payload.*/
val localChangeIdToResourceReferenceMap: Map<Long, List<LocalChangeResourceReference>> =
database
.getLocalChangeResourceReferences(flatMap { it.localChanges.flatMap { it.token.ids } })
.groupBy { it.localChangeId }

val adjacencyList = createAdjacencyListForCreateReferences(localChangeIdToResourceReferenceMap)
return createTopologicalOrderedList(adjacencyList).mapNotNull { resourceIdToPatchMapping[it] }
}

/**
* @return A map of [PatchMapping] to all the outgoing references to the other [PatchMapping]s of
* type [Patch.Type.INSERT] .
*/
@VisibleForTesting
internal fun List<PatchMapping>.createAdjacencyListForCreateReferences(
localChangeIdToReferenceMap: Map<Long, List<LocalChangeResourceReference>>,
): Map<Node, List<Node>> {
val adjacencyList = mutableMapOf<Node, List<Node>>()
/* if the outgoing reference is to a resource that's just an update and not create, then don't
link to it. This may make the sub graphs smaller and also help avoid cyclic dependencies.*/
val resourceIdsOfInsertTypeLocalChanges =
asSequence()
.filter { it.generatedPatch.type == Patch.Type.INSERT }
.map { it.resourceTypeAndId }
.toSet()

forEach { patchMapping ->
adjacencyList[patchMapping.resourceTypeAndId] =
patchMapping.findOutgoingReferences(localChangeIdToReferenceMap).filter {
resourceIdsOfInsertTypeLocalChanges.contains(it)
}
}
return adjacencyList
}

private fun PatchMapping.findOutgoingReferences(
localChangeIdToReferenceMap: Map<Long, List<LocalChangeResourceReference>>,
): Set<Node> {
val references = mutableSetOf<Node>()
when (generatedPatch.type) {
Patch.Type.INSERT,
Patch.Type.UPDATE, -> {
localChanges.forEach { localChange ->
localChange.token.ids.forEach { id ->
localChangeIdToReferenceMap[id]?.let {
references.addAll(it.map { it.resourceReferenceValue })
}
}
}
}
Patch.Type.DELETE -> {
// do nothing
}
}
return references
}

private fun createTopologicalOrderedList(adjacencyList: Map<Node, List<Node>>): List<Node> {
val stack = ArrayDeque<String>()
val visited = mutableSetOf<String>()
val currentPath = mutableSetOf<String>()

fun dfs(key: String) {
check(currentPath.add(key)) { "Detected a cycle." }
if (visited.add(key)) {
adjacencyList[key]?.forEach { dfs(it) }
stack.addFirst(key)
}
currentPath.remove(key)
}

adjacencyList.keys.forEach { dfs(it) }
return stack.reversed()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@ import com.google.android.fhir.LocalChange
* maintain an audit trail.
*/
internal object PerChangePatchGenerator : PatchGenerator {
override fun generate(localChanges: List<LocalChange>): List<PatchMapping> =
override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> =
localChanges.map {
PatchMapping(
localChanges = listOf(it),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.github.fge.jackson.JsonLoader
import com.github.fge.jsonpatch.JsonPatch
import com.google.android.fhir.LocalChange
import com.google.android.fhir.LocalChange.Type
import com.google.android.fhir.db.Database
import com.google.android.fhir.sync.upload.patch.PatchOrdering.orderByReferences

/**
* Generates a [Patch] for all [LocalChange]es made to a single FHIR resource.
Expand All @@ -30,10 +32,16 @@ import com.google.android.fhir.LocalChange.Type
* maintain an audit trail, but instead, multiple changes made to the same FHIR resource on the
* client can be recorded as a single change on the server.
*/
internal object PerResourcePatchGenerator : PatchGenerator {
internal class PerResourcePatchGenerator private constructor(val database: Database) :
PatchGenerator {

override fun generate(localChanges: List<LocalChange>): List<PatchMapping> {
return localChanges
override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> {
return generateSquashedChangesMapping(localChanges).orderByReferences(database)
}

@androidx.annotation.VisibleForTesting
internal fun generateSquashedChangesMapping(localChanges: List<LocalChange>) =
localChanges
.groupBy { it.resourceType to it.resourceId }
.values
.mapNotNull { resourceLocalChanges ->
Expand All @@ -44,7 +52,6 @@ internal object PerResourcePatchGenerator : PatchGenerator {
)
}
}
}

private fun mergeLocalChangesForSingleResource(localChanges: List<LocalChange>): Patch? {
// TODO (maybe this should throw exception when two entities don't have the same versionID)
Expand Down Expand Up @@ -138,4 +145,20 @@ internal object PerResourcePatchGenerator : PatchGenerator {
mergedOperations.values.flatten().forEach(mergedNode::add)
return objectMapper.writeValueAsString(mergedNode)
}

companion object {

@Volatile private lateinit var _instance: PerResourcePatchGenerator

@Synchronized
fun with(database: Database): PerResourcePatchGenerator {
if (!::_instance.isInitialized) {
_instance = PerResourcePatchGenerator(database)
} else if (_instance.database != database) {
_instance = PerResourcePatchGenerator(database)
}

return _instance
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ object TestFhirEngineImpl : FhirEngine {
LocalChange(
resourceType = type.name,
resourceId = id,
payload = "{ 'resourceType' : '$type', 'id' : '$id' }",
token = LocalChangeToken(listOf()),
payload = """{ "resourceType" : "$type", "id" : "$id" }""",
token = LocalChangeToken(listOf(1)),
type = LocalChange.Type.INSERT,
timestamp = Instant.now(),
),
Expand Down
Loading

0 comments on commit 9697a8a

Please sign in to comment.