Ah, that is good to hear. I think we should improve the error message there.
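
For anyone who runs into the same exception: Paul's fix, quoted below, was in the read and write functions of his Writable data object. The thread does not say what exactly was wrong, but a common failure is that the two methods disagree on field order or field types, so the reader consumes a different number of bytes than the writer produced. Below is a minimal sketch of a symmetric pair plus a round-trip check. The class name GeoTimePoint and its fields are assumptions (guessed from the String/Long/Double/Double column types in the CSV code), not Paul's actual class, and it mirrors the method signatures of Hadoop's Writable with plain java.io types so that it runs without any Hadoop or Flink dependency.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the data object; names and fields are assumptions.
final class GeoTimePoint {
    String id;
    long time;
    double lat;
    double lon;

    GeoTimePoint() {}

    GeoTimePoint(String id, long time, double lat, double lon) {
        this.id = id;
        this.time = time;
        this.lat = lat;
        this.lon = lon;
    }

    // Same signature as Writable.write: emit every field in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeLong(time);
        out.writeDouble(lat);
        out.writeDouble(lon);
    }

    // Same signature as Writable.readFields: read the same fields,
    // with the same types, in exactly the order write() emitted them.
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        time = in.readLong();
        lat = in.readDouble();
        lon = in.readDouble();
    }

    // Serializes p to a byte array and reads it back into a fresh object.
    // Fails if readFields() leaves bytes unread, which indicates a mismatch.
    static GeoTimePoint roundTrip(GeoTimePoint p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            p.write(out);
            out.flush();

            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            GeoTimePoint copy = new GeoTimePoint();
            copy.readFields(in);
            if (in.available() != 0) {
                throw new IllegalStateException("readFields() left bytes unread");
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        GeoTimePoint copy = roundTrip(new GeoTimePoint("evt-1", 20150513L, 52.5, 13.4));
        System.out.println(copy.id + " " + copy.time + " " + copy.lat + " " + copy.lon);
    }
}
```

Running a round trip like this in a unit test catches a mismatched read/write pair locally, before it surfaces as a confusing error in the network stack.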

On Wed, May 13, 2015 at 2:41 PM, Pa Rö <paul.roewer1...@googlemail.com> wrote:

> hi stephan,
>
> i have found the problem, something was wrong in the read and write
> functions of my data object (implements Writable),
> now it works.
>
> best regards
> paul
>
>
> 2015-05-13 13:32 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> Hi Paul!
>>
>> Thank you for reporting this. This really seems like it should not happen
>> ;-)
>>
>> Is this error reproducible? If yes, we can probably fix it well...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, May 13, 2015 at 1:16 PM, Pa Rö <paul.roewer1...@googlemail.com>
>> wrote:
>>
>>> my function code:
>>>
>>> private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>>>     // load properties
>>>     Properties pro = new Properties();
>>>     try {
>>>         pro.load(new FileInputStream("./resources/config.properties"));
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>     }
>>>     String inputFile = pro.getProperty("input");
>>>     // map csv file
>>>     return env.readCsvFile(inputFile)
>>>         .ignoreInvalidLines()
>>>         .fieldDelimiter('\u0009')
>>>         .lineDelimiter("\n")
>>>         .includeFields(true, true, false, false, false, false,
>>>             false, false, false, false, false
>>>             , false, false, false, false, false, false, false,
>>>             false, false, false
>>>             , false, false, false, false, false, false, false,
>>>             false, false, false
>>>             , false, false, false, false, false, false, false,
>>>             false, true, true
>>>             , false, false, false, false, false, false, false,
>>>             false, false, false
>>>             , false, false, false, false, false, false, false)
>>>         .types(String.class, Long.class, Double.class, Double.class)
>>>         .map(new TuplePointConverter());
>>> }
>>>
>>> and i use the GDELT data from here:
>>>
>>> http://data.gdeltproject.org/events/index.html
>>>
>>> 2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>>>
>>>> hi,
>>>>
>>>> i read a csv file from disk with flink (java, maven version 8.1) and
>>>> get the following exception:
>>>>
>>>> ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4)
>>>> java.lang.IllegalStateException: Channel received an event before completing the current partial record.
>>>>     at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>>>>     at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>>>>     at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>>>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> my code:
>>>>
>>>> public class FlinkMain {
>>>>
>>>>     public static void main(String[] args) {
>>>>         // set up execution environment
>>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>         //env.setDegreeOfParallelism(1);
>>>>         // get input points
>>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>         points.print();
>>>>         // execute program
>>>>         try {
>>>>             env.execute("KMeans Flink");
>>>>         } catch (Exception e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> maybe someone has a solution?
>>>>
>>>> best regards
>>>> paul