Hi,

I have created Confluent Connect (http://docs.confluent.io/2.0.1/connect/?) 
sink and source tasks. While working in standalone mode there are no errrors or 
exceptions.


But, when i start tasks in distributed mode i get NullPointerException while 
OffsetStorageWriter is executing doFlush() method.

Full stack trace:


ERROR Unhandled exception when committing 
WorkerSourceTask{id=distributed-s3-source-0}:  
(org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
java.lang.NullPointerException
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.set(KafkaOffsetBackingStore.java:122)
at 
org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:161)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:267)
at 
org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
at 
org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:76)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


After a closer look i have determined that in KafkaOffsetBackingStore in set() 
method, parameter [final Map<ByteBuffer, ByteBuffer> values] is null, therefore 
next line:

 - SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), 
callback);      throws exception.

After that exception sink and source tasks continue working without any 
problems.
When kafka tries to do another flush, it throws following exception:

ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the 
framework should not allow this 
(org.apache.kafka.connect.storage.OffsetStorageWriter:108)
[2016-03-23 14:57:33,220] ERROR Unhandled exception when committing 
WorkerSourceTask{id=distributed-s3-source-0}:  
(org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is 
already flushing
at 
org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:227)
at 
org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
at 
org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:76)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Does anyone know how to fix this exception?


I am using confluent-2.0.1 with kafka-9.0.1-cp1.




Reply via email to