Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(alm): cancel forkApi.delay and forkApi.pause if listener is cancelled or completed #2074

Merged
merged 1 commit into from
Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions packages/toolkit/src/listenerMiddleware/exceptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ const completed = 'completed'
const cancelled = 'cancelled'

/* TaskAbortError error codes */
export const taskCancelled = `${task}-${cancelled}` as const
export const taskCompleted = `${task}-${completed}` as const
export const taskCancelled = `task-${cancelled}` as const
export const taskCompleted = `task-${completed}` as const
export const listenerCancelled = `${listener}-${cancelled}` as const
export const listenerCompleted = `${listener}-${completed}` as const

export class TaskAbortError implements SerializedError {
name = 'TaskAbortError'
message = ''
constructor(public code = 'unknown') {
this.message = `task cancelled (reason: ${code})`
message: string
constructor(public code: string | undefined) {
this.message = `${task} ${cancelled} (reason: ${code})`
}
}
11 changes: 10 additions & 1 deletion packages/toolkit/src/listenerMiddleware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import type {
ForkedTask,
TypedRemoveListener,
TaskResult,
AbortSignalWithReason,
} from './types'
import {
abortControllerWithReason,
addAbortSignalListener,
assertFunction,
catchRejection,
} from './utils'
Expand Down Expand Up @@ -74,11 +76,18 @@ const INTERNAL_NIL_TOKEN = {} as const

const alm = 'listenerMiddleware' as const

const createFork = (parentAbortSignal: AbortSignal) => {
const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
const linkControllers = (controller: AbortController) =>
addAbortSignalListener(parentAbortSignal, () =>
abortControllerWithReason(controller, parentAbortSignal.reason)
)

return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
assertFunction(taskExecutor, 'taskExecutor')
const childAbortController = new AbortController()

linkControllers(childAbortController)

const result = runTask<T>(
async (): Promise<T> => {
validateActive(parentAbortSignal)
Expand Down
4 changes: 2 additions & 2 deletions packages/toolkit/src/listenerMiddleware/task.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TaskAbortError } from './exceptions'
import type { AbortSignalWithReason, TaskResult } from './types'
import { catchRejection } from './utils'
import { addAbortSignalListener, catchRejection } from './utils'

/**
* Synchronously raises {@link TaskAbortError} if the task tied to the input `signal` has been cancelled.
Expand Down Expand Up @@ -29,7 +29,7 @@ export const promisifyAbortSignal = (
if (signal.aborted) {
notifyRejection()
} else {
signal.addEventListener('abort', notifyRejection, { once: true })
addAbortSignalListener(signal, notifyRejection)
}
})
)
Expand Down
92 changes: 90 additions & 2 deletions packages/toolkit/src/listenerMiddleware/tests/fork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@ import type { EnhancedStore } from '@reduxjs/toolkit'
import { configureStore, createSlice, createAction } from '@reduxjs/toolkit'

import type { PayloadAction } from '@reduxjs/toolkit'
import type { ForkedTaskExecutor, TaskResult } from '../types'
import type {
AbortSignalWithReason,
ForkedTaskExecutor,
TaskResult,
} from '../types'
import { createListenerMiddleware, TaskAbortError } from '../index'
import { listenerCancelled, taskCancelled } from '../exceptions'
import {
listenerCancelled,
listenerCompleted,
taskCancelled,
} from '../exceptions'

function delay(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
Expand Down Expand Up @@ -312,6 +320,58 @@ describe('fork', () => {
})
})

test('forkApi.delay rejects as soon as the parent listener is cancelled', async () => {
let deferredResult = deferred()

startListening({
actionCreator: increment,
effect: async (_, listenerApi) => {
listenerApi.cancelActiveListeners()
await listenerApi.fork(async (forkApi) => {
await forkApi
.delay(100)
.then(deferredResult.resolve, deferredResult.resolve)

return 4
}).result

deferredResult.resolve(new Error('unreachable'))
},
})

store.dispatch(increment())

await Promise.resolve()

store.dispatch(increment())
expect(await deferredResult).toEqual(
new TaskAbortError(listenerCancelled)
)
})

test('forkApi.signal listener is invoked as soon as the parent listener is cancelled or completed', async () => {
let deferredResult = deferred()

startListening({
actionCreator: increment,
async effect(_, listenerApi) {
const wronglyDoNotAwaitResultOfTask = listenerApi.fork(
async (forkApi) => {
forkApi.signal.addEventListener('abort', () => {
deferredResult.resolve(
(forkApi.signal as AbortSignalWithReason<unknown>).reason
)
})
}
)
},
})

store.dispatch(increment)

expect(await deferredResult).toBe(listenerCompleted)
})

test('fork.delay does not trigger unhandledRejections for completed or cancelled tasks', async () => {
let deferredCompletedEvt = deferred()
let deferredCancelledEvt = deferred()
Expand Down Expand Up @@ -384,6 +444,34 @@ describe('fork', () => {
})
})

test('forkApi.pause rejects as soon as the parent listener is cancelled', async () => {
let deferredResult = deferred()

startListening({
actionCreator: increment,
effect: async (_, listenerApi) => {
listenerApi.cancelActiveListeners()
const forkedTask = listenerApi.fork(async (forkApi) => {
await forkApi
.pause(delay(100))
.then(deferredResult.resolve, deferredResult.resolve)

return 4
})

await forkedTask.result
deferredResult.resolve(new Error('unreachable'))
},
})

store.dispatch(increment())

await Promise.resolve()

store.dispatch(increment())
expect(await deferredResult).toEqual(new TaskAbortError(listenerCancelled))
})

test('forkApi.pause rejects if listener is cancelled', async () => {
const incrementByInListener = createAction<number>('incrementByInListener')

Expand Down
4 changes: 2 additions & 2 deletions packages/toolkit/src/listenerMiddleware/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ export type MatchFunction<T> = (v: any) => v is T
export interface ForkedTaskAPI {
/**
* Returns a promise that resolves when `waitFor` resolves or
* rejects if the task has been cancelled or completed.
* rejects if the task or the parent listener has been cancelled or is completed.
*/
pause<W>(waitFor: Promise<W>): Promise<W>
/**
* Returns a promise resolves after `timeoutMs` or
* rejects if the task has been cancelled or is completed.
* rejects if the task or the parent listener has been cancelled or is completed.
* @param timeoutMs
*/
delay(timeoutMs: number): Promise<void>
Expand Down
7 changes: 7 additions & 0 deletions packages/toolkit/src/listenerMiddleware/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ export const catchRejection = <T>(
return promise
}

export const addAbortSignalListener = (
abortSignal: AbortSignal,
callback: (evt: Event) => void
) => {
abortSignal.addEventListener('abort', callback, { once: true })
}

/**
* Calls `abortController.abort(reason)` and patches `signal.reason`.
* if it is not supported.
Expand Down