使用 Channels
到目前为止,我们已经使用了 take
和 put
效果来与 Redux Store 进行通信。Channels 将这些效果推广到与外部事件源或 Saga 之间的通信。它们也可以用来从 Store 队列特定的操作。
在这一节中,我们将看到:
-
如何使用
yield actionChannel
效果从 Store 缓冲特定的操作。 -
如何使用
eventChannel
工厂函数将take
效果连接到外部事件源。 -
如何使用通用的
channel
工厂函数创建一个 channel,并在take
/put
效果中使用它来在两个 Saga 之间进行通信。
使用 actionChannel
效果
让我们回顾一下典型的例子:
import { take, fork, ... } from 'redux-saga/effects'
function* watchRequests() {
while (true) {
const {payload} = yield take('REQUEST')
yield fork(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
上面的 例子说明了典型的 watch-and-fork 模式。watchRequests
saga 使用 fork
来避免阻塞,从而不会错过来自 store 的任何操作。每一个 REQUEST
操作都会创建一个 handleRequest
任务。所以,如果有很多操作在短时间内被触发,那么可能会有很多 handleRequest
任务同时执行。
现在假设我们的需求是这样的:我们想要串行处理 REQUEST
。如果我们在任何时刻有四个操作,我们想要处理第一个 REQUEST
操作,然后只有在完成这个操作之后我们才处理第二个操作,依此类推...
所以我们想要 队列 所有未处理的操作,一旦我们处理完当前的请求,我们就从队列中获取下一个消息。
Redux-Saga 提供了一个小的 helper 效果 actionChannel
,它可以为我们处理这个问题。让我们看看我们如何用它来重写前面的例子:
import { take, actionChannel, call, ... } from 'redux-saga/effects'
function* watchRequests() {
// 1- 为请求操作创建一个 channel
const requestChan = yield actionChannel('REQUEST')
while (true) {
// 2- 从 channel 中取出
const {payload} = yield take(requestChan)
// 3- 注意我们正在使用一个阻塞调用
yield call(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
首先要创建 action channel。我们使用 yield actionChannel(pattern)
,其中 pattern 是按照我们之前提到的 take(pattern)
的规则解释的。两种形式的区别在于 actionChannel
可以缓冲传入的消息,如果 Saga 还没有准备好接收它们(例如,被 API 调用阻塞)。
接下来是 yield take(requestChan)
。除了与 pattern
一起使用以从 Redux Store 中获取特定操作外,take
也可以与 channels 一起使用(上面我们从特定的 Redux 操作中创建了一个 channel 对象)。take
将阻塞 Saga,直到 channel 上有消息可用。如果底层缓冲区中存储了消息,take 也可能立即恢复。
需要注意的重要一点是我们如何使用阻塞的 call
。Saga 将保持阻塞状态,直到 call(handleRequest)
返回。但与此同时,如果在 Saga 仍然被阻塞时分派了其他的 REQUEST
操作,它们将由 requestChan
内部排队。当 Saga 从 call(handleRequest)
恢复并执行下一个 yield take(requestChan)
时,take 将解析为队列中的消息。
默认情况下,actionChannel
会无限制地缓冲所有传入的消息。如果你想对缓冲有更多的控制,你可以向效果创建器提供一个 Buffer 参数。Redux-Saga 提供了一些常见的缓冲(无,丢弃,滑动),但你也可以提供你自己的缓冲实现。查看 API 文档 了解更多详情。
例如,如果你只想处理最近的五个项目,你可以使用:
import { buffers } from 'redux-saga'
import { actionChannel } from 'redux-saga/effects'
function* watchRequests() {
const requestChan = yield actionChannel('REQUEST', buffers.sliding(5))
...
}
使用 eventChannel
工厂连接到外部事件
像 actionChannel
(Effect) 一样,eventChannel
(一个工厂函数,不是 Effect) 创建了一个用于事件的 Channel,但是事件源不是 Redux Store。
这个基本示例从一个间隔创建一个 Channel:
import { eventChannel, END } from 'redux-saga'
function countdown(secs) {
return eventChannel(emitter => {
const iv = setInterval(() => {
secs -= 1
if (secs > 0) {
emitter(secs)
} else {
// 这会导致 channel 关闭
emitter(END)
}
}, 1000);
// 订阅者必须返回一个取消订阅的函数
return () => {
clearInterval(iv)
}
}
)
}
eventChannel
的第一个参数是一个 订阅者 函数。订阅者的角色是初始化外部事件源(上面使用 setInterval
),然后通过调用提供的 emitter
将所有来自源的事件路由到 channel。在上面的例子中,我们每秒都在调用 emitter
。
注意:你需要清理你的事件源,以免通过事件 channel 传递 null 或 undefined。虽然传递数字是可以的,但我们建议你像组织你的 redux actions 一样组织你的事件 channel 数据。选择
{ number }
而不是number
。
还要注意 emitter(END)
的调用。我们使用它来通知任何 channel 消费者,channel 已经关闭,意味着不会有其他消息通过这个 channel。
让我们看看我们如何在我们的 Saga 中使用这个 channel。(这是从仓库中的 cancellable-counter 示例中取出的。)
import { take, put, call } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'
// 从秒间隔创建一个事件 Channel
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
// take(END) 将导致 saga 通过跳转到 finally 块来终止
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
console.log('countdown terminated')
}
}
所以 Saga 是在 take(chan)
。这导致 Saga 阻塞,直到在 channel 上放置一条消息。在我们上面的例子中,它对应于我们调用 emitter(secs)
时。还要注意我们在 try/finally
块内执行整个 while (true) {...}
循环。当间隔终止时,countdown 函数通 过调用 emitter(END)
关闭事件 channel。关闭一个 channel 会导致所有在该 channel 上的 take
阻塞的 Sagas 终止。在我们的例子中,终止 Saga 将导致它跳转到其 finally
块(如果提供了,否则 Saga 终止)。
订阅者返回一个 unsubscribe
函数。这是 channel 在事件源完成之前取消订阅的方式。在一个从事件 channel 消费消息的 Saga 中,如果我们想在事件源完成之前提前退出(例如,Saga 已被取消),你可以调用 chan.close()
来关闭 channel 并从源取消订阅。
例如,我们可以让我们的 Saga 支持取消:
import { take, put, call, cancelled } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'
// 从秒间隔创建一个事件 Channel
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
if (yield cancelled()) {
chan.close()
console.log('countdown cancelled')
}
}
}
这是另一个示例,展示了如何使用事件 channels 将 WebSocket 事件传入你的 saga(例如:使用 socket.io 库)。
假设你正在等待服务器消息 ping
,然后在一些延迟后回复一个 pong
消息。
import { take, put, call, apply, delay } from 'redux-saga/effects'
import { eventChannel } from 'redux-saga'
import { createWebSocketConnection } from './socketConnection'
// 这个函数从给定的 socket 创建一个事件通道
// 设置订阅到传入的 `ping` 事件
function createSocketChannel(socket) {
// `eventChannel` 接受一个订阅者函数
// 订阅者函数接受一个 `emit` 参数来将消息放入通道
return eventChannel(emit => {
const pingHandler = (event) => {
// 将事件负载放入通道
// 这允许 Saga 从返回的通道中获取此负载
emit(event.payload)
}
const errorHandler = (errorEvent) => {
// 创建一个 Error 对象并将其放入通道
emit(new Error(errorEvent.reason))
}
// 设置订阅
socket.on('ping', pingHandler)
socket.on('error', errorHandler)
// 订阅者必须返回一个取消订阅的函数
// 当 saga 调用 `channel.close` 方法时,将调用此函数
const unsubscribe = () => {
socket.off('ping', pingHandler)
}
return unsubscribe
})
}
// 通过调用 `socket.emit('pong')` 回复一个 `pong` 消息
function* pong(socket) {
yield delay(5000)
yield apply(socket, socket.emit, ['pong']) // 以 `socket` 为上下文调用 `emit` 方法
}
export function* watchOnPings() {
const socket = yield call(createWebSocketConnection)
const socketChannel = yield call(createSocketChannel, socket)
while (true) {
try {
// socketChannel 的错误会导致 saga 跳到 catch 块
const payload = yield take(socketChannel)
yield put({ type: INCOMING_PONG_PAYLOAD, payload })
yield fork(pong, socket)
} catch(err) {
console.error('socket error:', err)
// 在 catch 块中,socketChannel 仍然是打开的
// 如果我们想结束 socketChannel,我们需要显式地关闭它
// socketChannel.close()
}
}
}