Ah, that is good to hear.

I think we should improve the error message there.
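
For the archives: this exception usually means that the bytes a type's
write() method produces do not exactly match what its read side consumes,
so the deserializer runs past the record boundary. A minimal sketch of a
symmetric read/write pair, assuming a Hadoop-style Writable (the class and
field names are made up for illustration; the same symmetry rule applies
to Flink's own Value / IOReadableWritable types):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical data type: write() and readFields() must mirror each
// other exactly, in field order and in field type, otherwise the
// receiving channel loses track of record boundaries.
public class GeoTimePoint implements Writable {

    private long time;
    private double lat;
    private double lon;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(time);
        out.writeDouble(lat);
        out.writeDouble(lon);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // read back in the same order, with the same types, as write()
        time = in.readLong();
        lat = in.readDouble();
        lon = in.readDouble();
    }
}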

On Wed, May 13, 2015 at 2:41 PM, Pa Rö <paul.roewer1...@googlemail.com>
wrote:

> Hi Stephan,
>
> I have found the problem: something was wrong in the read and write
> functions of my data object (implements Writable); now it works.
>
> best regards
> paul
>
>
>
> 2015-05-13 13:32 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> Hi Paul!
>>
>> Thank you for reporting this. This really seems like it should not happen
>> ;-)
>>
>> Is this error reproducible? If so, we can probably fix it...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, May 13, 2015 at 1:16 PM, Pa Rö <paul.roewer1...@googlemail.com>
>> wrote:
>>
>>> my function code:
>>> private static DataSet<GeoTimeDataTupel> getPointDataSet(
>>>         ExecutionEnvironment env) {
>>>     // load properties
>>>     Properties pro = new Properties();
>>>     try {
>>>         pro.load(new FileInputStream("./resources/config.properties"));
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>     }
>>>     String inputFile = pro.getProperty("input");
>>>     // map csv file: keep fields 1, 2, 40, and 41 of the 58 columns
>>>     return env.readCsvFile(inputFile)
>>>         .ignoreInvalidLines()
>>>         .fieldDelimiter('\u0009')  // tab
>>>         .lineDelimiter("\n")
>>>         .includeFields(
>>>                 true,  true,  false, false, false, false, false, false, false, false,
>>>                 false, false, false, false, false, false, false, false, false, false,
>>>                 false, false, false, false, false, false, false, false, false, false,
>>>                 false, false, false, false, false, false, false, false, false, true,
>>>                 true,  false, false, false, false, false, false, false, false, false,
>>>                 false, false, false, false, false, false, false, false)
>>>         .types(String.class, Long.class, Double.class, Double.class)
>>>         .map(new TuplePointConverter());
>>> }
>>>
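>>> The same field selection can also be written with the string-mask
>>> variant of includeFields, assuming the Flink version in use provides
>>> it (a sketch: '1' keeps a column, '0' skips it):
>>>
>>>     return env.readCsvFile(inputFile)
>>>         .ignoreInvalidLines()
>>>         .fieldDelimiter('\u0009')
>>>         .lineDelimiter("\n")
>>>         .includeFields("11"                    // fields 1-2: keep
>>>                 + "0000000000" + "0000000000"  // fields 3-22: skip
>>>                 + "0000000000" + "0000000"     // fields 23-39: skip
>>>                 + "11"                         // fields 40-41: keep
>>>                 + "0000000000" + "0000000")    // fields 42-58: skip
>>>         .types(String.class, Long.class, Double.class, Double.class)
>>>         .map(new TuplePointConverter());
>>>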
>>> and I use the GDELT data from here:
>>>
>>> http://data.gdeltproject.org/events/index.html
>>>
>>> 2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>>>
>>>> hi,
>>>>
>>>> I read a CSV file from disk with Flink (Java API, Maven version
>>>> 0.8.1) and get the following exception:
>>>>
>>>> ERROR operators.DataSinkTask: Error in user code: Channel received an
>>>> event before completing the current partial record.:  DataSink(Print to
>>>> System.out) (4/4)
>>>> java.lang.IllegalStateException: Channel received an event before
>>>> completing the current partial record.
>>>>     at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>>>>     at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>>>>     at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>>>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> my code:
>>>>
>>>> public class FlinkMain {
>>>>
>>>>     public static void main(String[] args) {
>>>>         // set up execution environment
>>>>         ExecutionEnvironment env =
>>>>                 ExecutionEnvironment.getExecutionEnvironment();
>>>>         //env.setDegreeOfParallelism(1);
>>>>         // get input points
>>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>         points.print();
>>>>         // execute program
>>>>         try {
>>>>             env.execute("KMeans Flink");
>>>>         } catch (Exception e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Maybe someone has a solution?
>>>>
>>>> best regards paul
>>>>
>>>
>>>
>>
>
