Hi All,
I have a kafka streams application that is doing a join between a KTable and a KStream and it seems that after it starts loading the KTable if I either restart the application or start a new jar with the same application-id it starts failing. It looks like when it tries to rejoin the application-id and do a rebalance of the partitions it throws an error regarding a null value coming from RocksDB. Any thoughts on where this is coming from? I am running this inside of a docker container if that affects anything but the RocksDB folder is mounted as a volume on the host machine. Stacktrace: 2017-08-01 13:31:50,309 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Starting 2017-08-01 13:31:50,379 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.AbstractCoordinator - Discovered coordinator xxxx.com:9092 (id: 2147483535 rack: null) for group test-application-id. 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group test-application-id 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED. 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams org.apache.kafka.streams.KafkaStreams - stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c] State transition from RUNNING to REBALANCING. 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [] 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [] 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [] 2017-08-01 13:31:50,389 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.AbstractCoordinator - (Re-)joining group test-application-id 2017-08-01 13:31:50,416 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=ClientMetadata{hostInfo=null, consumers=[test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c-StreamThread-1-consumer-f6ed6af8-0aee-4d2e-92a9-00955f7b3441], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions. 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor 2017-08-01 13:31:50,419 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.s.p.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) assignedTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 3.0]}. 2017-08-01 13:31:50,429 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.AbstractCoordinator - Successfully joined group test-application-id with generation 56 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.k.c.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3, stream_topic-4, stream_topic-5, table_topic-4, table_topic-5, table_topic-0, table_topic-1, table_topic-2, table_topic-3] for group test-application-id 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] at state PARTITIONS_REVOKED: new partitions [stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3, stream_topic-4, stream_topic-5, table_topic-4, table_topic-5, table_topic-0, table_topic-1, table_topic-2, table_topic-3] assigned at the end of consumer rebalance. 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. 2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams org.apache.kafka.streams.KafkaStreams - stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c] State transition from REBALANCING to REBALANCING. 2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating active task 0_0 with assigned partitions [stream_topic-0, table_topic-0] 2017-08-01 13:31:50,448 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamTask - task [0_0] Initializing state stores 2017-08-01 13:31:50,743 trackingId=X thread=[StreamThread-1] logType=ERROR module=kafka.streams o.a.k.c.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group test-application-id failed on partition assignment java.lang.NullPointerException: null at org.rocksdb.RocksDB.put(RocksDB.java:488) at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254) at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67) at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63) at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141) at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) 2017-08-01 13:31:50,752 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Shutting down 2017-08-01 13:31:50,753 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [] 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [] 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Stream thread shutdown complete 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=WARN module=kafka.streams o.a.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Unexpected state transition from ASSIGNING_PARTITIONS to NOT_RUNNING.