Skip to content

Commit

Permalink
[FLINK-1512] [tests] Add integration tests for CsvReader
Browse files Browse the repository at this point in the history
This closes apache#426
  • Loading branch information
chiwanpark authored and fhueske committed Mar 25, 2015
1 parent 7a6f296 commit 43ac967
Showing 1 changed file with 32 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,31 @@ class CsvInputFormatTest {

case class CaseClassItem(field1: Int, field2: String, field3: Double)

private def validatePOJOItem(format: ScalaCsvInputFormat[POJOItem]): Unit = {
var result = new POJOItem()
result = format.nextRecord(result)
assertEquals(123, result.field1)
assertEquals("HELLO", result.field2)
assertEquals(3.123, result.field3, 0.001)

result = format.nextRecord(result)
assertEquals(456, result.field1)
assertEquals("ABC", result.field2)
assertEquals(1.234, result.field3, 0.001)
}

private def validateCaseClassItem(format: ScalaCsvInputFormat[CaseClassItem]): Unit = {
var result = format.nextRecord(null)
assertEquals(123, result.field1)
assertEquals("HELLO", result.field2)
assertEquals(3.123, result.field3, 0.001)

result = format.nextRecord(null)
assertEquals(456, result.field1)
assertEquals("ABC", result.field2)
assertEquals(1.234, result.field3, 0.001)
}

@Test
def testPOJOType(): Unit = {
val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
Expand All @@ -425,16 +450,7 @@ class CsvInputFormatTest {
format.configure(new Configuration)
format.open(tempFile)

var result = new POJOItem()
result = format.nextRecord(result)
assertEquals(123, result.field1)
assertEquals("HELLO", result.field2)
assertEquals(3.123, result.field3, 0.001)

result = format.nextRecord(result)
assertEquals(456, result.field1)
assertEquals("ABC", result.field2)
assertEquals(1.234, result.field3, 0.001)
validatePOJOItem(format)
}

@Test
Expand All @@ -449,15 +465,7 @@ class CsvInputFormatTest {
format.configure(new Configuration)
format.open(tempFile)

var result = format.nextRecord(null)
assertEquals(123, result.field1)
assertEquals("HELLO", result.field2)
assertEquals(3.123, result.field3, 0.001)

result = format.nextRecord(null)
assertEquals(456, result.field1)
assertEquals("ABC", result.field2)
assertEquals(1.234, result.field3, 0.001)
validateCaseClassItem(format)
}

@Test
Expand All @@ -474,36 +482,24 @@ class CsvInputFormatTest {
format.configure(new Configuration)
format.open(tempFile)

var result = new POJOItem()
result = format.nextRecord(result)
assertEquals(123, result.field1)
assertEquals("HELLO", result.field2)
assertEquals(3.123, result.field3, 0.001)

result = format.nextRecord(result)
assertEquals(456, result.field1)
assertEquals("ABC", result.field2)
assertEquals(1.234, result.field3, 0.001)
validatePOJOItem(format)
}

@Test
def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
val fileContent = "HELLO,123,NODATA,3.123,NODATA\n" + "ABC,456,NODATA,1.234,NODATA"
val tempFile = createTempFile(fileContent)
val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)

format.setDelimiter('\n')
format.setFieldDelimiter(',')
format.setFields(Array(false, true), Array(classOf[String]): Array[Class[_]])
format.setFields(Array(true, true, false, true, false),
Array(classOf[String], classOf[Integer], classOf[java.lang.Double]): Array[Class[_]])
format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
format.configure(new Configuration)
format.open(tempFile)

var result = format.nextRecord(new POJOItem())
assertEquals("HELLO", result.field2)

result = format.nextRecord(result)
assertEquals("ABC", result.field2)
validatePOJOItem(format)
}
}

0 comments on commit 43ac967

Please sign in to comment.