跳到主要内容

 

流式更新

概述

RTK Query 为您提供了接收持久查询的流式更新的能力。这使得查询能够建立与服务器的持续连接(通常使用 WebSockets),并在从服务器接收到额外信息时更新缓存的数据。

流式更新可以用来使 API 实时接收后端数据的更新,例如创建新条目,或者更新重要属性。

要为查询启用流式更新,将异步函数 onCacheEntryAdded 传递给查询,包括当接收到流式数据时如何更新查询的逻辑。请参阅 onCacheEntryAdded API 参考以获取更多详情。

何时使用流式更新

主要的查询数据更新应通过轮询在一定间隔内间歇性地进行,使用缓存失效根据与查询和突变相关联的标签使数据失效,或者使用refetchOnMountOrArgChange在使用数据的组件挂载时获取新数据。

然而,流式更新对于以下情况特别有用:

  • 大对象的小的、频繁的更改。而不是反复轮询一个大对象,可以使用初始查询获取对象,然后流式更新可以在接收到更新时更新单个属性。
  • 外部事件驱动的更新。在数据可能被服务器或其他外部用户更改,且预期实时更新将显示给活动用户的情况下,仅轮询会导致查询之间的数据过时,导致状态容易脱节。流式更新可以在更新发生时更新所有活动客户端,而不是等待下一个间隔过去。

从流式更新中受益的示例用例包括:

  • GraphQL 订阅
  • 实时聊天应用
  • 实时多人游戏
  • 多个并发用户的协作文档编辑

使用 onCacheEntryAdded 生命周期

onCacheEntryAdded 生命周期回调让您可以编写将在新的缓存条目添加到 RTK Query 缓存后(即,组件创建了给定端点+参数组合的新订阅后)执行的任意异步逻辑。

onCacheEntryAdded 将被调用两个参数:传递给订阅的 arg,以及包含 "生命周期承诺" 和实用函数的选项对象。您可以使用这些来编写等待数据被添加、启动服务器连接、应用部分更新,并在查询订阅被移除时清理连接的顺序逻辑。

通常,您将 await cacheDataLoaded 以确定何时获取了第一批数据,然后使用 updateCacheData 实用程序在接收到消息时应用流式更新。updateCacheData 是一个由 Immer 驱动的回调,它接收当前缓存值的 draft。您可以 "变更" 草稿值以根据接收到的值进行需要的更新。然后,RTK Query 将调度一个基于这些更改的差异化补丁的动作。

最后,您可以 await cacheEntryRemoved 以知道何时清理任何服务器连接。

流式更新示例

Websocket 聊天 API

// 文件:schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
// 在真实代码中,这将检查 `message` 以确保它是一个 `Message`
return true
}

// 文件:api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<Message[], Channel>({
query: (channel) => `messages/${channel}`,
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved },
) {
// 当缓存订阅开始时创建一个 websocket 连接
const ws = new WebSocket('ws://localhost:8080')
try {
// 在继续之前等待初始查询解析
await cacheDataLoaded

// 当从服务器到套接字连接接收到数据时,
// 如果它是一个消息并且是适当的频道,
// 使用接收到的消息更新我们的查询结果
const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
draft.push(data)
})
}

ws.addEventListener('message', listener)
} catch {
// 如果 `cacheEntryRemoved` 在 `cacheDataLoaded` 之前解析,
// 那么 `cacheDataLoaded` 将抛出,此时无操作
}
// 当缓存订阅不再活动时,cacheEntryRemoved 将解析
await cacheEntryRemoved
// 一旦 `cacheEntryRemoved` 承诺解析,执行清理步骤
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

预期结果

当触发 getMessages 查询(例如,通过使用 useGetMessagesQuery() 钩子挂载组件时),将根据端点的序列化参数添加一个 缓存条目。关联的查询将基于 query 属性触发,以获取缓存的初始数据。同时,异步的 onCacheEntryAdded 回调将开始,并创建一个新的 WebSocket 连接。一旦收到初始查询的响应,缓存将用响应数据填充,cacheDataLoaded promise 将解决。在等待 cacheDataLoaded promise 后,将向 WebSocket 连接添加 message 事件监听器,当收到关联消息时更新缓存数据。

当没有更多的活动订阅数据时(例如,当订阅的组件保持卸载足够长的时间),cacheEntryRemoved promise 将解决,允许剩余的代码运行并关闭 websocket 连接。RTK Query 也将从缓存中删除关联的数据。

如果稍后运行对应缓存条目的查询,它将覆盖整个缓存条目,流更新监听器将继续在更新的数据上工作。

带有转换响应形状的 Websocket 聊天 API

// 文件:schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
// 在实际代码中,这将检查 `message` 以确保它是一个 `Message`

return true
}

// 文件:api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter } from '@reduxjs/toolkit'
import type { EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

const messagesAdapter = createEntityAdapter<Message>()
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<EntityState<Message, number>, Channel>({
query: (channel) => `messages/${channel}`,
transformResponse(response: Message[]) {
return messagesAdapter.addMany(
messagesAdapter.getInitialState(),
response,
)
},
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved },
) {
const ws = new WebSocket('ws://localhost:8080')
try {
await cacheDataLoaded

const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
messagesAdapter.upsertOne(draft, data)
})
}

ws.addEventListener('message', listener)
} catch {}
await cacheEntryRemoved
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

此示例演示了如何修改上一个示例,以允许在将数据添加到缓存时转换响应形状。

例如,数据从这种形状转换为:

[
{
id: 0
channel: 'redux'
userName: 'Mark'
text: 'Welcome to #redux!'
},
{
id: 1
channel: 'redux'
userName: 'Lenz'
text: 'Glad to be here!'
},
]

变为这种形状:

{
// 每个项目的唯一 ID。必须是字符串或数字
ids: [0, 1],
// 一个查找表,将实体 ID 映射到对应的实体对象
entities: {
0: {
id: 0,
channel: "redux",
userName: "Mark",
text: "Welcome to #redux!",
},
1: {
id: 1,
channel: "redux",
userName: "Lenz",
text: "Glad to be here!",
},
},
};

需要记住的关键点是,onCacheEntryAdded 回调中对缓存数据的更新必须尊重将存在于缓存数据的转换数据形状。示例显示了如何使用 createEntityAdapter 进行初始的 transformResponse,以及在接收到流更新时如何将接收到的项目插入到缓存数据中,同时保持规范化的状态结构。