Hello,

I am trying to perform a simple join operation. I am using Kafka 0.10.2

I have a "raw" table and a "cache" topics and just 1 partition in my local
environment.

ktable has these entries

{"Joe": {"location": "US", "gender": "male"}}
{"Julie": {"location": "US", "gender": "female"}}
{"Kawasaki": {"location": "Japan", "gender": "male"}}

The kstream gets a event

{"user": "Joe", "custom": {"choice":"vegan"}}

I want a output as a join

{"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location": "US",
"gender": "male"}*} }

I want to take whats in ktable and add to enriched section of the output
stream.

I have defined serde

//This is the same serde code from the example.

final TestStreamsSerializer<JsonNode> jsonSerializer = new
TestStreamsSerializer();
final TestStreamsDeserialzer<JsonNode> jsonDeserializer = new
TestStreamsDeserialzer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
jsonDeserializer);

//

KStream<String,JsonNode> raw = builder.stream(Serdes.String(),
jsonSerde, "raw");
KTable <String,JsonNode> cache = builder.table("cache", "local-cache");

raw.leftJoin(cache,
        (record1, record2) -> record1.get("user") + "-" + record2).to("output");

I am having trouble understanding how to call the join api.

With the above code, I seem to get a error:

[2017-06-22 09:23:31,836] ERROR User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
streams-pipe failed on partition assignment
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

java.lang.NullPointerException

at org.rocksdb.RocksDB.put(RocksDB.java:488)

at
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)

at
org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)

at
org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)

at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)

at
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)

at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)

at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)

at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)

at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)

at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)

at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

[2017-06-22 09:23:31,849] WARN stream-thread [StreamThread-1] Unexpected
state transition from ASSIGNING_PARTITIONS to NOT_RUNNING.
(org.apache.kafka.streams.processor.internals.StreamThread)

Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-1] Failed to rebalance

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

Caused by: java.lang.NullPointerException

at org.rocksdb.RocksDB.put(RocksDB.java:488)

at
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)

at
org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)

at
org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)

at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)

at
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)

at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)

at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)

at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)

at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)

at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)

at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)

... 1 more

Reply via email to