Skip to content

Commit

Permalink
[FLINK-4179] [table] Additional TPCHQuery3Table example improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Jul 28, 2016
1 parent ec4c9be commit 185b5f6
Showing 1 changed file with 65 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,60 @@
*/
package org.apache.flink.examples.scala

import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment

/**
* This program implements a modified version of the TPC-H query 3. The
* example demonstrates how to assign names to fields by extending the Tuple class.
* The original query can be found at
* [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
* (page 29).
*
* This program implements the following SQL equivalent:
*
* {{{
* SELECT
* l_orderkey,
* SUM(l_extendedprice*(1-l_discount)) AS revenue,
* o_orderdate,
* o_shippriority
* FROM customer,
* orders,
* lineitem
* WHERE
* c_mktsegment = '[SEGMENT]'
* AND c_custkey = o_custkey
* AND l_orderkey = o_orderkey
* AND o_orderdate < date '[DATE]'
* AND l_shipdate > date '[DATE]'
* GROUP BY
* l_orderkey,
* o_orderdate,
* o_shippriority;
* }}}
*
* Compared to the original TPC-H query this version does not sort the result by revenue
* and orderdate.
*
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at
* [[http://www.tpc.org/tpch/]].
*
* Usage:
* {{{
* TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
* }}}
*
* This example shows how to use:
* - Table API expressions
*
*/
* This program implements a modified version of the TPC-H query 3. The
* example demonstrates how to assign names to fields by extending the Tuple class.
* The original query can be found at
* [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
* (page 29).
*
* This program implements the following SQL equivalent:
*
* {{{
* SELECT
* l_orderkey,
* SUM(l_extendedprice*(1-l_discount)) AS revenue,
* o_orderdate,
* o_shippriority
* FROM customer,
* orders,
* lineitem
* WHERE
* c_mktsegment = '[SEGMENT]'
* AND c_custkey = o_custkey
* AND l_orderkey = o_orderkey
* AND o_orderdate < date '[DATE]'
* AND l_shipdate > date '[DATE]'
* GROUP BY
* l_orderkey,
* o_orderdate,
* o_shippriority
* ORDER BY
* revenue desc,
* o_orderdate;
* }}}
*
* Like the original TPC-H query, this version sorts the result by revenue (descending)
* and orderdate.
*
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at
* [[http://www.tpc.org/tpch/]].
*
* Usage:
* {{{
* TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
* }}}
*
* This example shows how to:
* - Convert DataSets to Tables
* - Use Table API expressions
*
*/
object TPCHQuery3Table {

def main(args: Array[String]) {
Expand All @@ -76,23 +79,23 @@ object TPCHQuery3Table {
}

// set filter date
val date = java.sql.Date.valueOf("1995-03-12")
val date = "1995-03-12".toDate

// get execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val lineitems = getLineitemDataSet(env)
.filter( l => java.sql.Date.valueOf(l.shipDate).after(date) ).toTable(tEnv)
.as('id, 'extdPrice, 'discount, 'shipDate)
.toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate)
.filter('shipDate.toDate > date)

val customers = getCustomerDataSet(env).toTable(tEnv)
.as('id, 'mktSegment)
.filter( 'mktSegment === "AUTOMOBILE" )
val customers = getCustomerDataSet(env)
.toTable(tEnv, 'id, 'mktSegment)
.filter('mktSegment === "AUTOMOBILE")

val orders = getOrdersDataSet(env)
.filter( o => java.sql.Date.valueOf(o.orderDate).before(date) ).toTable(tEnv)
.as('orderId, 'custId, 'orderDate, 'shipPrio)
.toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio)
.filter('orderDate.toDate < date)

val items =
orders.join(customers)
Expand All @@ -102,19 +105,20 @@ object TPCHQuery3Table {
.where('orderId === 'id)
.select(
'orderId,
'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,
'orderDate,
'shipPrio)

val result = items
.groupBy('orderId, 'orderDate, 'shipPrio)
.select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
.select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio)
.orderBy('revenue.desc, 'orderDate.asc)

// emit result
result.writeAsCsv(outputPath, "\n", "|")

// execute program
env.execute("Scala TPCH Query 3 (Expression) Example")
env.execute("Scala TPCH Query 3 (Table API Expression) Example")
}

// *************************************************************************
Expand Down Expand Up @@ -145,12 +149,12 @@ object TPCHQuery3Table {
System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
" Due to legal restrictions, we can not ship generated data.\n" +
" You can find the TPC-H data generator at http:https://www.tpc.org/tpch/.\n" +
" Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" +
" Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
"<orders-csv path> <result path>")
false
}
}

private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
env.readCsvFile[Lineitem](
lineitemPath,
Expand All @@ -164,7 +168,7 @@ object TPCHQuery3Table {
fieldDelimiter = "|",
includedFields = Array(0, 6) )
}

private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
env.readCsvFile[Order](
ordersPath,
Expand Down

0 comments on commit 185b5f6

Please sign in to comment.