Hi Anuj,

it looks to me as if your input GenericRecords don't conform to your
output schema schemaSubject. At least, the stack trace says that your
output schema expects a String field, but the field was actually an
ArrayList. Consequently, I would suggest verifying that your input data has
the right format and, if it does not, filtering out the non-conformant
records before the sink.
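
Something along these lines might do the trick. This is just an untested
sketch; I am assuming you can obtain the JSON of your sink schema as a
String (e.g. via your SchemaUtils), and the class name
SchemaConformanceFilter is only a placeholder:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

// Drops records that do not validate against the sink schema. The schema is
// re-parsed in open() because Avro's Schema is not serializable in all versions.
public class SchemaConformanceFilter extends RichFilterFunction<GenericRecord> {

    private final String schemaString;
    private transient Schema schema;

    public SchemaConformanceFilter(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(schemaString);
    }

    @Override
    public boolean filter(GenericRecord record) {
        // GenericData.validate returns false if a field does not match the
        // expected type, e.g. an ArrayList where a CharSequence is expected.
        return GenericData.get().validate(schema, record);
    }
}

You would then apply it before the sink, e.g.

sourceStream.filter(new SchemaConformanceFilter(schemaJson)).addSink(sink);

where schemaJson is the schema string. You could also log or side-output the
dropped records to see what the non-conformant data actually looks like.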

Cheers,
Till

On Sat, Feb 29, 2020 at 2:13 PM aj <ajainje...@gmail.com> wrote:

> Hi All,
>
> I have written a consumer that reads from a Kafka topic and writes the data
> in Parquet format using StreamingFileSink. But I am getting the following
> error. It runs for some hours and then starts failing with these exceptions.
> I tried to restart it, but it fails with the same exceptions. After I
> restart with the latest offset it works fine for some hours and then fails
> again. I am not able to find the root cause of this issue.
>
> java.lang.Exception: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>     at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)Caused
>  by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to 
> java.lang.CharSequence
>     at 
> org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
>     at 
> org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
>     at 
> org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
>     at 
> org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
>     at 
> org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
>     at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>     at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
>     at 
> org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
>     at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
>     at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
>
> Code:
>
> DataStream<GenericRecord> sourceStream = env.addSource(kafkaConsumer010);
>
> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>         .forBulkFormat(path,
>             ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
>         .withBucketAssigner(new EventTimeBucketAssigner())
>         .build();
>
> sourceStream.addSink(sink).setParallelism(parallelism);
>
> I need to understand why it ran for a few hours and then started failing.
> Please help me to understand this.
>
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>
