Consuming Kafka messages into MongoDB: syncs Kafka messages into a MongoDB database
By default, the code consumes Kafka messages from the topic flume1 as consumer group groupA:
Properties props = new Properties();
// Kafka broker address, read from ConnectConfig (defaults shown further below)
props.put("bootstrap.servers", connectConfig.getKafkaIp() + ":" + connectConfig.getKafkaPort());
// consumer group id; change this to consume under a different group
props.put("group.id", "groupA");
// commit offsets automatically, once per second
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// consumer session timeout
props.put("session.timeout.ms", "30000");
// start from the earliest offset when the group has no committed offset
props.put("auto.offset.reset", "earliest");
// keys and values are plain strings
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// subscribe to the topic; change "flume1" to consume a different topic
kafkaConsumer.subscribe(Arrays.asList("flume1"));
The code connects to the mydb database:
ConnectConfig connectConfig = new ConnectConfig();
// connect to the MongoDB server
MongoClient mongoClient = new MongoClient(
        connectConfig.getMongodbIp(), Integer.parseInt(connectConfig.getMongodbPort()));
// return the database handle ("mydb" by default)
return mongoClient.getDatabase("mydb");
The default connection parameters are defined in ConnectConfig:
private String kafkaIp = "192.168.50.201";
private String kafkaPort = "9092";
private String mongodbIp = "192.168.50.201";
private String mongodbPort = "27017";
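The snippets above call getters such as getKafkaIp() and getMongodbPort(); these are not shown in the original, but are presumably plain accessors on ConnectConfig, along these lines:
// Assumed plain accessors for the fields above.
public String getKafkaIp() { return kafkaIp; }
public String getKafkaPort() { return kafkaPort; }
public String getMongodbIp() { return mongodbIp; }
public String getMongodbPort() { return mongodbPort; }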
By default, the code writes to the netflows collection:
MongoCollection<Document> mongoDatabaseCollection = mongoDatabase.getCollection("netflows");
---------Start consuming---------
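The consume-and-insert loop itself does not appear in the snippets above. A minimal sketch of it, assuming the kafkaConsumer and mongoDatabaseCollection objects configured earlier (the poll timeout and the lack of error handling are simplifications, not the project's actual code):
// Poll Kafka and mirror every message into MongoDB.
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // each message value is expected to be a JSON string
        Document doc = Document.parse(record.value());
        mongoDatabaseCollection.insertOne(doc);
    }
}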
Logging goes through log4j. By default, log files are written to the log folder under the working directory: kafka2mongoDebug.log records messages at debug level and above, and kafka2mongoError.log records messages at error level and above. To change this, edit the settings in log4j.properties.
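For reference, a log4j.properties sketch consistent with that description; the appender names and pattern layout here are assumptions, not the project's actual configuration:
log4j.rootLogger=DEBUG, debugFile, errorFile

log4j.appender.debugFile=org.apache.log4j.RollingFileAppender
log4j.appender.debugFile.File=log/kafka2mongoDebug.log
log4j.appender.debugFile.Threshold=DEBUG
log4j.appender.debugFile.layout=org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern=%d %-5p [%c] %m%n

log4j.appender.errorFile=org.apache.log4j.RollingFileAppender
log4j.appender.errorFile.File=log/kafka2mongoError.log
log4j.appender.errorFile.Threshold=ERROR
log4j.appender.errorFile.layout=org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern=%d %-5p [%c] %m%n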
{"employees":[{"firstName":"Bill","lastName":"Gates"},{"firstName":"George","lastName":"Bush"},{"firstName":"Thomas","lastName":"Carter"}]}