hi stephan,

i have found the problem, something was wrong at the read and write
function from my data object (implements Writable),
now it's work.

best regards
paul



2015-05-13 13:32 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi Paul!
>
> Thank you for reporting this. This really seems like it should not happen
> ;-)
>
> Is this error reproducable? If yes, we can probably fix it well...
>
> Greetings,
> Stephan
>
>
> On Wed, May 13, 2015 at 1:16 PM, Pa Rö <paul.roewer1...@googlemail.com>
> wrote:
>
>> my function code:
>> private static DataSet<GeoTimeDataTupel>
>> getPointDataSet(ExecutionEnvironment env) {
>>         // load properties
>>         Properties pro = new Properties();
>>         try {
>>             pro.load(new
>> FileInputStream("./resources/config.properties"));
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>         String inputFile = pro.getProperty("input");
>>         // map csv file
>>         return env.readCsvFile(inputFile)
>>             .ignoreInvalidLines()
>>             .fieldDelimiter('\u0009')
>>             .lineDelimiter("\n")
>>             .includeFields(true, true, false, false, false, false, false,
>> false, false, false, false
>>                     , false, false, false, false, false, false, false,
>> false, false, false
>>                     , false, false, false, false, false, false, false,
>> false, false, false
>>                     , false, false, false, false, false, false, false,
>> false, true, true
>>                     , false, false, false, false, false, false, false,
>> false, false, false
>>                     , false, false, false, false, false, false, false)
>>             .types(String.class, Long.class, Double.class, Double.class)
>>             .map(new TuplePointConverter());
>>     }
>>
>> and i use the GDET data from here:
>>
>> http://data.gdeltproject.org/events/index.html
>>
>> 2015-05-13 13:09 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>>
>>> hi,
>>>
>>> i read a csv file from disk with flink (java, maven version 8.1) and get
>>> the following exception:
>>>
>>> ERROR operators.DataSinkTask: Error in user code: Channel received an
>>> event before completing the current partial record.:  DataSink(Print to
>>> System.out) (4/4)
>>> java.lang.IllegalStateException: Channel received an event before
>>> completing the current partial record.
>>>     at
>>> org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
>>>     at
>>> org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>>>     at
>>> org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>>>     at
>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>>>     at
>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>>>     at
>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> my code:
>>>
>>> public class FlinkMain {
>>>
>>>     public static void main(String[] args) {
>>>         // set up execution environment
>>>         ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>>         //env.setDegreeOfParallelism(1);
>>>         // get input points
>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>         points.print();
>>>         // execute program
>>>         try {
>>>             env.execute("KMeans Flink");
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>> maybe someone have a solution?
>>>
>>> best regards paul
>>>
>>
>>
>

Reply via email to