[FLINK-9697] Add new kafka connector module
yanghua authored and aljoscha committed Oct 16, 2018
1 parent d7a2234 commit 2e3e820
Showing 29 changed files with 5,744 additions and 6 deletions.
@@ -38,7 +38,7 @@
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator;
-import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -255,7 +255,7 @@ public enum Semantic {
 private final AtomicLong pendingRecords = new AtomicLong();

 /** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
-private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
+private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();

 /**
  * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
@@ -986,12 +986,12 @@ private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics)
 String name = entry.getKey().name();
 Metric metric = entry.getValue();

-KafkaMetricMuttableWrapper wrapper = previouslyCreatedMetrics.get(name);
+KafkaMetricMutableWrapper wrapper = previouslyCreatedMetrics.get(name);
 if (wrapper != null) {
 	wrapper.setKafkaMetric(metric);
 } else {
 	// TODO: somehow merge metrics from all active producers?
-	wrapper = new KafkaMetricMuttableWrapper(metric);
+	wrapper = new KafkaMetricMutableWrapper(metric);
 	previouslyCreatedMetrics.put(name, wrapper);
 	kafkaMetricGroup.gauge(name, wrapper);
 }
@@ -27,10 +27,10 @@
  * Gauge for getting the current value of a Kafka metric.
  */
 @Internal
-public class KafkaMetricMuttableWrapper implements Gauge<Double> {
+public class KafkaMetricMutableWrapper implements Gauge<Double> {
 	private org.apache.kafka.common.Metric kafkaMetric;

-	public KafkaMetricMuttableWrapper(org.apache.kafka.common.Metric metric) {
+	public KafkaMetricMutableWrapper(org.apache.kafka.common.Metric metric) {
 		this.kafkaMetric = metric;
 	}

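The rename above corrects the `Muttable` misspelling. For context, the wrapper lets a gauge be registered with the Flink metric group once and later be re-pointed at a fresh Kafka `Metric` when a new producer is initialized, instead of registering a duplicate gauge under the same name. A minimal self-contained sketch of that pattern (not the exact Flink sources; it assumes the Kafka 2.x client, where `Metric#metricValue()` returns the current value as an `Object`):

```java
import org.apache.flink.metrics.Gauge;

// Sketch of the mutable-wrapper pattern from the diff above: the gauge is
// registered once; setKafkaMetric() re-points it at a fresh Kafka metric
// rather than registering a second gauge under the same name.
public class MutableKafkaGaugeSketch implements Gauge<Double> {

	private org.apache.kafka.common.Metric kafkaMetric;

	public MutableKafkaGaugeSketch(org.apache.kafka.common.Metric metric) {
		this.kafkaMetric = metric;
	}

	// Called when a new producer reports a metric whose name already has a
	// registered gauge (e.g. after a restore).
	public void setKafkaMetric(org.apache.kafka.common.Metric metric) {
		this.kafkaMetric = metric;
	}

	@Override
	public Double getValue() {
		// Kafka 2.x: metricValue() returns Object; numeric metrics yield a Double.
		return (Double) kafkaMetric.metricValue();
	}
}
```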
318 changes: 318 additions & 0 deletions flink-connectors/flink-connector-kafka/pom.xml
@@ -0,0 +1,318 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>flink-connectors</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.7-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<name>flink-connector-kafka</name>

<packaging>jar</packaging>

<properties>
<kafka.version>2.0.0</kafka.version>
</properties>

<dependencies>

<!-- core dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- streaming-java dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- Add Kafka 2.0.x as a dependency -->

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<!-- Projects depending on this project won't depend on flink-table. -->
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project won't depend on flink-avro. -->
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project won't depend on flink-json. -->
<optional>true</optional>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<!-- exclude Kafka dependencies -->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<!-- include 2.0 server for tests -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<profiles>
<!-- Create SQL Client uber jars by default -->
<profile>
<id>sql-jars</id>
<activation>
<property>
<name>!skipSqlJars</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>sql-jar</shadedClassifierName>
<artifactSet>
<includes combine.children="append">
<include>org.apache.kafka:*</include>
<include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<configuration>
<includes>
<include>**/KafkaTestEnvironmentImpl*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-test-sources</id>
<goals>
<goal>test-jar-no-fork</goal>
</goals>
<configuration>
<includes>
<include>**/KafkaTestEnvironmentImpl*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
</configuration>
</plugin>
</plugins>
</build>

</project>
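The `sql-jars` profile above (active unless `-DskipSqlJars` is set) attaches a `sql-jar` classifier artifact in which all `org.apache.kafka` classes are relocated under `org.apache.flink.kafka.shaded`, so the SQL Client uber jar cannot clash with a user-supplied Kafka client. A hypothetical smoke test for that relocation (the class name is just one example of a relocated class; it assumes the shaded jar is on the classpath):

```java
// Hypothetical check of the shade relocation configured above: after shading,
// the Kafka client should only be reachable under the relocated package.
public class RelocationCheck {
	public static void main(String[] args) throws Exception {
		Class.forName(
			"org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer");
		System.out.println("relocated Kafka client found");
	}
}
```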
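For reference, a downstream job depends on `flink-connector-kafka_${scala.binary.version}` and uses the version-agnostic classes this module introduces. A minimal consumer sketch (bootstrap servers, group id, and topic are placeholders; it assumes the module's `FlinkKafkaConsumer` mirrors the constructors of the existing versioned consumers):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniversalKafkaConsumerSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "demo");                    // placeholder

		env.addSource(new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props))
			.print();

		env.execute("universal kafka connector sketch");
	}
}
```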
