diff --git a/ot-server/src/mq/kafka.ts b/ot-server/src/mq/kafka.ts index 67634e1..ed8a686 100644 --- a/ot-server/src/mq/kafka.ts +++ b/ot-server/src/mq/kafka.ts @@ -8,7 +8,7 @@ import {KAFKA_TOPIC_OP, KAFKA_TOPIC_REVISION} from "../const/config"; class Kafka { private backend: Backend; private producer: kafka.Producer; - private opConsumer: kafka.Consumer; + private opConsumer: kafka.ConsumerGroup; private ready: boolean; private pendingOps: Command[]; constructor(backend: Backend) { @@ -17,19 +17,12 @@ class Kafka { partitionerType: 3, // partitioned by key requireAcks: 1, }); - this.opConsumer = new kafka.Consumer(new kafka.KafkaClient(), [{ - partition: 0, - topic: KAFKA_TOPIC_OP, - }, { - partition: 1, - topic: KAFKA_TOPIC_OP, - }, { - partition: 2, - topic: KAFKA_TOPIC_OP, - }], { - // autoCommit: true, + this.opConsumer = new kafka.ConsumerGroup({ + autoCommit: false, encoding: "buffer", - }); + fromOffset: "latest", + groupId: "OpConsumerGroup", + }, [KAFKA_TOPIC_OP]); this.ready = false; this.pendingOps = []; this.bindEvents_(); @@ -55,7 +48,11 @@ class Kafka { this.opConsumer.on("message", (message) => { const command = Command.deserializeBinary(message.value as Buffer); + console.log(`[kafka-consumer -> ${KAFKA_TOPIC_OP}]: received ${command.getVersion()}`); this.backend.submit(command, async (err, ops) => { + this.opConsumer.commit((e) => { + console.log(`[kafka-consumer -> ${KAFKA_TOPIC_OP}]: commit ${command.getVersion()} ${e ? "failed" : "success"}`); + }); const ack = new Command(); ack.setDocid(command.getDocid()); ack.setSeq(command.getSeq()); @@ -79,7 +76,7 @@ class Kafka { topic: KAFKA_TOPIC_REVISION }]; this.producer.send(payloads, (err) => { - console.log(`[kafka-producer -> " + KAFKA_TOPIC_REVISION + "]: broker update ${err ? "failed" : "success"}`); + console.log(`[kafka-producer -> ${KAFKA_TOPIC_REVISION}]: send ${op.getVersion()} ${err ? "failed" : "success"}`); }); } }