QStreaming is a framework that simplifies writing and executing ETLs on top of Apache Spark.
It is based on a simple SQL-like configuration file and runs on any Spark cluster.
To run QStreaming you must first define a pipeline DSL file. A simple pipeline DSL file looks as follows:
-- DDL for a streaming input that connects to a Kafka topic.
-- It declares five fields based on the JSON data format. In addition, it uses ROWTIME() to declare a virtual column that derives the event time attribute from the existing ts field.
create stream input table user_behavior(
user_id LONG,
item_id LONG,
category_id LONG,
behavior STRING,
ts TIMESTAMP,
eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
kafka.bootstrap.servers="localhost:9091",
startingOffsets=earliest,
subscribe="user_behavior",
"group-id"="user_behavior"
);
-- DDL for a streaming output that connects to a Kafka topic
create stream output table behavior_cnt_per_hour
using kafka(
kafka.bootstrap.servers="localhost:9091",
topic="behavior_cnt_per_hour"
)TBLPROPERTIES(
"update-mode"="update",
checkpointLocation = "behavior_cnt_per_hour"
);
-- CREATE VIEW that counts the number of records per behavior in each one-minute window.
create view v_behavior_cnt_per_hour as
SELECT
window(eventTime, "1 minutes") as window,
COUNT(*) as behavior_cnt,
behavior
FROM user_behavior
GROUP BY
window(eventTime, "1 minutes"),
behavior;
-- persist metric to kafka
insert into behavior_cnt_per_hour
select
from_unixtime(cast(window.start as LONG),'yyyy-MM-dd HH:mm') as time,
behavior_cnt,
behavior
from
v_behavior_cnt_per_hour;
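For orientation, the following is a rough, hand-written Spark Structured Streaming equivalent of the pipeline above. It is only a sketch: variable names such as userBehavior and counts are ours, and the code QStreaming actually generates may differ.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("user-behavior").getOrCreate()
import spark.implicits._

// Schema of the JSON payload in the user_behavior topic.
val schema = new StructType()
  .add("user_id", LongType)
  .add("item_id", LongType)
  .add("category_id", LongType)
  .add("behavior", StringType)
  .add("ts", TimestampType)

// Source: read the Kafka topic, parse the JSON value and derive the event time.
val userBehavior = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9091")
  .option("subscribe", "user_behavior")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_json($"value".cast("string"), schema).as("v"))
  .select($"v.*")
  .withColumn("eventTime", $"ts")
  .withWatermark("eventTime", "1 minutes")

// The view: count records per behavior in each one-minute window.
val counts = userBehavior
  .groupBy(window($"eventTime", "1 minutes"), $"behavior")
  .agg(count(lit(1)).as("behavior_cnt"))

// Sink: write the aggregates back to Kafka in update mode.
counts
  .select(to_json(struct(
    date_format($"window.start", "yyyy-MM-dd HH:mm").as("time"),
    $"behavior_cnt",
    $"behavior")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9091")
  .option("topic", "behavior_cnt_per_hour")
  .option("checkpointLocation", "behavior_cnt_per_hour")
  .outputMode("update")
  .start()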
There are three ways to run QStreaming. To run it on a cluster, first get the latest released JAR from here.
Running on a YARN cluster requires Apache Spark v2.2+. Run the following command:
$SPARK_HOME/bin/spark-submit \
--class com.qiniu.stream.core.Streaming \
--master yarn \
--deploy-mode client \
stream-standalone-0.1.0-jar-with-dependencies.jar \
-j pathToYourPipeline.dsl
To run on a standalone cluster, you must first start a Spark standalone cluster and then run the following command:
$SPARK_HOME/bin/spark-submit \
--class com.qiniu.stream.core.Streaming \
--master spark://IP:PORT \
stream-standalone-0.1.0-jar-with-dependencies.jar \
-j pathToYourPipeline.dsl
It's also possible to use QStreaming inside your own project.
To use it, add the dependency to your project:
- Maven

<dependency>
  <groupId>com.qiniu</groupId>
  <artifactId>stream-core</artifactId>
  <version>0.1.0</version>
</dependency>

- Gradle

compile 'com.qiniu:stream-core:0.1.0'

- sbt

libraryDependencies += "com.qiniu" % "stream-core" % "0.1.0"
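If you prefer to trigger a pipeline from your own code rather than via spark-submit, one option is to call the same entry point the CLI uses. This is only a sketch and assumes that com.qiniu.stream.core.Streaming exposes a callable main method, as the spark-submit examples above suggest; check the project sources for the actual programmatic API.

import com.qiniu.stream.core.Streaming

object MyEtlJob {
  def main(args: Array[String]): Unit = {
    // Hypothetical wrapper: delegate to QStreaming's entry point, passing the
    // same -j flag used with spark-submit above.
    Streaming.main(Array("-j", "pathToYourPipeline.dsl"))
  }
}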
QStreaming allows connecting to a stream source with a DDL statement.
For example, the statement below defines an input that connects to a Kafka topic:
create stream input table user_behavior(
user_id LONG,
item_id LONG,
category_id LONG,
behavior STRING,
ts TIMESTAMP,
eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
kafka.bootstrap.servers="localhost:9091",
startingOffsets=earliest,
subscribe="user_behavior",
"group-id"="user_behavior"
);
Please refer to CreateSourceTableStatement and CreateSinkTableStatement for the details of the DDL statements.
QStreaming supports watermarks, which help a stream processing engine deal with late data.
There are two ways to define a watermark:
- Adding ROWTIME(eventTimeField, delayThreshold) as a schema property in a DDL statement:

create stream input table user_behavior(
user_id LONG,
item_id LONG,
category_id LONG,
behavior STRING,
ts TIMESTAMP,
eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
kafka.bootstrap.servers="localhost:9091",
startingOffsets=earliest,
subscribe="user_behavior",
"group-id"="user_behavior"
);

The example above uses eventTime as the event time field with a 1-minute delay threshold.
- Adding waterMark("eventTimeField, delayThreshold") as a view property in a view statement:

create view v_behavior_cnt_per_hour(waterMark = "eventTime, 1 minutes") as
SELECT
window(eventTime, "1 minutes") as window,
COUNT(*) as behavior_cnt,
behavior
FROM user_behavior
GROUP BY
window(eventTime, "1 minutes"),
behavior;

The example above defines a watermark on the eventTime field with a 1-minute delay threshold.
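For reference, the declared watermark corresponds to Spark Structured Streaming's withWatermark. Below is a minimal sketch of the equivalent hand-written code, assuming a streaming DataFrame (for example the userBehavior one from the earlier sketch) with an eventTime column.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Sketch: what the DSL watermark maps to by hand. withWatermark tells the engine
// how long to wait for late events before a window is finalized.
def countsWithWatermark(userBehavior: DataFrame): DataFrame =
  userBehavior
    .withWatermark("eventTime", "1 minutes")
    .groupBy(window(col("eventTime"), "1 minutes"), col("behavior"))
    .agg(count(lit(1)).as("behavior_cnt"))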
-- define UDF named hello
create function hello(name:String) = {
s"hello ${name}"
};
QStreaming allows defining a dynamic UDF inside job.dsl; for more details please refer to createFunctionStatement.
The example above defines a UDF with a single string parameter.
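Conceptually, this is the same as registering a Spark UDF by hand, after which it can be used in SQL. A sketch (the SparkSession wiring is ours):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-example").getOrCreate()

// Register the same function manually; it can then be used in SQL,
// e.g. SELECT hello(behavior) FROM user_behavior.
spark.udf.register("hello", (name: String) => s"hello $name")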
QStreaming allows you to define multiple outputs for a streaming process by leveraging foreachBatch mode (only available in Spark >= 2.4.0).
The example below sinks the behavior count metric to two Kafka topics:
create stream output table output using kafka(
kafka.bootstrap.servers=<kafkaBootStrapServers>,
topic="topic1"
),kafka(
kafka.bootstrap.servers=<kafkaBootStrapServers>,
topic="topic2"
) TBLPROPERTIES (outputMode = update,checkpointLocation = "behavior_output");
For more information about how to create multiple sinks, please refer to createSinkTableStatement.
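For reference, here is a sketch of what such a multi-topic sink looks like when written by hand with Spark's foreachBatch API; the names sinkToTwoTopics and counts are ours, and the rows are assumed to already carry a string value column for Kafka.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

def sinkToTwoTopics(counts: DataFrame, bootstrapServers: String): StreamingQuery = {
  // Write each micro-batch to both topics; persist so the batch is computed once.
  val writeBatch: (DataFrame, Long) => Unit = (batch, _) => {
    val cached = batch.persist()
    Seq("topic1", "topic2").foreach { topic =>
      cached.write
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("topic", topic)
        .save()
    }
    cached.unpersist()
  }

  counts.writeStream
    .outputMode("update")
    .option("checkpointLocation", "behavior_output")
    .foreachBatch(writeBatch)
    .start()
}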
QStreaming supports variable interpolation from command-line arguments; this is useful for running QStreaming as a periodic job and referencing the variables in the SQL file.
For example, you can pass the values for theDayThatRunAJob and theHourThatRunAJob from an Airflow DAG:
$SPARK_HOME/bin/spark-submit \
--name yourAppName \
--class com.qiniu.stream.core.Streaming \
stream-standalone-0.1.0-jar-with-dependencies.jar \
-j pathToYourPipeline.dsl \
-v day=theDayThatRunAJob \
-v hour=theHourThatRunAJob
and reference them in the pipeline DSL file:
create batch input table raw_log
USING parquet(path="hdfs://cluster1/logs/day=${day}/hour=${hour}");
QStreaming allows monitoring the Kafka topic offset lag by adding the "group-id" connector property in the DDL statement, as below:
create stream input table user_behavior(
user_id LONG,
item_id LONG,
category_id LONG,
behavior STRING,
ts TIMESTAMP,
eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
kafka.bootstrap.servers="localhost:9091",
startingOffsets=earliest,
subscribe="user_behavior",
"group-id"="user_behavior"
);
The purpose of data quality checks is to "unit-test" data to find errors early, before the data gets fed to any storage.
For example, we test for the following properties of the data:
- there are 5 rows in total
- values of the id attribute are never NULL and unique
- values of the productName attribute are never NULL
- the priority attribute can only contain "high" or "low" as value
- numViews should not contain negative values
- at least half of the values in description should contain a url
- the median of numViews should be less than or equal to 10
In DSL this looks as follows:
CREATE TEST testName(testLevel=Error,testOutput=testResult) on dataset WITH
numRows()=5 and
isNotNull(id) and
isUnique(id) and
isComplete(productName) and
isContainedIn(priority, ["high", "low"]) and
isNonNegative(numViews) and
containsUrl(description) >= 0.5 and
hasApproxQuantile(numViews, 0.5) <= 10
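These constraint names closely mirror the check API of Amazon Deequ. For comparison, a rough hand-written equivalent using Deequ directly might look like the following sketch (the runChecks wrapper and DataFrame wiring are ours):

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.DataFrame

def runChecks(dataset: DataFrame) =
  VerificationSuite()
    .onData(dataset)
    .addCheck(
      Check(CheckLevel.Error, "testName")
        .hasSize(_ == 5)                                  // 5 rows in total
        .isComplete("id").isUnique("id")                  // id is never NULL and unique
        .isComplete("productName")                        // productName is never NULL
        .isContainedIn("priority", Array("high", "low"))  // only "high" or "low"
        .isNonNegative("numViews")                        // no negative values
        .containsURL("description", _ >= 0.5)             // at least half contain a URL
        .hasApproxQuantile("numViews", 0.5, _ <= 10))     // median <= 10
    .run()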
See the LICENSE file for license rights and limitations (Apache License).