Skip to content

Commit

Permalink
[FLINK-2692] Untangle CsvInputFormat
Browse files Browse the repository at this point in the history
This closes apache#1266
  • Loading branch information
zentol committed Nov 18, 2015
1 parent fc6fec7 commit bd61f2d
Show file tree
Hide file tree
Showing 18 changed files with 573 additions and 599 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import backtype.storm.topology.IRichBolt;

import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand Down Expand Up @@ -119,7 +120,7 @@ private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvir
// read the text file from given input path
PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor
.getForObject(new Sentence(""));
return env.createInput(new CsvInputFormat<Sentence>(new Path(
return env.createInput(new PojoCsvInputFormat<Sentence>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
sourceType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import backtype.storm.tuple.Fields;

import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -122,7 +123,7 @@ private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutio
// read the text file from given input path
TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
.getForObject(new Tuple1<String>(""));
return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
sourceType);
Expand Down

This file was deleted.

Loading

0 comments on commit bd61f2d

Please sign in to comment.