This is a bug in 0.10.2 and earlier versions. It was fixed in 0.11 by https://issues.apache.org/jira/browse/KAFKA-4494

On Tue, 1 Aug 2017 at 15:40 Marcus Clendenin <marcusc...@gmail.com> wrote:

> Hi All,
>
> I have a kafka streams application that is doing a join between a KTable
> and a KStream and it seems that after it starts loading the KTable if I
> either restart the application or start a new jar with the same
> application-id it starts failing. It looks like when it tries to rejoin the
> application-id and do a rebalance of the partitions it throws an error
> regarding a null value coming from RocksDB. Any thoughts on where this is
> coming from? I am running this inside of a docker container if that affects
> anything but the RocksDB folder is mounted as a volume on the host machine.
>
> Stacktrace:
>
> 2017-08-01 13:31:50,309 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Starting
> 2017-08-01 13:31:50,379 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.AbstractCoordinator - Discovered coordinator xxxx.com:9092 (id: 2147483535 rack: null) for group test-application-id.
> 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group test-application-id
> 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
> 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams org.apache.kafka.streams.KafkaStreams - stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c] State transition from RUNNING to REBALANCING.
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks []
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all active tasks []
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks []
> 2017-08-01 13:31:50,389 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.AbstractCoordinator - (Re-)joining group test-application-id
> 2017-08-01 13:31:50,416 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=ClientMetadata{hostInfo=null, consumers=[test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c-StreamThread-1-consumer-f6ed6af8-0aee-4d2e-92a9-00955f7b3441], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-08-01 13:31:50,419 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) assignedTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 3.0]}.
> 2017-08-01 13:31:50,429 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.AbstractCoordinator - Successfully joined group test-application-id with generation 56
> 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3, stream_topic-4, stream_topic-5, table_topic-4, table_topic-5, table_topic-0, table_topic-1, table_topic-2, table_topic-3] for group test-application-id
> 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] at state PARTITIONS_REVOKED: new partitions [stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3, stream_topic-4, stream_topic-5, table_topic-4, table_topic-5, table_topic-0, table_topic-1, table_topic-2, table_topic-3] assigned at the end of consumer rebalance.
> 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> 2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams org.apache.kafka.streams.KafkaStreams - stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c] State transition from REBALANCING to REBALANCING.
> 2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating active task 0_0 with assigned partitions [stream_topic-0, table_topic-0]
> 2017-08-01 13:31:50,448 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamTask - task [0_0] Initializing state stores
> 2017-08-01 13:31:50,743 trackingId=X thread=[StreamThread-1] logType=ERROR module=kafka.streams o.a.k.c.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group test-application-id failed on partition assignment
> java.lang.NullPointerException: null
>     at org.rocksdb.RocksDB.put(RocksDB.java:488)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
>     at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> 2017-08-01 13:31:50,752 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
> 2017-08-01 13:31:50,753 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all active tasks []
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks []
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Stream thread shutdown complete
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=WARN module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from ASSIGNING_PARTITIONS to NOT_RUNNING.