Skip to content

Commit

Permalink
Moved the broker login to another file
Browse files Browse the repository at this point in the history
  • Loading branch information
stliakis committed Aug 26, 2022
1 parent cc39042 commit dec53f5
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 49 deletions.
51 changes: 51 additions & 0 deletions app/src/broker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import Redis from "ioredis";

abstract class Broker {
abstract onMessage(callback: Function): void;

abstract publishMessage(topic: string | null, message: any): void;

abstract quit(): void
}

class RedisBroker extends Broker {
redis: Redis
channel: string
hasSubscribed: boolean = false

constructor(connectionString: string, channel: string) {
super();
this.channel = channel
this.redis = new Redis({
host: connectionString.split(":")[0],
port: parseInt(connectionString.split(":")[1]),
});
}

publishMessage(topic: string, message: object) {
this.redis.publish(this.channel, JSON.stringify({
topic,
payload: message
}));
}

onMessage(callback: Function) {
if (!this.hasSubscribed) {
this.redis.subscribe(this.channel);
}
this.redis.on("message", (channel, message) => {
const parsedMessage = JSON.parse(message)
const {topic, payload} = parsedMessage;
callback(topic, payload);
}
);
}

quit() {
this.redis.quit()
}
}

export {
RedisBroker, Broker
}
50 changes: 2 additions & 48 deletions app/src/pubsuby.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,5 @@
import Redis from "ioredis";
import {Broker} from "./broker";

abstract class Broker {
abstract onMessage(callback: Function): void;

abstract publishMessage(topic: string | null, message: any): void;

abstract quit(): void
}

class RedisBroker extends Broker {
redis: Redis
channel: string
hasSubscribed: boolean = false

constructor(connectionString: string, channel: string) {
super();
this.channel = channel
this.redis = new Redis({
host: connectionString.split(":")[0],
port: parseInt(connectionString.split(":")[1]),
});
}

publishMessage(topic: string, message: object) {
this.redis.publish(this.channel, JSON.stringify({
topic,
payload: message
}));
}

onMessage(callback: Function) {
if (!this.hasSubscribed) {
this.redis.subscribe(this.channel);
}
this.redis.on("message", (channel, message) => {
const parsedMessage = JSON.parse(message)
const {topic, payload} = parsedMessage;
callback(topic, payload);
}
);
}

quit() {
this.redis.quit()
}
}

class Pubsuby {
broker: Broker;
Expand Down Expand Up @@ -80,6 +35,5 @@ class PubsubyConfig {

export {
Pubsuby,
PubsubyConfig,
RedisBroker
PubsubyConfig
}
3 changes: 2 additions & 1 deletion app/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import express from "express";
import {Pubsuby, RedisBroker} from "./pubsuby";
import {Pubsuby} from "./pubsuby";
import expressWs from 'express-ws'
import {listify} from "./utils";
import {requiredSecretApiKey} from "./middlewares";
import Config from "./config";
import {RedisBroker} from "./broker";

let wsInstance = expressWs(express());
let {app} = wsInstance;
Expand Down

0 comments on commit dec53f5

Please sign in to comment.