[
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15236512#comment-15236512
]
Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:04 AM:
------------------------------------------------------------
Not sure of the best way to share the topology. Here's the relevant part of the
code:
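{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

// builder is a KStreamBuilder. Re-key the incoming room operations by user ID
// and write them to an intermediate topic.
builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

// Read the re-keyed topic back in as a new stream.
KStream<String, RoomOperationMessage> roomOperationMessagesByUserId = builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

// Left-join each operation with the user's broadcasts
// (userSpaceBroadcastsByUserId is a KTable keyed by user ID, defined elsewhere).
KStream<String, UserBroadcastsMessage> userBroadcastsMessagesByUserId =
    roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId,
        UserBroadcastsMessage::new);
{code}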
In this example, roomOperationSerde is a Serde for a custom Avro type. I'm
basically pivoting the first stream onto a foreign key (the user ID) and then
creating another KStream from that output for a join downstream.
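The intermediate topic exists because of co-partitioning: a stream-table join like the leftJoin above can only match a record against a table row if both sides use the same key and put that key in the same partition, so the stream has to be re-keyed and re-partitioned first. Below is a minimal plain-Java sketch of that idea under stated assumptions, using no Kafka classes. `RoomOp` is a hypothetical stand-in for `RoomOperationMessage`, and `partitionFor` uses `String.hashCode()` in place of the producer's default partitioner, which actually applies murmur2 to the serialized key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for RoomOperationMessage, reduced to the fields used here.
final class RoomOp {
    final String roomId;
    final String userId;

    RoomOp(String roomId, String userId) {
        this.roomId = roomId;
        this.userId = userId;
    }
}

final class Repartition {
    // Illustrative partitioner: non-negative hash of the key modulo the partition
    // count. Kafka's default partitioner hashes the serialized key with murmur2;
    // String.hashCode() is only a stand-in to keep the sketch dependency-free.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Mirrors map((k, v) -> KeyValue.pair(v.getUserId(), v)) followed by to(...):
    // each message is re-keyed by user ID and placed in the partition that key
    // maps to in the intermediate topic.
    static Map<Integer, List<RoomOp>> rekeyByUserId(List<RoomOp> ops, int numPartitions) {
        Map<Integer, List<RoomOp>> partitions = new TreeMap<>();
        for (RoomOp op : ops) {
            int p = partitionFor(op.userId, numPartitions);
            partitions.computeIfAbsent(p, unused -> new ArrayList<>()).add(op);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<RoomOp> ops = List.of(
            new RoomOp("room-1", "alice"),
            new RoomOp("room-2", "alice"),
            new RoomOp("room-3", "bob"));
        // Print how many re-keyed messages end up in each partition.
        rekeyByUserId(ops, 4).forEach((p, list) ->
            System.out.println("partition " + p + ": " + list.size() + " message(s)"));
    }
}
```

All of one user's operations land in a single partition, and it is the same partition a table keyed by that user ID would use, provided its topic has the same number of partitions. That shared partition count is what Kafka Streams checks before it allows the join.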
The topology is failing to build on the user_space_broadcasts-user_id topic:
{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2]
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}
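The exception is raised in `StreamPartitionAssignor.ensureCopartitioning` while the group leader computes the partition assignment. As a rough illustration of what a check like that has to do, here is a simplified, hypothetical sketch (not the actual Kafka source): look up the partition count of every topic that must be co-partitioned for a join, fail if one of them is missing from the cluster metadata, and fail if the counts differ. The `metadata` map is an assumed stand-in for the cluster metadata the consumer has fetched.

```java
import java.util.List;
import java.util.Map;

final class CopartitionCheck {
    // Simplified, hypothetical version of a co-partitioning check.
    // 'group' lists topics that must be co-partitioned (e.g. both inputs of a join);
    // 'metadata' maps topic name -> partition count as currently known.
    // Returns the shared partition count, or throws if the check fails.
    static int ensureCopartitioned(List<String> group, Map<String, Integer> metadata) {
        int expected = -1;
        for (String topic : group) {
            Integer count = metadata.get(topic);
            if (count == null) {
                // The situation in the trace: the topic is not in the metadata yet.
                throw new IllegalStateException("External source topic not found: " + topic);
            }
            if (expected == -1) {
                expected = count;
            } else if (count != expected) {
                throw new IllegalStateException(
                    "Topics not co-partitioned: " + topic + " has " + count
                        + " partitions, expected " + expected);
            }
        }
        return expected;
    }
}
```

If `room_operation_message_incoming-user_id` has not been created yet when the leader runs such a check on the first start, the lookup fails with the same message as in the trace; once the topic exists, the same check passes, which would be consistent with the "subsequent runs work OK" behavior described in the issue.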
> Missing topics on startup
> -------------------------
>
> Key: KAFKA-3544
> URL: https://issues.apache.org/jira/browse/KAFKA-3544
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Greg Fodor
> Assignee: Guozhang Wang
> Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores,
> on the first run I get errors due to some of the intermediate topics not
> existing. Subsequent runs work OK. My assumption is that Streams may be
> creating topics lazily, so if downstream tasks initialize before their parents
> have had a chance to create the topics they need, the children will attempt
> to consume from topics that do not exist yet.
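If the reporter's assumption holds, the topics at risk are the ones the application writes to and then reads back as a source, such as `room_operation_message_incoming-user_id` in the comment above. The following is a small hypothetical helper (plain Java, not a Kafka Streams API) that lists such topics which do not exist yet, so they could be created ahead of the first run, for example with the `kafka-topics` command-line tool. The topic collections passed in are assumed to come from the application's own topology definition and from the cluster.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

final class IntermediateTopics {
    // Hypothetical helper: topics that the application both writes to (sinks,
    // e.g. via to(...)) and reads from (sources, e.g. via builder.stream(...))
    // but that are not present in the cluster yet.
    static Set<String> missingBeforeStart(Collection<String> sinkTopics,
                                          Collection<String> sourceTopics,
                                          Collection<String> existingTopics) {
        Set<String> result = new TreeSet<>(sinkTopics);
        result.retainAll(sourceTopics);    // written and read back: intermediate topics
        result.removeAll(existingTopics);  // keep only those not created yet
        return result;
    }

    public static void main(String[] args) {
        Set<String> missing = missingBeforeStart(
            Set.of("room_operation_message_incoming-user_id"),
            Set.of("room_operation_message_incoming", "room_operation_message_incoming-user_id"),
            Set.of("room_operation_message_incoming"));
        System.out.println(missing); // [room_operation_message_incoming-user_id]
    }
}
```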
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)