Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into lincheck-with-mode…
Browse files Browse the repository at this point in the history
…l-checking

# Conflicts:
#	kotlinx-coroutines-core/build.gradle
  • Loading branch information
ndkoval committed Nov 18, 2020
2 parents c7971d4 + 179f142 commit 1adfb4f
Show file tree
Hide file tree
Showing 30 changed files with 216 additions and 52 deletions.
13 changes: 13 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ buildscript {
throw new IllegalArgumentException("'kotlin_snapshot_version' should be defined when building with snapshot compiler")
}
}
// These three flags are enabled in train builds for JVM IR compiler testing
ext.jvm_ir_enabled = rootProject.properties['enable_jvm_ir'] != null
ext.jvm_ir_api_check_enabled = rootProject.properties['enable_jvm_ir_api_check'] != null
ext.native_targets_enabled = rootProject.properties['disable_native_targets'] == null

// Determine if any project dependency is using a snapshot version
ext.using_snapshot_version = build_snapshot_train
Expand Down Expand Up @@ -323,3 +327,12 @@ knit {
}

knitPrepare.dependsOn getTasksByName("dokka", true)

// Disable binary compatibility check for JVM IR compiler output by default
if (jvm_ir_enabled) {
subprojects { project ->
configure(tasks.matching { it.name == "apiCheck" }) {
enabled = enabled && jvm_ir_api_check_enabled
}
}
}
8 changes: 6 additions & 2 deletions gradle/compile-jvm-multiplatform.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ sourceCompatibility = 1.6
targetCompatibility = 1.6

kotlin {
targets {
fromPreset(presets.jvm, 'jvm')
jvm {
if (rootProject.ext.jvm_ir_enabled) {
compilations.all {
kotlinOptions.useIR = true
}
}
}
sourceSets {
jvmTest.dependencies {
Expand Down
6 changes: 6 additions & 0 deletions gradle/compile-jvm.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ apply plugin: 'org.jetbrains.kotlin.jvm'
sourceCompatibility = 1.6
targetCompatibility = 1.6

if (rootProject.ext.jvm_ir_enabled) {
kotlin.target.compilations.all {
kotlinOptions.useIR = true
}
}

dependencies {
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
// Workaround to make addSuppressed work in tests
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ public final class kotlinx/coroutines/flow/LintKt {
}

public abstract interface class kotlinx/coroutines/flow/MutableSharedFlow : kotlinx/coroutines/flow/FlowCollector, kotlinx/coroutines/flow/SharedFlow {
public abstract fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getSubscriptionCount ()Lkotlinx/coroutines/flow/StateFlow;
public abstract fun resetReplayCache ()V
public abstract fun tryEmit (Ljava/lang/Object;)Z
Expand Down
21 changes: 17 additions & 4 deletions kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
apply plugin: 'org.jetbrains.kotlin.multiplatform'
apply from: rootProject.file("gradle/compile-jvm-multiplatform.gradle")
apply from: rootProject.file("gradle/compile-common.gradle")

if (rootProject.ext.native_targets_enabled) {
apply from: rootProject.file("gradle/compile-native-multiplatform.gradle")
}

apply from: rootProject.file("gradle/compile-js-multiplatform.gradle")
apply from: rootProject.file("gradle/compile-native-multiplatform.gradle")
apply from: rootProject.file('gradle/publish-npm-js.gradle')

/* ==========================================================================
Expand Down Expand Up @@ -52,8 +56,11 @@ static boolean isNativeDarwin(String name) { return ["ios", "macos", "tvos", "wa
static boolean isNativeOther(String name) { return ["linux", "mingw"].any { name.startsWith(it) } }

defineSourceSet("concurrent", ["common"]) { it in ["jvm", "native"] }
defineSourceSet("nativeDarwin", ["native"]) { isNativeDarwin(it) }
defineSourceSet("nativeOther", ["native"]) { isNativeOther(it) }

if (rootProject.ext.native_targets_enabled) {
defineSourceSet("nativeDarwin", ["native"]) { isNativeDarwin(it) }
defineSourceSet("nativeOther", ["native"]) { isNativeOther(it) }
}

/* ========================================================================== */

Expand Down Expand Up @@ -129,7 +136,7 @@ def configureNativeSourceSetPreset(name, preset) {
}

// :KLUDGE: Idea.active: Configure platform libraries for native source sets when working in IDEA
if (Idea.active) {
if (Idea.active && rootProject.ext.native_targets_enabled) {
def manager = project.ext.hostManager
def linuxPreset = kotlin.presets.linuxX64
def macosPreset = kotlin.presets.macosX64
Expand Down Expand Up @@ -197,6 +204,12 @@ jvmTest {
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for model checking
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
}
// TODO: JVM IR generates different stacktrace so temporary disable stacktrace tests
if (rootProject.ext.jvm_ir_enabled) {
filter {
excludeTestsMatching('kotlinx.coroutines.exceptions.StackTraceRecovery*')
}
}
}

jvmJar {
Expand Down
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ private class LazyDeferredCoroutine<T>(
* which means that if the original [coroutineContext], in which `withContext` was invoked,
* is cancelled by the time its dispatcher starts to execute the code,
* it discards the result of `withContext` and throws [CancellationException].
*
* The cancellation behaviour described above is enabled if and only if the dispatcher is being changed.
* For example, when using `withContext(NonCancellable) { ... }` there is no change in dispatcher and
* this call will not be cancelled neither on entry to the block inside `withContext` nor on exit from it.
*/
public suspend fun <T> withContext(
context: CoroutineContext,
Expand Down
13 changes: 1 addition & 12 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1151,8 +1151,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
override fun toString(): String =
"ChildCompletion[$child, $proposedUpdate]"
}

private class AwaitContinuation<T>(
Expand Down Expand Up @@ -1350,6 +1348,7 @@ internal abstract class JobNode<out J : Job>(
override val isActive: Boolean get() = true
override val list: NodeList? get() = null
override fun dispose() = (job as JobSupport).removeNode(this)
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
}

internal class NodeList : LockFreeLinkedListHead(), Incomplete {
Expand Down Expand Up @@ -1384,15 +1383,13 @@ private class InvokeOnCompletion(
private val handler: CompletionHandler
) : JobNode<Job>(job) {
override fun invoke(cause: Throwable?) = handler.invoke(cause)
override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
}

private class ResumeOnCompletion(
job: Job,
private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
override fun toString() = "ResumeOnCompletion[$continuation]"
}

private class ResumeAwaitOnCompletion<T>(
Expand All @@ -1411,15 +1408,13 @@ private class ResumeAwaitOnCompletion<T>(
continuation.resume(state.unboxState() as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}

internal class DisposeOnCompletion(
job: Job,
private val handle: DisposableHandle
) : JobNode<Job>(job) {
override fun invoke(cause: Throwable?) = handle.dispose()
override fun toString(): String = "DisposeOnCompletion[$handle]"
}

private class SelectJoinOnCompletion<R>(
Expand All @@ -1431,7 +1426,6 @@ private class SelectJoinOnCompletion<R>(
if (select.trySelect())
block.startCoroutineCancellable(select.completion)
}
override fun toString(): String = "SelectJoinOnCompletion[$select]"
}

private class SelectAwaitOnCompletion<T, R>(
Expand All @@ -1443,7 +1437,6 @@ private class SelectAwaitOnCompletion<T, R>(
if (select.trySelect())
job.selectAwaitCompletion(select, block)
}
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
}

// -------- invokeOnCancellation nodes
Expand All @@ -1463,7 +1456,6 @@ private class InvokeOnCancelling(
override fun invoke(cause: Throwable?) {
if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
}
override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
}

internal class ChildHandleNode(
Expand All @@ -1472,7 +1464,6 @@ internal class ChildHandleNode(
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
override fun toString(): String = "ChildHandle[$childJob]"
}

// Same as ChildHandleNode, but for cancellable continuation
Expand All @@ -1483,7 +1474,5 @@ internal class ChildContinuation(
override fun invoke(cause: Throwable?) {
child.parentCancelled(child.getContinuationCancellationCause(job))
}
override fun toString(): String =
"ChildContinuation[$child]"
}

Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ internal abstract class AbstractChannel<E>(
cancelInternal(cause)

final override fun cancel(cause: CancellationException?) {
if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal open class ChannelCoroutine<E>(
}

final override fun cancel(cause: CancellationException?) {
if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
cancelInternal(cause ?: defaultCancellationException())
}

Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
*/
@Deprecated(
message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel",
replaceWith = ReplaceWith("shareIn(scope, 0, SharingStarted.Lazily)"),
replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"),
level = DeprecationLevel.WARNING
)
public fun <T> Flow<T>.broadcastIn(
Expand Down
17 changes: 17 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ public interface SharedFlow<out T> : Flow<T> {
* Use the `MutableSharedFlow(...)` constructor function to create an implementation.
*/
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
/**
* Emits a [value] to this shared flow, suspending on buffer overflow if the shared flow was created
* with the default [BufferOverflow.SUSPEND] strategy.
*
* See [tryEmit] for a non-suspending variant of this function.
*
* This method is **thread-safe** and can be safely invoked from concurrent coroutines without
* external synchronization.
*/
override suspend fun emit(value: T)

/**
* Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
* emitted successfully. When this function returns `false`, it means that the call to a plain [emit]
Expand All @@ -155,6 +166,9 @@ public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
* A shared flow configured with a [BufferOverflow] strategy other than [SUSPEND][BufferOverflow.SUSPEND]
* (either [DROP_OLDEST][BufferOverflow.DROP_OLDEST] or [DROP_LATEST][BufferOverflow.DROP_LATEST]) never
* suspends on [emit], and thus `tryEmit` to such a shared flow always returns `true`.
*
* This method is **thread-safe** and can be safely invoked from concurrent coroutines without
* external synchronization.
*/
public fun tryEmit(value: T): Boolean

Expand Down Expand Up @@ -190,6 +204,9 @@ public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
* supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow]
* to an initial value, just update its [value][MutableStateFlow.value].
*
* This method is **thread-safe** and can be safely invoked from concurrent coroutines without
* external synchronization.
*
* **Note: This is an experimental api.** This function may be removed or renamed in the future.
*/
@ExperimentalCoroutinesApi
Expand Down
12 changes: 5 additions & 7 deletions kotlinx-coroutines-core/common/src/flow/SharingStarted.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,17 @@ public enum class SharingCommand {
/**
* A strategy for starting and stopping the sharing coroutine in [shareIn] and [stateIn] operators.
*
* This interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and
* This functional interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and
* supports custom strategies by implementing this interface's [command] function.
*
* For example, it is possible to define a custom strategy that starts the upstream only when the number
* of subscribers exceeds the given `threshold` and make it an extension on [SharingStarted.Companion] so
* that it looks like a built-in strategy on the use-site:
*
* ```
* fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int): SharingStarted =
* object : SharingStarted {
* override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
* subscriptionCount
* .map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP }
* fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int) =
* SharingStarted { subscriptionCount: StateFlow<Int> ->
* subscriptionCount.map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP }
* }
* ```
*
Expand All @@ -74,7 +72,7 @@ public enum class SharingCommand {
* The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running).
* The failure of the `command` flow cancels the sharing coroutine and the upstream flow.
*/
public interface SharingStarted {
public fun interface SharingStarted {
public companion object {
/**
* Sharing is started immediately and never stops.
Expand Down
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
* The current value of this state flow.
*
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
*
* This property is **thread-safe** and can be safely updated from concurrent coroutines without
* external synchronization.
*/
public override var value: T

Expand All @@ -170,6 +173,9 @@ public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
* This function use a regular comparison using [Any.equals]. If both [expect] and [update] are equal to the
* current [value], this function returns `true`, but it does not actually change the reference that is
* stored in the [value].
*
* This method is **thread-safe** and can be safely invoked from concurrent coroutines without
* external synchronization.
*/
public fun compareAndSet(expect: T, update: T): Boolean
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this@unsafeFlow)
} finally {
if (!second.isClosedForReceive) second.cancel()
second.cancel()
}
}
}
Loading

0 comments on commit 1adfb4f

Please sign in to comment.