Skip to content

Commit

Permalink
Save the down synced resources of a page as it is dowloaded. (#880)
Browse files Browse the repository at this point in the history
* Changes to save downloaded searchset page right after its downloaded instead of waiting for all the pages to be downloaded and then storing them.

* Test compilation error fix

* Refactored fhir engine api to use flow for writing downloaded resource pages to database

* Review comment changes

* spotless
  • Loading branch information
aditya-07 committed Jan 28, 2022
1 parent f50b322 commit 324af6d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 27 deletions.
6 changes: 4 additions & 2 deletions engine/src/main/java/com/google/android/fhir/FhirEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.google.android.fhir.db.impl.dao.LocalChangeToken
import com.google.android.fhir.db.impl.dao.SquashedLocalChange
import com.google.android.fhir.search.Search
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType

Expand Down Expand Up @@ -70,9 +71,10 @@ interface FhirEngine {

/**
* Synchronizes the [download] result in the database. The database will be updated to reflect the
* result of the [download] operation.
* result of the [download] operation. [onPageDownloaded] is called with the resources after each
* successful download of page.
*/
suspend fun syncDownload(download: suspend (SyncDownloadContext) -> List<Resource>)
suspend fun syncDownload(download: suspend (SyncDownloadContext) -> Flow<List<Resource>>)

/**
* Returns the total count of entities available for given search.
Expand Down
28 changes: 16 additions & 12 deletions engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import com.google.android.fhir.search.count
import com.google.android.fhir.search.execute
import com.google.android.fhir.toTimeZoneString
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType

Expand Down Expand Up @@ -63,19 +65,21 @@ internal class FhirEngineImpl(private val database: Database, private val contex
return DatastoreUtil(context).readLastSyncTimestamp()
}

override suspend fun syncDownload(download: suspend (SyncDownloadContext) -> List<Resource>) {
val resources =
download(
object : SyncDownloadContext {
override suspend fun getLatestTimestampFor(type: ResourceType) = database.lastUpdate(type)
}
)

val timeStamps =
resources.groupBy { it.resourceType }.entries.map {
SyncedResourceEntity(it.key, it.value.maxOf { it.meta.lastUpdated }.toTimeZoneString())
override suspend fun syncDownload(
download: suspend (SyncDownloadContext) -> Flow<List<Resource>>
) {
download(
object : SyncDownloadContext {
override suspend fun getLatestTimestampFor(type: ResourceType) = database.lastUpdate(type)
}
)
.collect { resources ->
val timeStamps =
resources.groupBy { it.resourceType }.entries.map {
SyncedResourceEntity(it.key, it.value.maxOf { it.meta.lastUpdated }.toTimeZoneString())
}
database.insertSyncedResources(timeStamps, resources)
}
database.insertSyncedResources(timeStamps, resources)
}

override suspend fun syncUpload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.google.android.fhir.isUploadSuccess
import com.google.android.fhir.logicalId
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flow
import org.hl7.fhir.exceptions.FHIRException
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
Expand Down Expand Up @@ -122,17 +123,17 @@ internal class FhirSynchronizer(
}

private suspend fun downloadResourceType(resourceType: ResourceType, params: ParamMap) {
fhirEngine.syncDownload { it ->
var nextUrl = getInitialUrl(resourceType, params, it.getLatestTimestampFor(resourceType))
val result = mutableListOf<Resource>()
while (nextUrl != null) {
val bundle = dataSource.loadData(nextUrl)
nextUrl = bundle.link.firstOrNull { component -> component.relation == "next" }?.url
if (bundle.type == Bundle.BundleType.SEARCHSET) {
result.addAll(bundle.entry.map { it.resource })
fhirEngine.syncDownload {
flow {
var nextUrl = getInitialUrl(resourceType, params, it.getLatestTimestampFor(resourceType))
while (nextUrl != null) {
val bundle = dataSource.loadData(nextUrl)
nextUrl = bundle.link.firstOrNull { component -> component.relation == "next" }?.url
if (bundle.type == Bundle.BundleType.SEARCHSET) {
emit(bundle.entry.map { it.resource })
}
}
}
return@syncDownload result
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import com.google.android.fhir.search.Search
import com.google.android.fhir.sync.DataSource
import com.google.common.truth.Truth
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Observation
import org.hl7.fhir.r4.model.OperationOutcome
Expand Down Expand Up @@ -124,14 +126,17 @@ class TestingUtils constructor(private val iParser: IParser) {
upload(listOf())
}

override suspend fun syncDownload(download: suspend (SyncDownloadContext) -> List<Resource>) {
override suspend fun syncDownload(
download: suspend (SyncDownloadContext) -> Flow<List<Resource>>
) {
download(
object : SyncDownloadContext {
override suspend fun getLatestTimestampFor(type: ResourceType): String {
return "123456788"
}
}
)
.collect {}
}
override suspend fun count(search: Search): Long {
return 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.google.android.fhir.db.impl.entities.LocalChangeEntity
import com.google.android.fhir.resource.TestingUtils
import com.google.common.truth.Truth.assertThat
import java.util.Date
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import org.hl7.fhir.r4.model.Enumerations
import org.hl7.fhir.r4.model.Meta
Expand Down Expand Up @@ -129,9 +130,7 @@ class FhirEngineImplTest {

@Test
fun syncDownload_downloadResources() = runBlocking {
fhirEngine.syncDownload {
return@syncDownload listOf(TEST_PATIENT_2)
}
fhirEngine.syncDownload { flowOf((listOf(TEST_PATIENT_2))) }

testingUtils.assertResourceEquals(
TEST_PATIENT_2,
Expand Down

0 comments on commit 324af6d

Please sign in to comment.