Hi Stephan,

I have found the problem: something was wrong in the read and write functions of my data object (it implements Writable). It works now.
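For anyone who lands on this thread with the same exception: the actual bug is not shown here, but one common way for such a pair of functions to go wrong is that the write side and the read side disagree on the order, type, or number of fields, so the reader consumes a different number of bytes than the writer produced. Below is a minimal sketch of a matching pair plus a round-trip check. The class name `GeoTimePoint` and its fields are hypothetical, not the real `GeoTimeDataTupel`. The method names mirror `org.apache.hadoop.io.Writable` (`write` and `readFields`), but the interface is deliberately not implemented so the sketch compiles with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the data object; the field set is an assumption.
class GeoTimePoint {
    String id;
    long time;
    double lat;
    double lon;

    GeoTimePoint() {}

    GeoTimePoint(String id, long time, double lat, double lon) {
        this.id = id;
        this.time = time;
        this.lat = lat;
        this.lon = lon;
    }

    // Writes every field exactly once, in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeLong(time);
        out.writeDouble(lat);
        out.writeDouble(lon);
    }

    // Reads the same fields, with the same types, in the same order as write().
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        time = in.readLong();
        lat = in.readDouble();
        lon = in.readDouble();
    }

    // Serializes p into a byte buffer and reads it back into a fresh object.
    // Fails if readFields() leaves bytes unread or runs past the end.
    static GeoTimePoint roundTrip(GeoTimePoint p) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            p.write(new DataOutputStream(buf));
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            GeoTimePoint copy = new GeoTimePoint();
            copy.readFields(in);
            if (in.available() != 0) {
                throw new IOException(
                    "readFields left " + in.available() + " bytes unread");
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `GeoTimePoint.roundTrip(...)` from a unit test and comparing the fields of the copy with the original is a cheap way to catch a mismatch between the two methods before the object is ever sent through a cluster.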

Best regards,
Paul

2015-05-13 13:32 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi Paul!
>
> Thank you for reporting this. This really seems like it should not happen
> ;-)
>
> Is this error reproducible? If yes, we can probably fix it well...
>
> Greetings,
> Stephan
>
>
> On Wed, May 13, 2015 at 1:16 PM, Pa Rö <paul.roewer1...@googlemail.com>
> wrote:
>
>> my function code:
>>
>> private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>>     // load properties
>>     Properties pro = new Properties();
>>     try {
>>         pro.load(new FileInputStream("./resources/config.properties"));
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>     }
>>     String inputFile = pro.getProperty("input");
>>     // map csv file
>>     return env.readCsvFile(inputFile)
>>         .ignoreInvalidLines()
>>         .fieldDelimiter('\u0009')
>>         .lineDelimiter("\n")
>>         .includeFields(true, true, false, false, false, false, false, false, false, false, false
>>             , false, false, false, false, false, false, false, false, false, false
>>             , false, false, false, false, false, false, false, false, false, false
>>             , false, false, false, false, false, false, false, false, true, true
>>             , false, false, false, false, false, false, false, false, false, false
>>             , false, false, false, false, false, false, false)
>>         .types(String.class, Long.class, Double.class, Double.class)
>>         .map(new TuplePointConverter());
>> }
>>
>> and i use the GDELT data from here:
>>
>> http://data.gdeltproject.org/events/index.html
>>
>> 2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>>
>>> hi,
>>>
>>> i read a csv file from disk with flink (java, maven version 8.1) and get
>>> the following exception:
>>>
>>> ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4)
>>> java.lang.IllegalStateException: Channel received an event before completing the current partial record.
>>>     at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>>>     at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>>>     at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> my code:
>>>
>>> public class FlinkMain {
>>>
>>>     public static void main(String[] args) {
>>>         // set up execution environment
>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         //env.setDegreeOfParallelism(1);
>>>         // get input points
>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>         points.print();
>>>         // execute program
>>>         try {
>>>             env.execute("KMeans Flink");
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>> maybe someone has a solution?
>>>
>>> best regards
>>> paul
>>>