My function code:

    private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String inputFile = pro.getProperty("input");
        // map csv file
        return env.readCsvFile(inputFile)
            .ignoreInvalidLines()
            .fieldDelimiter('\u0009')
            .lineDelimiter("\n")
            .includeFields(
                true, true, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, true, true,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false)
            .types(String.class, Long.class, Double.class, Double.class)
            .map(new TuplePointConverter());
    }
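As a side note, the long boolean list passed to includeFields is easy to get wrong by one position. A minimal sketch of building the same mask from a column count and the wanted indices is below. FieldMask and build are hypothetical helper names, not part of Flink; the count of 58 and the indices 0, 1, 39 and 40 are simply read off the includeFields call above. Since includeFields takes a boolean varargs parameter, the resulting array could be passed to it directly.

```java
public class FieldMask {

    // Builds a boolean mask of length fieldCount that is true only at the given indices.
    static boolean[] build(int fieldCount, int... included) {
        boolean[] mask = new boolean[fieldCount];
        for (int index : included) {
            if (index < 0 || index >= fieldCount) {
                throw new IllegalArgumentException("field index out of range: " + index);
            }
            mask[index] = true;
        }
        return mask;
    }

    public static void main(String[] args) {
        // Assumption: 58 columns, keeping columns 0, 1, 39 and 40 as in the call above.
        boolean[] mask = build(58, 0, 1, 39, 40);
        int selected = 0;
        for (boolean keep : mask) {
            if (keep) {
                selected++;
            }
        }
        System.out.println(mask.length + " columns, " + selected + " selected");
    }
}
```

With such a helper the reader setup would read .includeFields(FieldMask.build(58, 0, 1, 39, 40)), which keeps the column positions visible in one place.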
I use the GDELT data from here:
http://data.gdeltproject.org/events/index.html

2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:

> Hi,
>
> I read a CSV file from disk with Flink (Java, Maven version 8.1) and get
> the following exception:
>
> ERROR operators.DataSinkTask: Error in user code: Channel received an
> event before completing the current partial record.: DataSink(Print to
> System.out) (4/4)
> java.lang.IllegalStateException: Channel received an event before
> completing the current partial record.
>     at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>     at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>     at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>     at java.lang.Thread.run(Thread.java:745)
>
> My code:
>
> public class FlinkMain {
>
>     public static void main(String[] args) {
>         // set up execution environment
>         ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         //env.setDegreeOfParallelism(1);
>         // get input points
>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>         points.print();
>         // execute program
>         try {
>             env.execute("KMeans Flink");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
> Maybe someone has a solution?
>
> Best regards,
> Paul