Hi Paul! Thank you for reporting this. This really seems like it should not happen ;-)
Is this error reproducible? If yes, we can probably fix it well...

Greetings,
Stephan

On Wed, May 13, 2015 at 1:16 PM, Pa Rö <paul.roewer1...@googlemail.com> wrote:

> my function code:
>
> private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>     // load properties
>     Properties pro = new Properties();
>     try {
>         pro.load(new FileInputStream("./resources/config.properties"));
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
>     String inputFile = pro.getProperty("input");
>     // map csv file
>     return env.readCsvFile(inputFile)
>         .ignoreInvalidLines()
>         .fieldDelimiter('\u0009')
>         .lineDelimiter("\n")
>         .includeFields(
>             true, true, false, false, false, false, false, false, false, false, false,
>             false, false, false, false, false, false, false, false, false, false,
>             false, false, false, false, false, false, false, false, false, false,
>             false, false, false, false, false, false, false, false, true, true,
>             false, false, false, false, false, false, false, false, false, false,
>             false, false, false, false, false, false, false)
>         .types(String.class, Long.class, Double.class, Double.class)
>         .map(new TuplePointConverter());
> }
>
> and i use the GDET data from here:
>
> http://data.gdeltproject.org/events/index.html
>
> 2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>
>> hi,
>>
>> i read a csv file from disk with flink (java, maven version 8.1) and get
>> the following exception:
>>
>> ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4)
>> java.lang.IllegalStateException: Channel received an event before completing the current partial record.
>>     at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>>     at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>>     at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> my code:
>>
>> public class FlinkMain {
>>
>>     public static void main(String[] args) {
>>         // set up execution environment
>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>         //env.setDegreeOfParallelism(1);
>>         // get input points
>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>         points.print();
>>         // execute program
>>         try {
>>             env.execute("KMeans Flink");
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>> maybe someone have a solution?
>>
>> best regards
>> paul