[FLINK-5714] [table] Use a builder pattern for creating CsvTableSource
This closes apache#3273.
wuchong authored and twalthr committed Feb 15, 2017
1 parent 04e6758 commit 6c310a7
Showing 3 changed files with 270 additions and 49 deletions.
73 changes: 34 additions & 39 deletions docs/dev/table_api.md
@@ -253,56 +253,51 @@ Table result = tableEnvironment.ingest("kafka-source");

The `CsvTableSource` is already included in `flink-table` without additional dependencies.
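
If you manage dependencies yourself, a minimal sbt sketch (the version `1.2.0` is illustrative; use the version matching your Flink release):

{% highlight scala %}
// flink-table already bundles CsvTableSource; no additional connector dependency is required
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.2.0"
{% endhighlight %}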

It can be configured with the following properties:

- `path` The path to the CSV file, required.
- `fieldNames` The names of the table fields, required.
- `fieldTypes` The types of the table fields, required.
- `fieldDelim` The field delimiter, `","` by default.
- `rowDelim` The row delimiter, `"\n"` by default.
- `quoteCharacter` An optional quote character for String values, `null` by default.
- `ignoreFirstLine` Flag to ignore the first line, `false` by default.
- `ignoreComments` An optional prefix to indicate comments, `null` by default.
- `lenient` Flag to skip records with parse errors instead of failing, `false` by default.
The easiest way to create a `CsvTableSource` is to use the enclosed builder `CsvTableSource.builder()`. The builder provides the following methods to configure properties:

- `path(String path)` Sets the path to the CSV file, required.
- `field(String fieldName, TypeInformation<?> fieldType)` Adds a field with the field name and field type information; can be called multiple times, required. The call order of this method also defines the order of the fields in a row.
- `fieldDelimiter(String delim)` Sets the field delimiter, `","` by default.
- `lineDelimiter(String delim)` Sets the line delimiter, `"\n"` by default.
- `quoteCharacter(Character quote)` Sets the quote character for String values, `null` by default.
- `commentPrefix(String prefix)` Sets a prefix to indicate comments, `null` by default.
- `ignoreFirstLine()` Ignores the first line. Disabled by default.
- `ignoreParseErrors()` Skips records with parse errors instead of failing. By default, parse errors throw an exception.

You can create the source as follows:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
CsvTableSource csvTableSource = new CsvTableSource(
"/path/to/your/file.csv",
new String[] { "name", "id", "score", "comments" },
new TypeInformation<?>[] {
Types.STRING(),
Types.INT(),
Types.DOUBLE(),
Types.STRING()
},
"#", // fieldDelim
"$", // rowDelim
null, // quoteCharacter
true, // ignoreFirstLine
"%", // ignoreComments
false); // lenient
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("/path/to/your/file.csv")
.field("name", Types.STRING())
.field("id", Types.INT())
.field("score", Types.DOUBLE())
.field("comments", Types.STRING())
.fieldDelimiter("#")
.lineDelimiter("$")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val csvTableSource = new CsvTableSource(
"/path/to/your/file.csv",
Array("name", "id", "score", "comments"),
Array(
Types.STRING,
Types.INT,
Types.DOUBLE,
Types.STRING
),
fieldDelim = "#",
rowDelim = "$",
ignoreFirstLine = true,
ignoreComments = "%")
val csvTableSource = CsvTableSource
.builder
.path("/path/to/your/file.csv")
.field("name", Types.STRING)
.field("id", Types.INT)
.field("score", Types.DOUBLE)
.field("comments", Types.STRING)
.fieldDelimiter("#")
.lineDelimiter("$")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
{% endhighlight %}
</div>
</div>
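
Once built, the table source can be registered in a `TableEnvironment` and queried like any other table. A minimal sketch, assuming a `BatchTableEnvironment` named `tableEnv` and the Scala Table API implicits from `org.apache.flink.table.api.scala._` are in scope:

{% highlight scala %}
// register the source under a table name ...
tableEnv.registerTableSource("csvTable", csvTableSource)

// ... and query it with the Table API
val result = tableEnv.scan("csvTable").select('name, 'score)
{% endhighlight %}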
org/apache/flink/table/sources/CsvTableSource.scala
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableException

import scala.collection.mutable

/**
* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
* (logically) unlimited number of fields.
@@ -44,15 +46,15 @@ import org.apache.flink.table.api.TableException
* @param lenient Flag to skip records with parse errors instead of failing, false by default.
*/
class CsvTableSource(
path: String,
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]],
fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
quoteCharacter: Character = null,
ignoreFirstLine: Boolean = false,
ignoreComments: String = null,
lenient: Boolean = false)
private val path: String,
private val fieldNames: Array[String],
private val fieldTypes: Array[TypeInformation[_]],
private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
private val quoteCharacter: Character = null,
private val ignoreFirstLine: Boolean = false,
private val ignoreComments: String = null,
private val lenient: Boolean = false)
extends BatchTableSource[Row]
with StreamTableSource[Row]
with ProjectableTableSource[Row] {
@@ -138,4 +140,174 @@ class CsvTableSource(

inputFormat
}

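// Value-based equality over the full source configuration; returnType covers the field names and types.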
override def equals(other: Any): Boolean = other match {
case that: CsvTableSource => returnType == that.returnType &&
path == that.path &&
fieldDelim == that.fieldDelim &&
rowDelim == that.rowDelim &&
quoteCharacter == that.quoteCharacter &&
ignoreFirstLine == that.ignoreFirstLine &&
ignoreComments == that.ignoreComments &&
lenient == that.lenient
case _ => false
}

override def hashCode(): Int = {
returnType.hashCode()
}
}

object CsvTableSource {

/**
* A builder for creating [[CsvTableSource]] instances.
*
* For example:
*
* {{{
* val source: CsvTableSource = CsvTableSource.builder()
* .path("/path/to/your/file.csv")
* .field("myfield", Types.STRING)
* .field("myfield2", Types.INT)
* .build()
* }}}
*
*/
class Builder {

private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] =
mutable.LinkedHashMap[String, TypeInformation[_]]()
private var quoteCharacter: Character = _
private var path: String = _
private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
private var isIgnoreFirstLine: Boolean = false
private var commentPrefix: String = _
private var lenient: Boolean = false

/**
* Sets the path to the CSV file. Required.
*
* @param path the path to the CSV file
*/
def path(path: String): Builder = {
this.path = path
this
}

/**
* Sets the field delimiter, "," by default.
*
* @param delim the field delimiter
*/
def fieldDelimiter(delim: String): Builder = {
this.fieldDelim = delim
this
}

/**
* Sets the line delimiter, "\n" by default.
*
* @param delim the line delimiter
*/
def lineDelimiter(delim: String): Builder = {
this.lineDelim = delim
this
}

/**
* Adds a field with the field name and the type information. Required.
* This method can be called multiple times. The call order of this method also
* defines the order of the fields in a row.
*
* @param fieldName the field name
* @param fieldType the type information of the field
*/
def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
if (schema.contains(fieldName)) {
throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
}
schema += (fieldName -> fieldType)
this
}

/**
* Sets a quote character for String values, null by default.
*
* @param quote the quote character
*/
def quoteCharacter(quote: Character): Builder = {
this.quoteCharacter = quote
this
}

/**
* Sets a prefix to indicate comments, null by default.
*
* @param prefix the prefix to indicate comments
*/
def commentPrefix(prefix: String): Builder = {
this.commentPrefix = prefix
this
}

/**
* Ignores the first line. By default, the first line is not skipped.
*/
def ignoreFirstLine(): Builder = {
this.isIgnoreFirstLine = true
this
}

/**
* Skips records with parse errors instead of failing. By default, an exception is thrown.
*/
def ignoreParseErrors(): Builder = {
this.lenient = true
this
}

/**
* Applies the current values and constructs a new [[CsvTableSource]].
*
* @return a newly-created [[CsvTableSource]].
*/
def build(): CsvTableSource = {
if (path == null) {
throw new IllegalArgumentException("Path must be defined.")
}
if (schema.isEmpty) {
throw new IllegalArgumentException("Fields can not be empty.")
}
new CsvTableSource(
path,
schema.keys.toArray,
schema.values.toArray,
fieldDelim,
lineDelim,
quoteCharacter,
isIgnoreFirstLine,
commentPrefix,
lenient)
}

}

/**
* Returns a new builder that builds a [[CsvTableSource]].
*
* For example:
*
* {{{
* val source: CsvTableSource = CsvTableSource
* .builder()
* .path("/path/to/your/file.csv")
* .field("myfield", Types.STRING)
* .field("myfield2", Types.INT)
* .build()
* }}}
* @return a new builder to build a [[CsvTableSource]]
*/
def builder(): Builder = new Builder
}
org/apache/flink/table/api/scala/batch/TableSourceTest.scala
@@ -18,11 +18,12 @@

package org.apache.flink.table.api.scala.batch

import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test
import org.junit.{Assert, Test}

class TableSourceTest extends TableTestBase {

@@ -139,6 +140,59 @@ class TableSourceTest extends TableTestBase {
util.verifyTable(result, expected)
}

@Test
def testCsvTableSourceBuilder(): Unit = {
val source1 = CsvTableSource.builder()
.path("/path/to/csv")
.field("myfield", Types.STRING)
.field("myfield2", Types.INT)
.quoteCharacter(';')
.fieldDelimiter("#")
.lineDelimiter("\r\n")
.commentPrefix("%%")
.ignoreFirstLine()
.ignoreParseErrors()
.build()

val source2 = new CsvTableSource(
"/path/to/csv",
Array("myfield", "myfield2"),
Array(Types.STRING, Types.INT),
"#",
"\r\n",
';',
true,
"%%",
true)

Assert.assertEquals(source1, source2)
}

@Test(expected = classOf[IllegalArgumentException])
def testCsvTableSourceBuilderWithNullPath(): Unit = {
CsvTableSource.builder()
.field("myfield", Types.STRING)
// should fail, path is not defined
.build()
}

@Test(expected = classOf[IllegalArgumentException])
def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
CsvTableSource.builder()
.path("/path/to/csv")
.field("myfield", Types.STRING)
// should fail, field names must not be duplicated
.field("myfield", Types.INT)
}

@Test(expected = classOf[IllegalArgumentException])
def testCsvTableSourceBuilderWithEmptyField(): Unit = {
CsvTableSource.builder()
.path("/path/to/csv")
// should fail, no fields are defined
.build()
}

def tableSource: (CsvTableSource, String) = {
val csvTable = CommonTestData.getCsvTableSource
val tableName = "csvTable"
