Hi all,

I have written a consumer that reads from a Kafka topic and writes the data in Parquet format using StreamingFileSink, but I am getting the following error. The job runs for some hours and then starts failing with this exception. I tried restarting it, but it fails with the same exception. After I restarted it from the latest offset, it worked fine for some hours and then failed again. I am not able to find the root cause of this issue.

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.CharSequence
    at org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)

Code:

    DataStream<GenericRecord> sourceStream = env.addSource(kafkaConsumer010);
    final StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();
    sourceStream.addSink(sink).setParallelism(parallelism);

I need to understand why it ran for a few hours and then started failing. Please help me understand this.

--
Thanks & Regards,
Anuj Jain
<http://www.cse.iitm.ac.in/%7Eanujjain/>
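
P.S. The innermost exception in the trace suggests that some record carried a list in a field that the writer treats as a string, though I have not confirmed this. Below is a minimal, stdlib-only sketch of that kind of cast failure. The class `CastCheck`, the method `describe`, and the sample values are hypothetical and only for illustration; this is not the actual Parquet code path. With Avro on the classpath, `GenericData.get().validate(schema, record)` might serve as the real per-record check before the sink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: mimics an unchecked cast of a field value to CharSequence.
class CastCheck {

    // Returns "ok: <text>" when the value is string-like,
    // or "mismatch: <runtime class>" when the cast fails.
    static String describe(Object fieldValue) {
        try {
            CharSequence text = (CharSequence) fieldValue;
            return "ok: " + text;
        } catch (ClassCastException e) {
            return "mismatch: " + fieldValue.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // Made-up sample value standing in for a list-typed field.
        List<String> tags = new ArrayList<>();
        tags.add("a");

        System.out.println(describe("plain string")); // ok: plain string
        System.out.println(describe(tags));           // mismatch: java.util.ArrayList
    }
}
```

Logging the offending record and its schema in the mismatch branch, rather than letting the job fail, would be one way to see which field and which Kafka offset trigger the error.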