Skip to content

Commit

Permalink
fix(ot): use comsumer group to receive latest op
Browse files Browse the repository at this point in the history
  • Loading branch information
BrotherJing committed Nov 12, 2019
1 parent a89320d commit b86ae01
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions ot-server/src/mq/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {KAFKA_TOPIC_OP, KAFKA_TOPIC_REVISION} from "../const/config";
class Kafka {
private backend: Backend;
private producer: kafka.Producer;
private opConsumer: kafka.Consumer;
private opConsumer: kafka.ConsumerGroup;
private ready: boolean;
private pendingOps: Command[];
constructor(backend: Backend) {
Expand All @@ -17,19 +17,12 @@ class Kafka {
partitionerType: 3, // partitioned by key
requireAcks: 1,
});
this.opConsumer = new kafka.Consumer(new kafka.KafkaClient(), [{
partition: 0,
topic: KAFKA_TOPIC_OP,
}, {
partition: 1,
topic: KAFKA_TOPIC_OP,
}, {
partition: 2,
topic: KAFKA_TOPIC_OP,
}], {
// autoCommit: true,
this.opConsumer = new kafka.ConsumerGroup({
autoCommit: false,
encoding: "buffer",
});
fromOffset: "latest",
groupId: "OpConsumerGroup",
}, [KAFKA_TOPIC_OP]);
this.ready = false;
this.pendingOps = [];
this.bindEvents_();
Expand All @@ -55,7 +48,11 @@ class Kafka {

this.opConsumer.on("message", (message) => {
const command = Command.deserializeBinary(message.value as Buffer);
console.log(`[kafka-consumer -> ${KAFKA_TOPIC_OP}]: received ${command.getVersion()}`);
this.backend.submit(command, async (err, ops) => {
this.opConsumer.commit((e) => {
console.log(`[kafka-consumer -> ${KAFKA_TOPIC_OP}]: commit ${command.getVersion()} ${e ? "failed" : "success"}`);
});
const ack = new Command();
ack.setDocid(command.getDocid());
ack.setSeq(command.getSeq());
Expand All @@ -79,7 +76,7 @@ class Kafka {
topic: KAFKA_TOPIC_REVISION
}];
this.producer.send(payloads, (err) => {
console.log(`[kafka-producer -> " + KAFKA_TOPIC_REVISION + "]: broker update ${err ? "failed" : "success"}`);
console.log(`[kafka-producer -> ${KAFKA_TOPIC_REVISION}]: send ${op.getVersion()} ${err ? "failed" : "success"}`);
});
}
}
Expand Down

0 comments on commit b86ae01

Please sign in to comment.