Skip to content

Commit

Permalink
Ordering of patches with cycles during upload (#2524)
Browse files Browse the repository at this point in the history
* Ordering of patches with cycles during upload

* Ignored failing test

* Refactored and added tests

* Refactored

* Updated code to use strongly connected resources

* Review comments: Separated the code for finding SCCs. Added some tests for Bundle generation.

* Refactored code and added tests.

* Refactored code and added tests.

* Updated StronglyConnectedPatches to compute node size

* Added comments

* Updated tarjan

* Review comments: Updated docs
  • Loading branch information
aditya-07 committed Jun 13, 2024
1 parent 1016558 commit 8878362
Show file tree
Hide file tree
Showing 13 changed files with 615 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import com.google.android.fhir.LocalChange
import com.google.android.fhir.db.Database

/**
* Generates [Patch]es from [LocalChange]s and output [List<[PatchMapping]>] to keep a mapping of
* the [LocalChange]s to their corresponding generated [Patch]
* Generates [Patch]es from [LocalChange]s and output [List<[StronglyConnectedPatchMappings]>] to
* keep a mapping of the [LocalChange]s to their corresponding generated [Patch]
*
* INTERNAL ONLY. This interface should NEVER be exposed as an external API because it works
* together with other components in the upload package to fulfill a specific upload strategy.
Expand All @@ -35,7 +35,7 @@ internal interface PatchGenerator {
* NOTE: different implementations may have requirements on the size of [localChanges] and output
* certain numbers of [Patch]es.
*/
suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping>
suspend fun generate(localChanges: List<LocalChange>): List<StronglyConnectedPatchMappings>
}

internal object PatchGeneratorFactory {
Expand Down Expand Up @@ -67,3 +67,14 @@ internal data class PatchMapping(
val localChanges: List<LocalChange>,
val generatedPatch: Patch,
)

/**
 * Structure to describe the cyclic nature of [PatchMapping]s.
 * - A single value in [patchMappings] signifies that the node is acyclic, i.e. it is not part of a
 *   reference cycle with any other node.
 * - Multiple values in [patchMappings] signify that the nodes are cyclic among themselves, i.e.
 *   they reference each other in a cycle.
 *
 * [StronglyConnectedPatchMappings] is used by the engine to make sure that related resources get
 * uploaded to the server in the same request to maintain the referential integrity of resources
 * during creation.
 *
 * @property patchMappings the [PatchMapping]s that belong to this group.
 */
internal data class StronglyConnectedPatchMappings(val patchMappings: List<PatchMapping>)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,26 @@ import androidx.annotation.VisibleForTesting
import com.google.android.fhir.db.Database
import com.google.android.fhir.db.LocalChangeResourceReference

private typealias Node = String
/** Represents a resource by its type and id, e.g. 'Patient/123', 'Encounter/123'. */
internal typealias Node = String

/**
 * Represents a collection of resources as an adjacency list: each key is a resource and its value
 * lists the resources it references, so every reference is a directed edge from the key to the
 * referenced resource. e.g. Two Patient resources p1 and p2, each with an encounter and subsequent
 * observation will be represented as follows
 *
 * ```
 * [
 *   'Patient/p1' : [],
 *   'Patient/p2' : [],
 *   'Encounter/e1' : ['Patient/p1'], // Encounter.subject
 *   'Encounter/e2' : ['Patient/p2'], // Encounter.subject
 *   'Observation/o1' : ['Patient/p1', 'Encounter/e1'], // Observation.subject, Observation.encounter
 *   'Observation/o2' : ['Patient/p2', 'Encounter/e2'], // Observation.subject, Observation.encounter
 * ]
 * ```
 */
internal typealias Graph = Map<Node, List<Node>>

/**
* Orders the [PatchMapping]s to maintain referential integrity during upload.
Expand Down Expand Up @@ -53,15 +72,16 @@ internal object PatchOrdering {
* {D} (UPDATE), then B,C needs to go before the resource A so that referential integrity is
* retained. Order of D shouldn't matter for the purpose of referential integrity.
*
* @return - A ordered list of the [PatchMapping]s based on the references to other [PatchMapping]
* if the mappings are acyclic
* - throws [IllegalStateException] otherwise
* @return An ordered list of the [StronglyConnectedPatchMappings] containing:
* - [StronglyConnectedPatchMappings] with single value for the [PatchMapping] based on the
* references to other [PatchMapping] if the mappings are acyclic
* - [StronglyConnectedPatchMappings] with multiple values for [PatchMapping]s based on the
* references to other [PatchMapping]s if the mappings are cyclic.
*/
suspend fun List<PatchMapping>.orderByReferences(
suspend fun List<PatchMapping>.sccOrderByReferences(
database: Database,
): List<PatchMapping> {
): List<StronglyConnectedPatchMappings> {
val resourceIdToPatchMapping = associateBy { patchMapping -> patchMapping.resourceTypeAndId }

/* Get LocalChangeResourceReferences for all the local changes. A single LocalChange may have
multiple LocalChangeResourceReference, one for each resource reference in the
LocalChange.payload.*/
Expand All @@ -71,7 +91,10 @@ internal object PatchOrdering {
.groupBy { it.localChangeId }

val adjacencyList = createAdjacencyListForCreateReferences(localChangeIdToResourceReferenceMap)
return createTopologicalOrderedList(adjacencyList).mapNotNull { resourceIdToPatchMapping[it] }

return StronglyConnectedPatches.scc(adjacencyList).map {
StronglyConnectedPatchMappings(it.mapNotNull { resourceIdToPatchMapping[it] })
}
}

/**
Expand Down Expand Up @@ -121,22 +144,4 @@ internal object PatchOrdering {
}
return references
}

private fun createTopologicalOrderedList(adjacencyList: Map<Node, List<Node>>): List<Node> {
val stack = ArrayDeque<String>()
val visited = mutableSetOf<String>()
val currentPath = mutableSetOf<String>()

fun dfs(key: String) {
check(currentPath.add(key)) { "Detected a cycle." }
if (visited.add(key)) {
adjacencyList[key]?.forEach { dfs(it) }
stack.addFirst(key)
}
currentPath.remove(key)
}

adjacencyList.keys.forEach { dfs(it) }
return stack.reversed()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@ import com.google.android.fhir.LocalChange
* maintain an audit trail.
*/
internal object PerChangePatchGenerator : PatchGenerator {
override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> =
localChanges.map {
PatchMapping(
localChanges = listOf(it),
generatedPatch =
Patch(
resourceType = it.resourceType,
resourceId = it.resourceId,
versionId = it.versionId,
timestamp = it.timestamp,
type = it.type.toPatchType(),
payload = it.payload,
),
)
}
override suspend fun generate(
localChanges: List<LocalChange>,
): List<StronglyConnectedPatchMappings> =
localChanges
.map {
PatchMapping(
localChanges = listOf(it),
generatedPatch =
Patch(
resourceType = it.resourceType,
resourceId = it.resourceId,
versionId = it.versionId,
timestamp = it.timestamp,
type = it.type.toPatchType(),
payload = it.payload,
),
)
}
.map { StronglyConnectedPatchMappings(listOf(it)) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.github.fge.jsonpatch.JsonPatch
import com.google.android.fhir.LocalChange
import com.google.android.fhir.LocalChange.Type
import com.google.android.fhir.db.Database
import com.google.android.fhir.sync.upload.patch.PatchOrdering.orderByReferences
import com.google.android.fhir.sync.upload.patch.PatchOrdering.sccOrderByReferences

/**
* Generates a [Patch] for all [LocalChange]es made to a single FHIR resource.
Expand All @@ -35,8 +35,10 @@ import com.google.android.fhir.sync.upload.patch.PatchOrdering.orderByReferences
internal class PerResourcePatchGenerator private constructor(val database: Database) :
PatchGenerator {

override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> {
return generateSquashedChangesMapping(localChanges).orderByReferences(database)
override suspend fun generate(
localChanges: List<LocalChange>,
): List<StronglyConnectedPatchMappings> {
return generateSquashedChangesMapping(localChanges).sccOrderByReferences(database)
}

@androidx.annotation.VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync.upload.patch

import kotlin.math.min

internal object StronglyConnectedPatches {

  /**
   * Takes a [directedGraph] and computes all the strongly connected components (SCCs) in the graph.
   *
   * @param directedGraph adjacency list mapping each [Node] to the [Node]s it has an edge to.
   * @return An ordered list of the SCCs of the [directedGraph]. A component is added to the list
   *   only after the components reachable from it through the graph's edges, so referenced
   *   components come before the components referencing them. The relative order of unrelated
   *   components, and the order of the [Node]s inside a single SCC, depend on the path taken by the
   *   algorithm to discover the nodes.
   */
  fun scc(directedGraph: Graph): List<List<Node>> = findSCCWithTarjan(directedGraph)

  /**
   * Finds strongly connected components with a recursive depth-first search. See
   * https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
   *
   * The search recurses once per node along a path, so the recursion depth grows with the longest
   * reference chain in [diGraph].
   */
  private fun findSCCWithTarjan(diGraph: Graph): List<List<Node>> {
    // Ideally the graph.keys should have all the nodes in the graph. But use values as well in case
    // the input graph looks something like [ N1: [N2] ].
    val nodeToIndex =
      (diGraph.keys + diGraph.values.flatten()).withIndex().associate { (index, node) ->
        node to index
      }

    val sccs = mutableListOf<List<Node>>()
    // Smallest discovery time known to be reachable from a node through nodes still on the stack.
    val lowLinks = IntArray(nodeToIndex.size)
    var exploringCounter = 0
    val discoveryTimes = IntArray(nodeToIndex.size)

    val visitedNodes = BooleanArray(nodeToIndex.size)
    val nodesCurrentlyInStack = BooleanArray(nodeToIndex.size)
    val stack = ArrayDeque<Node>()

    fun dfs(at: Node) {
      // Every node passed to dfs is either a key or a value of diGraph, so the lookup cannot miss.
      val atIndex = nodeToIndex.getValue(at)
      lowLinks[atIndex] = exploringCounter
      discoveryTimes[atIndex] = exploringCounter
      visitedNodes[atIndex] = true
      exploringCounter++
      stack.addFirst(at)
      nodesCurrentlyInStack[atIndex] = true

      diGraph[at]?.forEach { neighbour ->
        val neighbourIndex = nodeToIndex.getValue(neighbour)
        if (!visitedNodes[neighbourIndex]) {
          dfs(neighbour)
        }

        // Only nodes still on the stack can belong to the component currently being explored.
        if (nodesCurrentlyInStack[neighbourIndex]) {
          lowLinks[atIndex] = min(lowLinks[atIndex], lowLinks[neighbourIndex])
        }
      }

      // We have found the head node in the scc.
      if (lowLinks[atIndex] == discoveryTimes[atIndex]) {
        val connected = mutableListOf<Node>()
        do {
          val node = stack.removeFirst()
          connected.add(node)
          nodesCurrentlyInStack[nodeToIndex.getValue(node)] = false
        } while (node != at && stack.isNotEmpty())
        // Nodes were popped newest-first; reverse to list them in discovery order.
        sccs.add(connected.reversed())
      }
    }

    diGraph.keys.forEach { node ->
      if (!visitedNodes[nodeToIndex.getValue(node)]) {
        dfs(node)
      }
    }

    return sccs
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package com.google.android.fhir.sync.upload.request
import com.google.android.fhir.LocalChange
import com.google.android.fhir.sync.upload.patch.Patch
import com.google.android.fhir.sync.upload.patch.PatchMapping
import com.google.android.fhir.sync.upload.patch.StronglyConnectedPatchMappings
import org.hl7.fhir.r4.model.Bundle

/** Generates list of [BundleUploadRequest] of type Transaction [Bundle] from the [Patch]es */
Expand All @@ -29,10 +30,38 @@ internal class TransactionBundleGenerator(
(patch: Patch, useETagForUpload: Boolean) -> BundleEntryComponentGenerator,
) : UploadRequestGenerator {

/**
* In order to accommodate cyclic dependencies between [PatchMapping]s and maintain referential
* integrity on the server, the [PatchMapping]s in a [StronglyConnectedPatchMappings] are all put
* in a single [BundleUploadRequestMapping]. Based on the [generatedBundleSize], the remaining
* space of the [BundleUploadRequestMapping] may be filled with other
* [StronglyConnectedPatchMappings] mappings.
*
* In case a single [StronglyConnectedPatchMappings] has more [PatchMapping]s than the
* [generatedBundleSize], [generatedBundleSize] will be ignored so that all of the dependent
* mappings in [StronglyConnectedPatchMappings] can be sent in a single [Bundle].
*/
override fun generateUploadRequests(
mappedPatches: List<PatchMapping>,
mappedPatches: List<StronglyConnectedPatchMappings>,
): List<BundleUploadRequestMapping> {
return mappedPatches.chunked(generatedBundleSize).map { patchList ->
val mappingsPerBundle = mutableListOf<List<PatchMapping>>()

var bundle = mutableListOf<PatchMapping>()
mappedPatches.forEach {
if ((bundle.size + it.patchMappings.size) <= generatedBundleSize) {
bundle.addAll(it.patchMappings)
} else {
if (bundle.isNotEmpty()) {
mappingsPerBundle.add(bundle)
bundle = mutableListOf()
}
bundle.addAll(it.patchMappings)
}
}

if (bundle.isNotEmpty()) mappingsPerBundle.add(bundle)

return mappingsPerBundle.map { patchList ->
generateBundleRequest(patchList).let { mappedBundleRequest ->
BundleUploadRequestMapping(
splitLocalChanges = mappedBundleRequest.first,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ package com.google.android.fhir.sync.upload.request
import com.google.android.fhir.LocalChange
import com.google.android.fhir.sync.upload.patch.Patch
import com.google.android.fhir.sync.upload.patch.PatchMapping
import com.google.android.fhir.sync.upload.patch.StronglyConnectedPatchMappings
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.codesystems.HttpVerb

/**
* Generator that generates [UploadRequest]s from the [Patch]es present in the
* [List<[PatchMapping]>]. Any implementation of this generator is expected to output
* [List<[UploadRequestMapping]>] which maps [UploadRequest] to the corresponding [LocalChange]s it
* was generated from.
* [List<[StronglyConnectedPatchMappings]>]. Any implementation of this generator is expected to
* output [List<[UploadRequestMapping]>] which maps [UploadRequest] to the corresponding
* [LocalChange]s it was generated from.
*/
internal interface UploadRequestGenerator {
/** Generates a list of [UploadRequestMapping] from the [PatchMapping]s in the given [StronglyConnectedPatchMappings] */
fun generateUploadRequests(
mappedPatches: List<PatchMapping>,
mappedPatches: List<StronglyConnectedPatchMappings>,
): List<UploadRequestMapping>
}

Expand Down
Loading

0 comments on commit 8878362

Please sign in to comment.