Skip to content

Commit

Permalink
fix: provider sync state (TexteaInc#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
exuanbo committed Sep 2, 2022
1 parent ac11e25 commit e57c355
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 19 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"socket.io": "^4.5.1",
"socket.io-client": "^4.5.1",
"use-sync-external-store": "^1.2.0",
"uuid": "^8.3.2",
"y-protocols": "^1.0.5",
"zustand": "^4.1.1"
},
Expand All @@ -43,6 +44,7 @@
"@types/express": "^4.17.13",
"@types/node": "^18.7.13",
"@types/use-sync-external-store": "^0.0.3",
"@types/uuid": "^8.3.4",
"@types/web": "^0.0.72",
"@typescript-eslint/eslint-plugin": "^5.33.0",
"@typescript-eslint/parser": "^5.33.0",
Expand Down
61 changes: 42 additions & 19 deletions src/provider.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { io, Socket } from 'socket.io-client'
import { v4 as uuid } from 'uuid'
import { applyAwarenessUpdate, Awareness, encodeAwarenessUpdate, removeAwarenessStates } from 'y-protocols/awareness'
import * as Y from 'yjs'
import { createStore, Mutate, StoreApi } from 'zustand'
Expand Down Expand Up @@ -71,6 +72,9 @@ export const createSocketIOProvider: CreateSocketIOProvider = (
autoConnectBroadcastChannel = true
} = {}
) => {
type DocUpdateId = string
const syncingDocUpdates = new Set<DocUpdateId>()

const store = createStore<SocketIOProviderState>()(
subscribeWithSelector(() => ({
...INITIAL_STATE,
Expand All @@ -92,6 +96,11 @@ export const createSocketIOProvider: CreateSocketIOProvider = (
socket.emit('join', roomName)
const docDiff = Y.encodeStateVector(doc)
socket.emit('doc:diff', roomName, docDiff)
socket.once('doc:update', () => {
if (!syncingDocUpdates.size) {
store.setState({ synced: true })
}
})
if (awareness.getLocalState() !== null) {
const awarenessUpdate = encodeAwarenessUpdate(awareness, [doc.clientID])
socket.emit('awareness:update', roomName, awarenessUpdate)
Expand All @@ -108,33 +117,32 @@ export const createSocketIOProvider: CreateSocketIOProvider = (
})
socket.on('doc:update', (updateV2) => {
Y.applyUpdateV2(doc, new Uint8Array(updateV2), socket)
store.setState({ synced: true })
})
socket.on('awareness:update', (update) => {
applyAwarenessUpdate(awareness, new Uint8Array(update), socket)
})
socket.on('disconnect', (_reason, description) => {
syncingDocUpdates.clear()
const clients = [...awareness.getStates().keys()].filter(
(clientId) => clientId !== doc.clientID
)
removeAwarenessStates(awareness, clients, socket)
const err = description instanceof Error ? description.message : null
store.setState({
connecting: false,
connected: false,
...INITIAL_STATE,
error: err
})
})

let broadcastChannel: TypedBroadcastChannel | undefined
const broadcastChannelName = new URL(roomName, serverUrl).toString()
const broadcastChannel: TypedBroadcastChannel = new BroadcastChannel(broadcastChannelName)
const handleBroadcastChannelMessage = (event: BroadcastChannelMessageEvent) => {
const [eventName] = event.data
switch (eventName) {
case 'doc:diff': {
const [, diff, clientId] = event.data
const updateV2 = Y.encodeStateAsUpdateV2(doc, diff)
broadcastChannel.postMessage(['doc:update', updateV2, clientId])
broadcastChannel!.postMessage(['doc:update', updateV2, clientId])
break
}
case 'doc:update': {
Expand All @@ -148,7 +156,7 @@ export const createSocketIOProvider: CreateSocketIOProvider = (
const [, clientId] = event.data
const clients = [...awareness.getStates().keys()]
const update = encodeAwarenessUpdate(awareness, clients)
broadcastChannel.postMessage(['awareness:update', update, clientId])
broadcastChannel!.postMessage(['awareness:update', update, clientId])
break
}
case 'awareness:update': {
Expand All @@ -161,9 +169,10 @@ export const createSocketIOProvider: CreateSocketIOProvider = (
}
}
const connectBroadcastChannel = () => {
if (broadcastChannel.onmessage !== null) {
if (broadcastChannel) {
return
}
broadcastChannel = new BroadcastChannel(broadcastChannelName)
broadcastChannel.onmessage = handleBroadcastChannelMessage
const docDiff = Y.encodeStateVector(doc)
broadcastChannel.postMessage(['doc:diff', docDiff, doc.clientID])
Expand All @@ -176,31 +185,45 @@ export const createSocketIOProvider: CreateSocketIOProvider = (
}
}
const disconnectBroadcastChannel = () => {
broadcastChannel.onmessage = null
if (broadcastChannel) {
broadcastChannel.close()
broadcastChannel = undefined
}
}
if (autoConnectBroadcastChannel) {
connectBroadcastChannel()
}

const shouldSyncUpdate = () => socket.connected || broadcastChannel

const handleDocUpdate = (updateV1: Uint8Array, origin: null | Socket) => {
if (origin !== socket) {
const updateV2 = Y.convertUpdateFormatV1ToV2(updateV1)
if (origin === socket || !shouldSyncUpdate()) {
return
}
const updateV2 = Y.convertUpdateFormatV1ToV2(updateV1)
if (socket.connected) {
const updateId = uuid()
syncingDocUpdates.add(updateId)
store.setState({ synced: false })
socket.emit('doc:update', roomName, updateV2, () => {
store.setState({ synced: true })
syncingDocUpdates.delete(updateId)
if (!syncingDocUpdates.size) {
store.setState({ synced: true })
}
})
store.setState({ synced: false })
broadcastChannel.postMessage(['doc:update', updateV2])
}
broadcastChannel?.postMessage(['doc:update', updateV2])
}
doc.on('update', handleDocUpdate)

const handleAwarenessUpdate = (changes: AwarenessChanges, origin: string | Socket) => {
if (origin !== socket) {
const changedClients = Object.values(changes).reduce((res, cur) => [...res, ...cur])
const update = encodeAwarenessUpdate(awareness, changedClients)
socket.emit('awareness:update', roomName, update)
broadcastChannel.postMessage(['awareness:update', update])
if (origin === socket || !shouldSyncUpdate()) {
return
}
const changedClients = Object.values(changes).reduce((res, cur) => [...res, ...cur])
const update = encodeAwarenessUpdate(awareness, changedClients)
socket.volatile.emit('awareness:update', roomName, update)
broadcastChannel?.postMessage(['awareness:update', update])
}
awareness.on('update', handleAwarenessUpdate)

Expand All @@ -225,7 +248,7 @@ export const createSocketIOProvider: CreateSocketIOProvider = (
destroy: () => {
store.destroy()
socket.disconnect()
broadcastChannel.close()
broadcastChannel?.close()
doc.off('update', handleDocUpdate)
awareness.off('update', handleAwarenessUpdate)
}
Expand Down
18 changes: 18 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,13 @@ __metadata:
languageName: node
linkType: hard

"@types/uuid@npm:^8.3.4":
version: 8.3.4
resolution: "@types/uuid@npm:8.3.4"
checksum: 6f11f3ff70f30210edaa8071422d405e9c1d4e53abbe50fdce365150d3c698fe7bbff65c1e71ae080cbfb8fded860dbb5e174da96fdbbdfcaa3fb3daa474d20f
languageName: node
linkType: hard

"@types/web@npm:^0.0.72":
version: 0.0.72
resolution: "@types/web@npm:0.0.72"
Expand Down Expand Up @@ -6656,6 +6663,15 @@ __metadata:
languageName: node
linkType: hard

"uuid@npm:^8.3.2":
version: 8.3.2
resolution: "uuid@npm:8.3.2"
bin:
uuid: dist/bin/uuid
checksum: 5575a8a75c13120e2f10e6ddc801b2c7ed7d8f3c8ac22c7ed0c7b2ba6383ec0abda88c905085d630e251719e0777045ae3236f04c812184b7c765f63a70e58df
languageName: node
linkType: hard

"v8-compile-cache-lib@npm:^3.0.1":
version: 3.0.1
resolution: "v8-compile-cache-lib@npm:3.0.1"
Expand Down Expand Up @@ -6916,6 +6932,7 @@ __metadata:
"@types/express": ^4.17.13
"@types/node": ^18.7.13
"@types/use-sync-external-store": ^0.0.3
"@types/uuid": ^8.3.4
"@types/web": ^0.0.72
"@typescript-eslint/eslint-plugin": ^5.33.0
"@typescript-eslint/parser": ^5.33.0
Expand All @@ -6942,6 +6959,7 @@ __metadata:
ts-node: ^10.9.1
typescript: ^4.7.4
use-sync-external-store: ^1.2.0
uuid: ^8.3.2
vite: ^3.0.9
vitest: ^0.22.1
y-protocols: ^1.0.5
Expand Down

0 comments on commit e57c355

Please sign in to comment.