Sanjar Akhmedov created FLINK-3101:
--------------------------------------

             Summary: Flink Kafka consumer crashes with NPE when it sees deleted record
                 Key: FLINK-3101
                 URL: https://issues.apache.org/jira/browse/FLINK-3101
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 0.10.1, 1.0.0
         Environment: Apache Flink 0.10.1 binary for Hadoop 2.6.0 with Scala 2.10.
            Reporter: Sanjar Akhmedov


Kafka allows a record to be deleted from the log by sending a record with the
same key and a null payload. Consumers can still see those null values
(deletes) until compaction removes them (the delete retention point).
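
To make the delete semantics concrete, here is a minimal sketch in plain Java. It is a simplified, hypothetical model of a compacted topic (the names TombstoneSketch, Record, and compact are illustrative and not part of Kafka or Flink), and it ignores retention timing entirely. It only shows that a null-payload record stays visible to readers until compaction runs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a compacted topic; not Kafka's implementation.
public class TombstoneSketch {
    // A record with a key and a payload; a null payload marks a delete.
    static final class Record {
        final String key;
        final byte[] payload;

        Record(String key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }

        boolean isTombstone() {
            return payload == null;
        }
    }

    // Keeps the latest record per key, then drops keys whose latest record is a delete.
    static List<Record> compact(List<Record> log) {
        Map<String, Record> latest = new LinkedHashMap<>();
        for (Record r : log) {
            latest.remove(r.key); // re-insert so ordering follows the latest write
            latest.put(r.key, r);
        }
        List<Record> out = new ArrayList<>();
        for (Record r : latest.values()) {
            if (!r.isTombstone()) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Record> log = new ArrayList<>();
        log.add(new Record("a", new byte[] {1}));
        log.add(new Record("b", new byte[] {2}));
        log.add(new Record("a", null)); // delete for key "a"

        // Before compaction a reader of the log still encounters the null payload.
        long visibleDeletes = log.stream().filter(Record::isTombstone).count();
        System.out.println("deletes visible before compaction: " + visibleDeletes);
        System.out.println("records after compaction: " + compact(log).size());
    }
}
```

In this model the reader sees one null payload before compaction, and only the record for key "b" survives afterwards.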

The Flink Kafka consumer crashes with an NPE when it sees such a record:
{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:443)
{noformat}
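
The trace does not show which expression at LegacyFetcher.java:443 dereferences the null, so the following is only a sketch of the general pattern, not the connector's code or its eventual fix. It assumes a deserializer that expects a non-null payload and shows one possible guard: skip null payloads before deserializing. All names (NullPayloadGuard, deserialize, deserializeSkippingDeletes) are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; these names are illustrative and are not Flink or Kafka APIs.
public class NullPayloadGuard {
    // Stand-in for a deserializer that assumes a non-null payload.
    static String deserialize(byte[] payload) {
        // Throws NullPointerException when payload is null.
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Deserializes each payload, skipping null payloads (deletes) instead of failing.
    static List<String> deserializeSkippingDeletes(List<byte[]> payloads) {
        List<String> out = new ArrayList<>();
        for (byte[] payload : payloads) {
            if (payload == null) {
                continue; // delete marker: nothing to deserialize
            }
            out.add(deserialize(payload));
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> payloads = Arrays.asList(
                "x".getBytes(StandardCharsets.UTF_8),
                null, // a delete seen before compaction
                "y".getBytes(StandardCharsets.UTF_8));
        System.out.println(deserializeSkippingDeletes(payloads));
    }
}
```

Skipping is only one option. A consumer could instead pass the null through so that downstream operators can react to deletes; which behavior is appropriate is a design decision this report does not settle.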



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
