I thought we drop records with a null key? No?

-Matthias
On 6/23/17 12:25 AM, Damian Guy wrote:
> My guess is it is because the record doesn't have a key, i.e., the key is
> null. We have a fix for this in 0.11, in that we will skip records with a
> null key during restore.
>
> On Fri, 23 Jun 2017 at 03:57 Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Hi,
>>
>> can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?
>>
>> It's unclear to me how an NPE can occur. It seems to happen within the
>> Streams library. Might be a bug. Not sure atm.
>>
>>
>> -Matthias
>>
>> On 6/22/17 9:43 AM, Shekar Tippur wrote:
>>> Hello,
>>>
>>> I am trying to perform a simple join operation. I am using Kafka 0.10.2
>>>
>>> I have "raw" and "cache" topics and just 1 partition in my local
>>> environment.
>>>
>>> ktable has these entries
>>>
>>> {"Joe": {"location": "US", "gender": "male"}}
>>> {"Julie": {"location": "US", "gender": "female"}}
>>> {"Kawasaki": {"location": "Japan", "gender": "male"}}
>>>
>>> The kstream gets an event
>>>
>>> {"user": "Joe", "custom": {"choice":"vegan"}}
>>>
>>> I want an output as a join
>>>
>>> {"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location": "US", "gender": "male"}*} }
>>>
>>> I want to take what's in the ktable and add it to the enriched section of
>>> the output stream.
>>>
>>> I have defined serde
>>>
>>> //This is the same serde code from the example.
>>>
>>> final TestStreamsSerializer<JsonNode> jsonSerializer = new TestStreamsSerializer();
>>> final TestStreamsDeserialzer<JsonNode> jsonDeserializer = new TestStreamsDeserialzer();
>>> final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>>>
>>> //
>>>
>>> KStream<String,JsonNode> raw = builder.stream(Serdes.String(), jsonSerde, "raw");
>>> KTable <String,JsonNode> cache = builder.table("cache", "local-cache");
>>>
>>> raw.leftJoin(cache,
>>>         (record1, record2) -> record1.get("user") + "-" + record2).to("output");
>>>
>>> I am having trouble understanding how to call the join api.
>>>
>>> With the above code, I seem to get an error:
>>>
>>> [2017-06-22 09:23:31,836] ERROR User provided listener
>>> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>>> streams-pipe failed on partition assignment
>>> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>>>
>>> java.lang.NullPointerException
>>>     at org.rocksdb.RocksDB.put(RocksDB.java:488)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>>>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>
>>> [2017-06-22 09:23:31,849] WARN stream-thread [StreamThread-1] Unexpected
>>> state transition from ASSIGNING_PARTITIONS to NOT_RUNNING.
>>> (org.apache.kafka.streams.processor.internals.StreamThread)
>>>
>>> Exception in thread "StreamThread-1"
>>> org.apache.kafka.streams.errors.StreamsException: stream-thread
>>> [StreamThread-1] Failed to rebalance
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>
>>> Caused by: java.lang.NullPointerException
>>>     at org.rocksdb.RocksDB.put(RocksDB.java:488)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>>>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>     ... 1 more
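
For anyone following the null-key discussion in this thread, here is a rough, self-contained sketch of the idea Damian describes: skipping records with a null key while replaying a changelog into a store. It is plain Java and not the Kafka Streams implementation. The class name NullKeyRestoreSketch, the strictPut and restore helpers, and the use of a Map as the store are all hypothetical, chosen only to mimic a store that throws on a null key the way the put() call at the top of the stack trace does.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class NullKeyRestoreSketch {

    // Hypothetical stand-in for a store that rejects null keys.
    static void strictPut(Map<String, String> store, String key, String value) {
        Objects.requireNonNull(key, "key must not be null");
        store.put(key, value);
    }

    // Replays changelog records into the store and skips any record whose key is null.
    // Returns the number of skipped records.
    static int restore(List<Map.Entry<String, String>> changelog, Map<String, String> store) {
        int skipped = 0;
        for (Map.Entry<String, String> record : changelog) {
            if (record.getKey() == null) {
                skipped++;   // without this guard, strictPut would throw an NPE
                continue;
            }
            strictPut(store, record.getKey(), record.getValue());
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Sample data loosely based on the table entries above; the second record has no key.
        List<Map.Entry<String, String>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<String, String>("Joe", "{\"location\": \"US\", \"gender\": \"male\"}"));
        changelog.add(new SimpleEntry<String, String>(null, "{\"location\": \"US\", \"gender\": \"female\"}"));
        changelog.add(new SimpleEntry<String, String>("Kawasaki", "{\"location\": \"Japan\", \"gender\": \"male\"}"));

        Map<String, String> store = new LinkedHashMap<>();
        int skipped = restore(changelog, store);
        System.out.println("restored=" + store.size() + " skipped=" + skipped);
        // prints "restored=2 skipped=1"
    }
}
```

If the null-key guess is right, producing the "cache" records with an explicit key (for example the user name) should avoid the problem on 0.10.2 as well, since a table lookup needs a key to join on anyway.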