跳到主要内容

 

createListenerMiddleware

概述

这是一个Redux中间件,它允许你定义包含额外逻辑的"监听器"条目和一个回调"效果",以及基于派发的动作或状态变化时应运行该回调的方式。

它旨在作为更广泛使用的Redux异步中间件(如sagas和observables)的轻量级替代品。虽然在复杂性和概念上类似于thunks,但它可以用来复制一些常见的saga使用模式。

从概念上讲,你可以将其视为类似于React的useEffect钩子,只是它在响应Redux存储更新而不是组件props/state更新时运行逻辑。

监听器效果回调可以访问dispatchgetState,类似于thunks。监听器还接收一组异步工作流函数,如takeconditionpauseforkunsubscribe,这些函数允许编写更复杂的异步逻辑。

监听器可以通过在设置期间调用listenerMiddleware.startListening()静态定义,或者可以通过特殊的dispatch(addListener())dispatch(removeListener())动作在运行时动态添加和删除。

基本使用

import { configureStore, createListenerMiddleware } from '@reduxjs/toolkit'

import todosReducer, {
todoAdded,
todoToggled,
todoDeleted,
} from '../features/todos/todosSlice'

// 创建中间件实例和方法
const listenerMiddleware = createListenerMiddleware()

// 添加一个或多个查找特定动作的监听器条目。
// 它们可能包含任何同步或异步逻辑,类似于thunks。
listenerMiddleware.startListening({
actionCreator: todoAdded,
effect: async (action, listenerApi) => {
// 在这里运行你想要的任何额外的副作用逻辑
console.log('Todo added: ', action.payload.text)

// 可以取消其他正在运行的实例
listenerApi.cancelActiveListeners()

// 运行异步逻辑
const data = await fetchData()

// 暂停,直到动作被派发或状态改变
if (await listenerApi.condition(matchSomeAction)) {
// 使用监听器API方法来派发,获取状态,
// 取消监听器,启动子任务等等
listenerApi.dispatch(todoAdded('Buy pet food'))

// 生成"子任务",可以做更多的工作并返回结果
const task = listenerApi.fork(async (forkApi) => {
// 可以暂停执行
await forkApi.delay(5)
// 通过返回一个值来完成子任务
return 42
})

const result = await task.result
// 在监听器中解包子结果
if (result.status === 'ok') {
// 记录返回的结果值`42`
console.log('Child succeeded: ', result.value)
}
}
},
})

const store = configureStore({
reducer: {
todos: todosReducer,
},
// 将监听器中间件添加到存储中。
// 注意:由于这可以接收包含函数的动作,
// 它应该在可序列化检查中间件之前
middleware: (getDefaultMiddleware) =>
getDefaultMiddleware().prepend(listenerMiddleware.middleware),
})

createListenerMiddleware

创建一个中间件实例,然后应通过configureStoremiddleware参数添加到存储中。

const createListenerMiddleware = (options?: CreateMiddlewareOptions) =>
ListenerMiddlewareInstance

interface CreateListenerMiddlewareOptions<ExtraArgument = unknown> {
extra?: ExtraArgument
onError?: ListenerErrorHandler
}

type ListenerErrorHandler = (
error: unknown,
errorInfo: ListenerErrorInfo,
) => void

interface ListenerErrorInfo {
raisedBy: 'effect' | 'predicate'
}

中间件选项

  • extra:一个可选的"额外参数",将被注入到每个监听器的listenerApi参数中。等同于Redux Thunk中间件中的"额外参数"
  • onError:一个可选的错误处理器,当listener引发同步和异步错误,以及predicate抛出同步错误时被调用。

监听器中间件实例

createListenerMiddleware返回的"监听器中间件实例"是一个对象,类似于createSlice生成的"slice"对象。实例对象并非实际的Redux中间件本身。相反,它包含中间件和一些实例方法,用于在中间件内添加和删除监听器条目。

interface ListenerMiddlewareInstance<
State = unknown,
Dispatch extends ThunkDispatch<State, unknown, UnknownAction> = ThunkDispatch<
State,
unknown,
UnknownAction
>,
ExtraArgument = unknown,
> {
middleware: ListenerMiddleware<State, Dispatch, ExtraArgument>
startListening: (options: AddListenerOptions) => Unsubscribe
stopListening: (
options: AddListenerOptions & UnsubscribeListenerOptions,
) => boolean
clearListeners: () => void
}

middleware

实际的Redux中间件。通过the configureStore.middleware option将其添加到Redux存储中。

由于监听器中间件可以接收包含函数的"add"和"remove"动作,因此通常应将其作为链中的第一个中间件添加,以便在可序列化检查中间件之前。

const store = configureStore({
reducer: {
todos: todosReducer,
},
// 将监听器中间件添加到存储中。
// 注意:由于这可以接收包含函数的动作,
// 它应该在可序列化检查中间件之前
middleware: (getDefaultMiddleware) =>
getDefaultMiddleware().prepend(listenerMiddleware.middleware),
})

startListening

向中间件添加新的监听器条目。通常用于在应用程序设置期间"静态"添加新的监听器。

const startListening = (options: AddListenerOptions) => UnsubscribeListener

interface AddListenerOptions {
// 决定监听器何时运行的四个选项:

// 1) 精确的动作类型字符串匹配
type?: string

// 2) 基于RTK动作创建器的精确动作类型匹配
actionCreator?: ActionCreator

// 3) 使用RTK匹配器匹配多个动作
matcher?: Matcher

// 4) 基于动作+状态的组合返回true
predicate?: ListenerPredicate

// 当动作匹配时运行的实际回调
effect: (action: Action, listenerApi: ListenerApi) => void | Promise<void>
}

type ListenerPredicate<Action extends ReduxAction, State> = (
action: Action,
currentState?: State,
originalState?: State,
) => boolean

type UnsubscribeListener = (
unsubscribeOptions?: UnsubscribeListenerOptions,
) => void

interface UnsubscribeListenerOptions {
cancelActive?: true
}

你必须提供决定监听器何时运行的四个选项中的一个:typeactionCreatormatcherpredicate。每次派发一个动作时,都会检查每个监听器是否应该基于当前动作与提供的比较选项运行。

这些都是可以接受的:

// 1) 动作类型字符串
listenerMiddleware.startListening({ type: 'todos/todoAdded', effect })
// 2) RTK动作创建器
listenerMiddleware.startListening({ actionCreator: todoAdded, effect })
// 3) RTK匹配函数
listenerMiddleware.startListening({
matcher: isAnyOf(todoAdded, todoToggled),
effect,
})
// 4) 监听器谓词
listenerMiddleware.startListening({
predicate: (action, currentState, previousState) => {
// 当监听器应该运行时返回true
},
effect,
})

注意,predicate选项实际上允许仅根据与状态相关的检查进行匹配,例如"是否改变了state.x"或"当前的state.x值匹配某些标准",而不考虑实际的动作。

RTK中包含的"matcher"实用函数可以作为matcherpredicate选项。

返回值是一个unsubscribe()回调,将删除此监听器。默认情况下,取消订阅将不会取消任何活动的监听器实例。但是,你也可以传入{cancelActive: true}来取消正在运行的实例。

如果你试图添加一个监听器条目,但是已经存在一个具有此确切函数引用的条目,那么不会添加新的条目,而是返回现有的unsubscribe方法。

effect回调将接收当前动作作为其第一个参数,以及一个类似于createAsyncThunk中的"thunk API"对象的"listener API"对象。

所有监听器谓词和回调都在根reducer已经处理了动作并更新了状态之后进行检查。listenerApi.getOriginalState()方法可以用来获取在触发此监听器的动作被处理之前存在的状态值。

stopListening

移除给定的监听器条目。

它接受与 startListening() 相同的参数。它通过比较 listener 和提供的 actionCreator/matcher/predicate 函数或 type 字符串的函数引用来检查是否存在现有的监听器条目。

默认情况下,这并 取消任何正在运行的实例。然而,你也可以传入 {cancelActive: true} 来取消正在运行的实例。

const stopListening = (
options: AddListenerOptions & UnsubscribeListenerOptions,
) => boolean

interface UnsubscribeListenerOptions {
cancelActive?: true
}

如果监听器条目已被移除,则返回 true ,如果没有找到与提供的输入匹配的订阅,则返回 false

// 示例:
// 1) Action 类型字符串
listenerMiddleware.stopListening({
type: 'todos/todoAdded',
listener,
cancelActive: true,
})
// 2) RTK action 创建器
listenerMiddleware.stopListening({ actionCreator: todoAdded, effect })
// 3) RTK 匹配函数
listenerMiddleware.stopListening({ matcher, effect, cancelActive: true })
// 4) 监听器谓词
listenerMiddleware.stopListening({ predicate, effect })

clearListeners

移除所有当前的监听器条目。它还会取消所有这些监听器的正在运行的实例。

这在测试场景中可能非常有用,其中单个中间件或存储实例可能在多个测试中使用,以及一些应用程序清理情况。

const clearListeners = () => void;

Action 创建器

除了通过直接在监听器实例上调用方法来添加和移除监听器外,你还可以通过分发特殊的 "add" 和 "remove" 动作在运行时动态地添加和移除监听器。这些都是从主 RTK 包中导出的标准 RTK 生成的动作创建器。

addListener

一个标准的 RTK 动作创建器,从包中导入。分发这个动作告诉中间件在运行时动态地添加一个新的监听器。它接受与 startListening() 完全相同的选项。

分发这个动作会从 dispatch 返回一个 unsubscribe() 回调。

// 如上所述,提供 `predicate` 或任何其他比较选项
const unsubscribe = store.dispatch(addListener({ predicate, effect }))

removeListener

一个标准的 RTK 动作创建器,从包中导入。分发这个动作告诉中间件在运行时动态地移除一个监听器。接受与 stopListening() 相同的参数。

默认情况下,这并 取消任何正在运行的实例。然而,你也可以传入 {cancelActive: true} 来取消正在运行的实例。

如果监听器条目已被移除,则返回 true ,如果没有找到与提供的输入匹配的订阅,则返回 false

const wasRemoved = store.dispatch(
removeListener({ predicate, effect, cancelActive: true }),
)

clearAllListeners

一个标准的 RTK 动作创建器,从包中导入。分发这个动作告诉中间件移除所有当前的监听器条目。它还会取消所有这些监听器的正在运行的实例。

store.dispatch(clearAllListeners())

监听器 API

listenerApi 对象是每个监听器回调的第二个参数。它包含了一些可以在监听器逻辑的任何地方调用的实用函数。

export interface ListenerEffectAPI<
State,
Dispatch extends ReduxDispatch<UnknownAction>,
ExtraArgument = unknown,
> extends MiddlewareAPI<Dispatch, State> {
// 注意:MiddlewareAPI 已经包含了 `dispatch` 和 `getState`

/**
* 返回当动作最初被分发时的存储状态,_在_ reducers 运行之前。
* 这个函数只能**同步**调用,否则会抛出错误。
*/
getOriginalState: () => State
/**
* 从中间件中移除监听器条目并阻止未来的监听器实例运行。
* 它并**不**取消任何正在运行的实例。
*/
unsubscribe(): void
/**
* 如果监听器之前被移除,它将订阅一个监听器,否则无操作。
*/
subscribe(): void
/**
* 返回一个 promise,当输入的谓词返回 `true` 或
* 如果监听器被取消或完成时拒绝。
*
* 如果谓词成功,返回值是 `true`,如果提供了超时并且首先过期,返回值是 `false`。
*/
condition: ConditionFunction<State>
/**
* 返回一个 promise,当输入的谓词返回 `true` 或
* 如果监听器被取消或完成时拒绝。
*
* 返回值是谓词看到的 `[action, currentState, previousState]` 组合。
*
* 如果提供了超时并且首先过期,promise 解析为 null。
*/
take: TakePattern<State>
/**
* 取消除了发出此调用的这个之外的这个相同监听器的所有其他正在运行的实例。
*/
cancelActiveListeners: () => void
/**
* 取消发出此调用的监听器实例。
*/
cancel: () => void
/**
* 如果这个监听器已经被取消,抛出一个 `TaskAbortError`
*/
throwIfCancelled: () => void
/**
* 如果监听器执行被中止或完成,其 `aborted` 属性被设置为 `true` 的中止信号。
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/
signal: AbortSignal
/**
* 返回一个 promise,当 `timeoutMs` 后解析或
* 如果监听器被取消或完成时拒绝。
*/
delay(timeoutMs: number): Promise<void>
/**
* 在下一个微任务中排队执行任务。
*/
fork<T>(executor: ForkedTaskExecutor<T>): ForkedTask<T>
/**
* 返回一个 promise,当 `waitFor` 解析或
* 如果监听器被取消或完成时拒绝。
* @param promise
*/
pause<M>(promise: Promise<M>): Promise<M>
extra: ExtraArgument
}

这些可以分为几个类别。

Store 交互方法

  • dispatch: Dispatch:标准的 store.dispatch 方法
  • getState: () => State:标准的 store.getState 方法
  • getOriginalState: () => State:返回动作最初被调度时的存储状态, reducers 运行之前。 (注意:此方法只能在初始调度调用堆栈期间同步调用,以避免内存泄漏。异步调用它将抛出错误。)
  • extra: unknown:作为中间件设置的一部分提供的 "额外参数"(如果有的话)

dispatchgetState 与 thunk 中的完全相同。getOriginalState 可用于比较监听器启动前的原始状态。

extra 可用于在创建时将值(如 API 服务层)注入到中间件中,并在此处可访问。

监听器订阅管理

  • unsubscribe: () => void:从中间件中移除监听器条目,并防止未来的监听器实例运行。(这并 取消任何活动实例。)
  • subscribe: () => void:如果之前已移除监听器条目,将重新订阅它,如果当前已订阅则无操作
  • cancelActiveListeners: () => void:取消此同一监听器的所有其他正在运行的实例 除了 发出此调用的那个。(只有当其他实例使用 take/cancel/pause/delay 等取消感知 API 暂停时,取消才会有实质性的效果 - 请参阅 "使用" 部分的 "取消和任务管理" 以获取更多详细信息)
  • cancel: () => void:取消发出此调用的此监听器实例。
  • throwIfCancelled: () => void:如果当前的监听器实例被取消,就抛出一个 TaskAbortError
  • signal: AbortSignal:一个 AbortSignal,如果监听器执行被中止或完成,其 aborted 属性将被设置为 true

动态地取消订阅和重新订阅此监听器允许更复杂的异步工作流,例如通过在监听器开始时调用 listenerApi.unsubscribe() 来避免重复运行的实例,或者调用 listenerApi.cancelActiveListeners() 以确保只允许最近的实例完成。

条件工作流执行

  • take: (predicate: ListenerPredicate, timeout?: number) => Promise<[Action, State, State] | null>:返回一个 promise,当 predicate 返回 true 时解析。返回值是 predicate 作为参数看到的 [action, currentState, previousState] 组合。如果提供了 timeout 并且首先过期,promise 解析为 null
  • condition: (predicate: ListenerPredicate, timeout?: number) => Promise<boolean>:类似于 take,但如果 predicate 成功则解析为 true,如果提供了 timeout 并且首先过期则解析为 false。这允许异步逻辑暂停并等待某些条件发生后再继续。请参阅下面的 "编写异步工作流" 以获取使用详情。
  • delay: (timeoutMs: number) => Promise<void>:返回一个取消感知的 promise,该 promise 在超时后解析,或者如果在过期前被取消则拒绝。
  • pause: (promise: Promise<T>) => Promise<T>:接受任何 promise,并返回一个取消感知的 promise,该 promise 或者与参数 promise 一起解析,或者如果在解析前被取消则拒绝。

这些方法提供了基于未来调度的动作和状态更改编写条件逻辑的能力。两者都接受一个可选的 timeout(以毫秒为单位)。

take 解析为一个 [action, currentState, previousState] 元组或 null(如果它超时),而 condition 解析为 true(如果它成功)或 false(如果超时)。

take 用于 "等待一个动作并获取其内容",而 condition 用于像 if (await condition(predicate)) 这样的检查。

这两种方法都是取消感知的,并且如果在暂停时取消了监听器实例,将抛出一个 TaskAbortError

请注意,takecondition 只会在调度 下一个动作 后解析。即使他们的谓词会为当前状态返回 true,他们也不会立即解析。

子任务

  • fork: (executor: (forkApi: ForkApi) => T | Promise<T>) => ForkedTask<T>:启动一个 "子任务",可能用于完成额外的工作。接受任何同步或异步函数作为其参数,并返回一个 {result, cancel} 对象,可以用来检查子任务的最终状态和返回值,或者在进行中取消它。

可以启动子任务,并等待收集它们的返回值。提供的 executor 函数将异步地用一个包含 {pause, delay, signal}forkApi 对象调用,允许它暂停或检查取消状态。它也可以使用来自监听器范围的 listenerApi

这可能是一个监听器的例子,该监听器分叉一个包含无限循环的子任务,该循环从服务器监听事件。然后,父任务使用 listenerApi.condition() 等待一个 "stop" 动作,并取消子任务。

任务和结果类型是:

interface ForkedTaskAPI {
pause<W>(waitFor: Promise<W>): Promise<W>
delay(timeoutMs: number): Promise<void>
signal: AbortSignal
}

export type TaskResolved<T> = {
readonly status: 'ok'
readonly value: T
}

export type TaskRejected = {
readonly status: 'rejected'
readonly error: unknown
}

export type TaskCancelled = {
readonly status: 'cancelled'
readonly error: TaskAbortError
}

export type TaskResult<Value> =
| TaskResolved<Value>
| TaskRejected
| TaskCancelled

export interface ForkedTask<T> {
result: Promise<TaskResult<T>>
cancel(): void
}

TypeScript 使用

中间件代码完全是 TS 类型的。然而,startListeningaddListener 函数默认不知道存储的 RootState 类型是什么,所以 getState() 将返回 unknown

为了解决这个问题,中间件提供了定义 "预类型" 版本的这些方法的类型,类似于用于定义预类型的 React-Redux 钩子的模式。我们特别推荐在实际 configureStore() 调用的单独文件中创建中间件实例:

// listenerMiddleware.ts
import { createListenerMiddleware, addListener } from '@reduxjs/toolkit'
import type { RootState, AppDispatch } from './store'

export const listenerMiddleware = createListenerMiddleware()

export const startAppListening = listenerMiddleware.startListening.withTypes<
RootState,
AppDispatch
>()

export const addAppListener = addListener.withTypes<RootState, AppDispatch>()

然后在你的组件中导入和使用这些预类型的方法。

使用指南

总体目标

此中间件允许你在某个操作被调度时运行额外的逻辑,作为像 saga 和 observable 这样的中间件的轻量级替代品,这些中间件既有重的运行时包成本,也有大的概念开销。

这个中间件并不打算处理所有可能的用例。像 thunk 一样,它为你提供了一套基本的原语(包括访问 dispatchgetState 的能力),并给你自由编写任何你想要的同步或异步逻辑。这既是优点(你可以做任何事情!)也是缺点(你可以做任何事情,没有任何防护措施!)。

中间件包含了几个异步工作流原语,足以编写许多 Redux-Saga 效果操作符的等价物,如 takeLatesttakeLeadingdebounce,尽管这些方法都没有直接包含在内。(参见 listener middleware 测试文件中的示例,了解如何编写等价于这些效果的代码。)

标准使用模式

最常见的预期用法是 "在给定操作被调度后运行一些逻辑"。例如,你可以通过查找某些操作并将提取的数据发送到服务器来设置一个简单的分析跟踪器,包括从存储中提取用户详细信息:

listenerMiddleware.startListening({
matcher: isAnyOf(action1, action2, action3),
effect: (action, listenerApi) => {
const user = selectUserDetails(listenerApi.getState())

const { specialData } = action.meta

analyticsApi.trackUsage(action.type, user, specialData)
},
})

然而,predicate 选项也允许在某个状态值发生变化时,或者当状态匹配特定条件时触发逻辑:

listenerMiddleware.startListening({
predicate: (action, currentState, previousState) => {
// 当此字段发生变化时触发逻辑
return currentState.counter.value !== previousState.counter.value
},
effect,
})

listenerMiddleware.startListening({
predicate: (action, currentState, previousState) => {
// 如果此条件为真,则在每个操作后触发逻辑
return currentState.counter.value > 3
},
effect,
})

你也可以实现一个通用的 API 获取能力,其中 UI 调度一个描述要请求的资源类型的普通操作,中间件自动获取它并调度一个结果操作:

listenerMiddleware.startListening({
actionCreator: resourceRequested,
effect: async (action, listenerApi) => {
const { name, args } = action.payload
listenerApi.dispatch(resourceLoading())

const res = await serverApi.fetch(`/api/${name}`, ...args)
listenerApi.dispatch(resourceLoaded(res.data))
},
})

(话虽如此,我们建议对于任何有意义的数据获取行为,使用 RTK Query - 这主要是一个你 可以 在监听器中做什么的例子。)

listenerApi.unsubscribe 方法可以在任何时候使用,并将从处理任何未来操作的监听器中移除。作为一个例子,你可以通过在主体中无条件地调用 unsubscribe() 来创建一个一次性的监听器 - 效果回调将在第一次看到相关操作时运行,然后立即取消订阅,再也不会运行。(中间件实际上在 take/condition 方法中内部使用了这种技术)

编写带有条件的异步工作流

saga 和 observable 的一大优点是它们支持复杂的异步工作流,包括根据特定调度的操作停止和开始行为。然而,缺点是两者都需要掌握一个复杂的 API,有许多独特的操作符(像 saga 的 call()fork() 效果方法,observable 的 RxJS 操作符),并且两者都增加了应用程序包大小的大部分。

虽然监听器中间件 打算完全替代 saga 或 observable,但它确实提供了一个精心选择的 API 集,以实现长时间运行的异步工作流。

监听器可以使用 listenerApi 中的 conditiontake 方法,等待某个操作被调度或满足状态检查。condition 方法直接受到 Temporal.io 的工作流 API 中的 condition 函数 的启发(感谢 @swyx 的建议!),take 受到 Redux-Saga 中的 take 效果 的启发。

签名是:

type ConditionFunction<Action extends ReduxAction, State> = (
predicate: ListenerPredicate<Action, State> | (() => boolean),
timeout?: number,
) => Promise<boolean>

type TakeFunction<Action extends ReduxAction, State> = (
predicate: ListenerPredicate<Action, State> | (() => boolean),
timeout?: number,
) => Promise<[Action, State, State] | null>

你可以使用 await condition(somePredicate) 作为一种暂停执行你的监听器回调,直到满足某些条件的方式。

predicate 将在每个操作被 reducer 处理后被调用,并应在条件应解决时返回 true。(它实际上是一个一次性的监听器。)如果提供了一个 timeout 数字(以毫秒为单位),那么如果 predicate 先返回,promise 将解析为 true,如果超时到期,将解析为 false。这允许你编写像 if (await condition(predicate, timeout)) 这样的比较。

这应该使编写更长时间运行的工作流和更复杂的异步逻辑成为可能,例如 Redux-Saga 中的 "可取消计数器" 示例

来自测试套件的 condition 使用示例:

test('condition 方法在有超时时解析 promise', async () => {
let finalCount = 0
let listenerStarted = false

listenerMiddleware.startListening({
predicate: (action, currentState: CounterState) => {
return increment.match(action) && currentState.value === 0
},
effect: async (action, listenerApi) => {
listenerStarted = true
// 等待计数器达到 3,或者等待 50ms
const result = await listenerApi.condition(
(action, currentState: CounterState) => {
return currentState.value === 3
},
50,
)

// 在这个测试中,我们期望首先发生超时
expect(result).toBe(false)
// 保存状态以便在监听器外部进行比较
const latestState = listenerApi.getState()
finalCount = latestState.value
},
})

store.dispatch(increment())
// 监听器应该立即开始
expect(listenerStarted).toBe(true)

store.dispatch(increment())

// 如果我们等待 150ms,条件超时将首先到期
await delay(150)
// 再次更新状态以确认监听器没有检查它
store.dispatch(increment())

// 在延迟之前处理了状态更新,但在之后没有处理
expect(finalCount).toBe(2)
})

取消和任务管理

监听器中间件支持取消正在运行的监听器实例、take/condition/pause/delay 函数以及"子任务",其实现基于 AbortController

listenerApi.pause/delay() 函数提供了一种取消感知的方式来让当前的监听器休眠。pause() 接受一个 promise,而 delay 接受一个超时值。如果在等待时监听器被取消,将会抛出一个 TaskAbortError。此外,takecondition 也支持取消中断。

listenerApi.cancelActiveListeners() 将取消正在运行的 其他 实例,而 listenerApi.cancel() 可以用来取消 当前 实例(这可能在一个 fork 中很有用,它可能深度嵌套并且无法直接抛出一个 promise 来跳出效果执行)。listenerAPi.throwIfCancelled() 也可能在取消发生时效果正在做其他工作的情况下退出工作流程。

listenerApi.fork() 可以用来启动可以做额外工作的"子任务"。这些可以被等待以收集他们的结果。一个例子可能看起来像这样:

listenerMiddleware.startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
// 生成一个子任务并立即启动它
const task = listenerApi.fork(async (forkApi) => {
// 在子任务内部人为地等待一会儿
await forkApi.delay(5)
// 通过返回一个值来完成子任务
return 42
})

const result = await task.result
// 在监听器中解包子任务的结果
if (result.status === 'ok') {
// 记录返回的结果值 `42`
console.log('Child succeeded: ', result.value)
}
},
})

复杂的异步工作流

提供的异步工作流原语(cancelActiveListenerscancelunsubscribesubscribetakeconditionpausedelay)可以用来实现等效于 Redux-Saga 库中找到的许多更复杂的异步工作流能力的行为。这包括像 throttledebouncetakeLatesttakeLeadingfork/join 这样的效果。一些来自测试套件的例子:

test('debounce / takeLatest', async () => {
// 重复调用会取消前一个,没有工作会执行
// 直到指定的延迟过去没有其他调用
// 注意:这也基本上等同于 `takeLatest`。
// 参考:https://redux-saga.js.org/docs/api#debouncems-pattern-saga-args
// 参考:https://redux-saga.js.org/docs/api#takelatestpattern-saga-args

listenerMiddleware.startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
// 取消此监听器的任何进行中的实例
listenerApi.cancelActiveListeners()

// 在开始实际工作之前延迟
await listenerApi.delay(15)

// 在这里做工作
},
})
}

test('takeLeading', async () => {
// 在第一个动作上启动监听器,忽略其他的直到任务完成
// 参考:https://redux-saga.js.org/docs/api#takeleadingpattern-saga-args

listenerMiddleware.startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
listenerCalls++

// 停止监听此动作
listenerApi.unsubscribe()

// 假装我们正在做昂贵的工作

// 重新启用监听器
listenerApi.subscribe()
},
})
})

test('cancelled', async () => {
// cancelled 允许检查当前任务是否被取消
// 参考:https://redux-saga.js.org/docs/api#cancelled

let canceledAndCaught = false
let canceledCheck = false

// 条件取消先前实例并检查取消的示例
listenerMiddleware.startListening({
matcher: isAnyOf(increment, decrement, incrementByAmount),
effect: async (action, listenerApi) => {
if (increment.match(action)) {
// 让这个分支等待被其他的取消
try {
await listenerApi.delay(10)
} catch (err) {
// 可以根据异常及其原因检查取消
if (err instanceof TaskAbortError) {
canceledAndCaught = true
}
}
} else if (incrementByAmount.match(action)) {
// 做一个不感知取消的等待
await delay(15)
if (listenerApi.signal.aborted) {
canceledCheck = true
}
} else if (decrement.match(action)) {
listenerApi.cancelActiveListeners()
}
},
})
})

作为一个更实际的例子:这个基于 saga 的"长轮询"循环 重复地向服务器请求消息,然后处理每个响应。当一个 "开始轮询" 动作被派发时,子循环被按需启动,当一个 "停止轮询" 动作被派发时,循环被取消。

这种方法可以通过监听器中间件来实现:

// 跟踪每个消息被循环处理的次数
const receivedMessages = {
a: 0,
b: 0,
c: 0,
}

const eventPollingStarted = createAction('serverPolling/started')
const eventPollingStopped = createAction('serverPolling/stopped')

listenerMiddleware.startListening({
actionCreator: eventPollingStarted,
effect: async (action, listenerApi) => {
// 一次只允许一个实例的这个监听器运行
listenerApi.unsubscribe()

// 启动一个将无限循环接收消息的子任务
const pollingTask = listenerApi.fork(async (forkApi) => {
try {
while (true) {
// 对新的服务器消息进行取消感知的暂停
const serverEvent = await forkApi.pause(pollForEvent())
// 处理消息。在这种情况下,只是计算我们已经看到这个消息的次数。
if (serverEvent.type in receivedMessages) {
receivedMessages[
serverEvent.type as keyof typeof receivedMessages
]++
}
}
} catch (err) {
if (err instanceof TaskAbortError) {
// 可以在这里做一些事情来跟踪任务被取消
}
}
})

// 等待 "停止轮询" 动作
await listenerApi.condition(eventPollingStopped.match)
pollingTask.cancel()
},
})

在组件中添加监听器

可以通过 dispatch(addListener()) 在运行时添加监听器。这意味着你可以在任何可以访问 dispatch 的地方添加监听器,包括 React 组件。

由于分发 addListener 会返回一个 unsubscribe 回调,这自然对应到 React useEffect 钩子的行为,它允许你返回一个清理函数。你可以在效果中添加一个监听器,并在钩子清理时删除监听器。

基本模式可能看起来像这样:

useEffect(() => {
// 也可以直接 `return dispatch(addListener())`,但是为了清楚地说明发生了什么,这里将其显示为一个单独的变量
const unsubscribe = dispatch(
addListener({
actionCreator: todoAdded,
effect: (action, listenerApi) => {
// 在这里做一些有用的逻辑
},
}),
)
return unsubscribe
}, [])

虽然这种模式是 可能的,但我们并不一定 推荐 这样做!React 和 Redux 社区一直试图强调尽可能基于 状态 的行为。让 React 组件直接绑定到 Redux 动作分发管道可能会导致代码库更难维护。

同时,这 一种有效的技术,无论是在 API 行为还是潜在用例方面。在代码分割的应用中懒加载 sagas 是常见的,这通常需要一些复杂的额外设置工作来 "注入" sagas。相比之下,dispatch(addListener()) 自然地融入到 React 组件的生命周期中。

所以,虽然我们并不特别鼓励使用这种模式,但值得在这里记录下来,以便用户了解它作为一种可能性。

在文件中组织监听器

作为起点,最好在一个单独的文件中创建监听器中间件,例如 app/listenerMiddleware.ts,而不是在与存储相同的文件中。这避免了其他文件试图导入 middleware.addListener 时可能出现的循环导入问题。

从那里开始,到目前为止,我们已经想出了三种不同的方式来组织监听器函数和设置。

首先,你可以从切片文件中导入效果回调到中间件文件,并添加监听器:

app/listenerMiddleware.ts
import { action1, listener1 } from '../features/feature1/feature1Slice'
import { action2, listener2 } from '../features/feature2/feature2Slice'

listenerMiddleware.startListening({ actionCreator: action1, effect: listener1 })
listenerMiddleware.startListening({ actionCreator: action2, effect: listener2 })

这可能是最简单的选项,并且反映了如何将所有切片的 reducer 汇集到一起创建应用程序的存储设置。

第二个选项是相反的:让切片文件导入中间件并直接添加它们的监听器:

features/feature1/feature1Slice.ts
import { listenerMiddleware } from '../../app/listenerMiddleware'

const feature1Slice = createSlice(/* */)
const { action1 } = feature1Slice.actions

export default feature1Slice.reducer

listenerMiddleware.startListening({
actionCreator: action1,
effect: () => {},
})

这将所有逻辑保留在切片中,尽管它确实将设置锁定到一个单独的中间件实例中。

第三个选项是在切片中创建一个设置函数,但让监听器文件在启动时调用它:

features/feature1/feature1Slice.ts
import type { AppStartListening } from '../../app/listenerMiddleware'

const feature1Slice = createSlice(/* */)
const { action1 } = feature1Slice.actions

export default feature1Slice.reducer

export const addFeature1Listeners = (startListening: AppStartListening) => {
startListening({
actionCreator: action1,
effect: () => {},
})
}
app/listenerMiddleware.ts
import { addFeature1Listeners } from '../features/feature1/feature1Slice'

addFeature1Listeners(listenerMiddleware.startListening)

请随意使用在你的应用中最有效的这些方法。