forked from google/android-fhir
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FhirSyncWorker.kt
158 lines (139 loc) · 5.48 KB
/
FhirSyncWorker.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.fhir.sync
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.WorkerParameters
import androidx.work.workDataOf
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.FhirEngineProvider
import com.google.android.fhir.OffsetDateTimeTypeAdapter
import com.google.android.fhir.sync.download.DownloaderImpl
import com.google.android.fhir.sync.upload.BundleUploader
import com.google.android.fhir.sync.upload.LocalChangesPaginator
import com.google.android.fhir.sync.upload.TransactionBundleGenerator
import com.google.gson.ExclusionStrategy
import com.google.gson.FieldAttributes
import com.google.gson.GsonBuilder
import java.time.OffsetDateTime
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import timber.log.Timber
/**
 * A WorkManager Worker that handles periodic sync.
 *
 * Subclasses supply the [FhirEngine], the [DownloadWorkManager] and the [ConflictResolver] used for
 * a run. [doWork] forwards every [SyncJobStatus] it observes to WorkManager as progress data and
 * maps the final status to a WorkManager [Result].
 */
abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameters) :
  CoroutineWorker(appContext, workerParams) {

  /** Returns the [FhirEngine] passed to the [FhirSynchronizer] created in [doWork]. */
  abstract fun getFhirEngine(): FhirEngine

  /** Returns the [DownloadWorkManager] passed to the [DownloaderImpl] created in [doWork]. */
  abstract fun getDownloadWorkManager(): DownloadWorkManager

  /** Returns the [ConflictResolver] passed to the [FhirSynchronizer] created in [doWork]. */
  abstract fun getConflictResolver(): ConflictResolver

  /**
   * Configuration defining the max upload Bundle size (in terms of number of resources in a
   * Bundle) and optionally defining the order of Resources.
   *
   * Defaults to an [UploadConfiguration] built with its default arguments.
   */
  open fun getUploadConfiguration(): UploadConfiguration = UploadConfiguration()

  /**
   * Gson instance used to serialize [SyncJobStatus] values into WorkManager [Data]. It registers a
   * null-safe [OffsetDateTimeTypeAdapter] for [OffsetDateTime] and applies
   * [StateExclusionStrategy].
   */
  private val gson =
    GsonBuilder()
      .registerTypeAdapter(OffsetDateTime::class.java, OffsetDateTimeTypeAdapter().nullSafe())
      .setExclusionStrategies(StateExclusionStrategy())
      .create()

  /**
   * Returns the data source obtained from [FhirEngineProvider] for the application context;
   * [doWork] fails the work when this returns `null`.
   *
   * Kept as an overridable function so that [FhirSyncWorker] is easy to stub in tests.
   */
  internal open fun getDataSource() = FhirEngineProvider.getDataSource(applicationContext)

  /**
   * Runs one synchronization and reports the outcome to WorkManager.
   *
   * @return [Result.failure] carrying error data when no data source is available;
   * [Result.success] when the synchronizer returns [SyncJobStatus.Finished]; otherwise
   * [Result.retry] while the `MAX_RETRIES_ALLOWED` input value is greater than `runAttemptCount`,
   * and [Result.failure] with the serialized status once it is not.
   */
  override suspend fun doWork(): Result {
    val dataSource =
      getDataSource()
        ?: return Result.failure(
          buildWorkData(
            IllegalStateException(
              "FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration."
            )
          )
        )

    val flow = MutableSharedFlow<SyncJobStatus>()

    val job =
      CoroutineScope(Dispatchers.IO).launch {
        flow.collect { status ->
          // now send Progress to work manager so caller app can listen
          setProgress(buildWorkData(status))

          if (status is SyncJobStatus.Finished || status is SyncJobStatus.Failed) {
            // Collecting a shared flow never completes by itself, so the collector has to be
            // cancelled explicitly once a terminal status arrives; otherwise job.join() below
            // would suspend forever and doWork() would never return.
            this@launch.cancel()
          }
        }
      }

    Timber.v("Subscribed to flow for progress")

    val result =
      with(getUploadConfiguration()) {
          FhirSynchronizer(
              applicationContext,
              getFhirEngine(),
              BundleUploader(
                dataSource,
                TransactionBundleGenerator.getDefault(useETagForUpload),
                LocalChangesPaginator.create(this)
              ),
              DownloaderImpl(dataSource, getDownloadWorkManager()),
              getConflictResolver()
            )
            .apply { subscribe(flow) }
        }
        .synchronize()

    val output = buildWorkData(result)

    // await/join is needed to collect states completely
    kotlin.runCatching { job.join() }.onFailure(Timber::w)

    setProgress(output)

    Timber.d("Received result from worker $result and sending output $output")

    /*
     * In case of failure, we can check if it is worth retrying and do retry based on
     * [RetryConfiguration.maxRetries] set by user.
     */
    val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0)
    return when {
      result is SyncJobStatus.Finished -> {
        Result.success(output)
      }
      retries > runAttemptCount -> {
        Result.retry()
      }
      else -> {
        Result.failure(output)
      }
    }
  }

  /**
   * Packs [state] into WorkManager [Data]: the key `"StateType"` holds the Java class name of the
   * state and `"State"` holds its JSON form produced by [gson].
   */
  private fun buildWorkData(state: SyncJobStatus): Data {
    return workDataOf(
      // send serialized state and type so that consumer can convert it back
      "StateType" to state::class.java.name,
      "State" to gson.toJson(state)
    )
  }

  /**
   * Packs [exception] into WorkManager [Data]: the key `"error"` holds the Java class name of the
   * exception and `"reason"` holds its message (which may be `null`).
   */
  private fun buildWorkData(exception: Exception): Data {
    return workDataOf("error" to exception::class.java.name, "reason" to exception.message)
  }

  /**
   * Exclusion strategy for [Gson] that handles field exclusions for [SyncJobStatus] returned by
   * FhirSynchronizer. It should skip serializing the exceptions to avoid exceeding WorkManager
   * WorkData limit
   * @see <a
   * href="https://github.com/google/android-fhir/issues/707">https://github.com/google/android-fhir/issues/707</a>
   */
  internal class StateExclusionStrategy : ExclusionStrategy {
    /** Skips any field named `exceptions`; every other field is serialized. */
    override fun shouldSkipField(field: FieldAttributes) = field.name == "exceptions"

    /** Never excludes a whole class. */
    override fun shouldSkipClass(clazz: Class<*>?) = false
  }
}