-
-
Notifications
You must be signed in to change notification settings - Fork 311
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: event missing after redis reconnected (#674)
* chore: add automationGap config * fix: event missing after redis reconnected * fix: pnpm lock
- Loading branch information
1 parent
0735c1c
commit d6dc4fc
Showing
7 changed files
with
94 additions
and
89 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* eslint-disable @typescript-eslint/naming-convention */ | ||
import Redis from 'ioredis'; | ||
import type { RedisOptions } from 'ioredis'; | ||
import type { Error } from 'sharedb'; | ||
import { PubSub } from 'sharedb'; | ||
|
||
const PUBLISH_SCRIPT = 'for i = 2, #ARGV do ' + 'redis.call("publish", ARGV[i], ARGV[1]) ' + 'end'; | ||
|
||
// Redis pubsub driver for ShareDB. | ||
// | ||
// The redis driver requires two redis clients (a single redis client can't do | ||
// both pubsub and normal messaging). These clients will be created | ||
// automatically if you don't provide them. | ||
export class RedisPubSub extends PubSub { | ||
client: Redis; | ||
observer: Redis; | ||
_closing?: boolean; | ||
|
||
constructor(options: RedisOptions & { prefix?: string } = {}) { | ||
super(options); | ||
|
||
this.client = new Redis(options); | ||
|
||
// Redis doesn't allow the same connection to both listen to channels and do | ||
// operations. Make an extra redis connection for subscribing with the same | ||
// options if not provided | ||
this.observer = new Redis(options); | ||
this.observer.on('message', this.handleMessage.bind(this)); | ||
} | ||
|
||
close( | ||
callback = function (err: Error | null) { | ||
if (err) throw err; | ||
} | ||
): void { | ||
PubSub.prototype.close.call(this, (err) => { | ||
if (err) return callback(err); | ||
this._close().then(function () { | ||
callback(null); | ||
}, callback); | ||
}); | ||
} | ||
|
||
async _close() { | ||
if (this._closing) { | ||
return; | ||
} | ||
this._closing = true; | ||
this.observer.removeAllListeners(); | ||
await Promise.all([this.client.quit(), this.observer.quit()]); | ||
} | ||
|
||
_subscribe(channel: string, callback: (err: Error | null) => void): void { | ||
this.observer.subscribe(channel).then(function () { | ||
callback(null); | ||
}, callback); | ||
} | ||
|
||
handleMessage(channel: string, message: string) { | ||
this._emit(channel, JSON.parse(message)); | ||
} | ||
|
||
_unsubscribe(channel: string, callback: (err: Error | null) => void): void { | ||
this.observer.unsubscribe(channel).then(function () { | ||
callback(null); | ||
}, callback); | ||
} | ||
|
||
async _publish(channels: string[], data: unknown, callback: (err: Error | null) => void) { | ||
const message = JSON.stringify(data); | ||
const args = [message].concat(channels); | ||
this.client.eval(PUBLISH_SCRIPT, 0, ...args).then(function () { | ||
callback(null); | ||
}, callback); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.