This is connector for Apache Flink®
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- for Apache Flink® 1.15 -->
<version>x.x.x_flink-1.15</version>
<!-- for Apache Flink® 1.14 -->
<version>x.x.x_flink-1.14_2.11</version>
<version>x.x.x_flink-1.14_2.12</version>
<!-- for Apache Flink® 1.13 -->
<version>x.x.x_flink-1.13_2.11</version>
<version>x.x.x_flink-1.13_2.12</version>
<!-- for Apache Flink® 1.12 -->
<version>x.x.x_flink-1.12_2.11</version>
<version>x.x.x_flink-1.12_2.12</version>
<!-- for Apache Flink® 1.11 -->
<version>x.x.x_flink-1.11_2.11</version>
<version>x.x.x_flink-1.11_2.12</version>
</dependency>
Click HERE to get the latest version.
StarRocksSourceOptions options = StarRocksSourceOptions.builder()
.withProperty("scan-url", "fe_ip1:8030,fe_ip2:8030,fe_ip3:8030")
.withProperty("jdbc-url", "jdbc:mysql:https://fe_ip:9030")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("table-name", "flink_test")
.withProperty("database-name", "test")
.build();
TableSchema tableSchema = TableSchema.builder()
.field("date_1", DataTypes.DATE())
.field("datetime_1", DataTypes.TIMESTAMP(6))
.field("char_1", DataTypes.CHAR(20))
.field("varchar_1", DataTypes.STRING())
.field("boolean_1", DataTypes.BOOLEAN())
.field("tinyint_1", DataTypes.TINYINT())
.field("smallint_1", DataTypes.SMALLINT())
.field("int_1", DataTypes.INT())
.field("bigint_1", DataTypes.BIGINT())
.field("largeint_1", DataTypes.STRING())
.field("float_1", DataTypes.FLOAT())
.field("double_1", DataTypes.DOUBLE())
.field("decimal_1", DataTypes.DECIMAL(27, 9))
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(StarRocksSource.source(options, tableSchema)).setParallelism(5).print();
env.execute("StarRocks flink source");
OR
// create a table with `structure` and `properties`
CREATE TABLE flink_test (
date_1 DATE,
datetime_1 TIMESTAMP(6),
char_1 CHAR(20),
varchar_1 VARCHAR,
boolean_1 BOOLEAN,
tinyint_1 TINYINT,
smallint_1 SMALLINT,
int_1 INT,
bigint_1 BIGINT,
largeint_1 STRING,
float_1 FLOAT,
double_1 DOUBLE,
decimal_1 DECIMAL(27,9)
) WITH (
'connector'='starrocks',
'scan-url'='fe_ip1:8030,fe_ip2:8030,fe_ip3:8030',
'jdbc-url'='jdbc:mysql:https://fe_ip:9030',
'username'='root',
'password'='',
'database-name'='flink_test',
'table-name'='flink_test'
);
select date_1, smallint_1 from flink_test where char_1 <> 'A' and int_1 = -126
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | YES | NONE | String | starrocks |
scan-url | YES | NONE | String | Hosts of the fe nodes like: fe_ip1:http_port,fe_ip2:http_port... . |
jdbc-url | YES | NONE | String | Hosts of the fe nodes like: fe_ip1:query_port,fe_ip2: query_port... . |
username | YES | NONE | String | StarRocks user name. |
password | YES | NONE | String | StarRocks user password. |
database-name | YES | NONE | String | Database name |
table-name | YES | NONE | String | Table name |
scan.connect.timeout-ms | NO | 1000 | String | Connect timeout |
scan.params.keep-alive-min | NO | 10 | String | Max keep alive time min |
scan.params.query-timeout-s | NO | 600(5min) | String | Query timeout for a single query(The value of this parameter needs to be longer than the estimated period of the source) |
scan.params.mem-limit-byte | NO | 102410241024(1G) | String | Memory limit for a single query |
scan.max-retries | NO | 1 | String | Max request retry times. |
Name | Type | Description |
---|---|---|
totalScannedRows | counter | successfully collected data |
StarRocks | Flink |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
exactly-once
semantic cannot be guaranteed in the case of a task failure.- Only SQLs without aggregation like
select {*|columns|count(1)} from {table-name} where ...
are supported.
// -------- sink with raw json string stream --------
fromElements(new String[]{
"{\"score\": \"99\", \"name\": \"stephen\"}",
"{\"score\": \"100\", \"name\": \"lebron\"}"
}).addSink(
StarRocksSink.sink(
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql:https://ip:port,ip:port?xxxxx")
.withProperty("load-url", "ip:port;ip:port")
.withProperty("username", "xxx")
.withProperty("password", "xxx")
.withProperty("table-name", "xxx")
.withProperty("database-name", "xxx")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.withProperty("sink.parallelism", "1")
.build()
)
);
// -------- sink with stream transformation --------
class RowData {
public int score;
public String name;
public RowData(int score, String name) {
......
}
}
fromElements(
new RowData[]{
new RowData(99, "stephen"),
new RowData(100, "lebron")
}
).addSink(
StarRocksSink.sink(
// the table structure
TableSchema.builder()
.field("score", DataTypes.INT())
.field("name", DataTypes.VARCHAR(20))
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql:https://ip:port,ip:port?xxxxx")
.withProperty("load-url", "ip:port;ip:port")
.withProperty("username", "xxx")
.withProperty("password", "xxx")
.withProperty("table-name", "xxx")
.withProperty("database-name", "xxx")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.withProperty("sink.parallelism", "1")
.build(),
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.score;
slots[1] = streamRowData.name;
}
)
);
OR
// create a table with `structure` and `properties`
tEnv.executeSql(
"CREATE TABLE USER_RESULT(" +
"name VARCHAR," +
"score BIGINT" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='jdbc:mysql:https://ip:port,ip:port?xxxxx'," +
"'load-url'='ip:port;ip:port'," +
"'database-name' = 'xxx'," +
"'table-name' = 'xxx'," +
"'username' = 'xxx'," +
"'password' = 'xxx'," +
"'sink.buffer-flush.interval-ms' = '15000'," +
"'sink.properties.format' = 'json'," +
"'sink.properties.strip_outer_array' = 'true'," +
"'sink.parallelism' = '1'," +
"'sink.max-retries' = '10'," +
")"
);
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | YES | NONE | String | starrocks |
jdbc-url | YES | NONE | String | this will be used to execute queries in starrocks. |
load-url | YES | NONE | String | fe_ip:http_port;fe_ip:http_port separated with ; , which would be used to do the batch sinking. |
database-name | YES | NONE | String | starrocks database name |
table-name | YES | NONE | String | starrocks table name |
username | YES | NONE | String | starrocks connecting username |
password | YES | NONE | String | starrocks connecting password |
sink.semantic | NO | at-least-once |
String | at-least-once or exactly-once (flush at checkpoint only and options like sink.buffer-flush.* won't work either). |
sink.buffer-flush.max-bytes | NO | 94371840(90M) | String | the max batching size of the serialized data, range: [64MB, 10GB] . |
sink.buffer-flush.max-rows | NO | 500000 | String | the max batching rows, range: [64,000, 5000,000] . |
sink.buffer-flush.interval-ms | NO | 300000 | String | the flushing time interval, range: [1000ms, 3600000ms] . |
sink.max-retries | NO | 3 | String | max retry times of the stream load request, range: [0, 1000] . |
sink.parallelism | NO | NULL | String | Specify the parallelism of the sink individually. Remove it if you want to follow the global parallelism settings. |
sink.connect.timeout-ms | NO | 1000 | String | Timeout in millisecond for connecting to the load-url , range: [100, 60000] . |
sink.label-prefix | NO | NO | String | the prefix of the stream load label, available characters are within [-_A-Za-z0-9]. |
sink.properties.* | NO | NONE | String | the stream load properties like 'sink.properties.columns' = 'k1, v1' . |
Name | Type | Description |
---|---|---|
totalFlushBytes | counter | successfully flushed bytes. |
totalFlushRows | counter | successfully flushed rows. |
totalFlushSucceededTimes | counter | number of times that the data-batch been successfully flushed. |
totalFlushFailedTimes | counter | number of times that the flushing been failed. |
Flink type | StarRocks type |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | JSON / STRING |
VARCHAR | JSON / STRING |
STRING | JSON / STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON / JSON STRING |
ROW<arg T...> | JSON / JSON STRING |
Flush
action was triggeredat-least-once
when:cachedRows >= ${sink.buffer-flush.max-rows} || cachedBytes >= ${sink.buffer-flush.max-bytes} || idleTime >= ${sink.buffer-flush.interval-ms}
sink.buffer-flush.{max-rows|max-bytes|interval-ms}
becomes invalid when it comes with theexactly-once
semantic.