[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15236512#comment-15236512 ]
Greg Fodor commented on KAFKA-3544:
-----------------------------------

Not sure of the best way to share the topology. Here's the relevant part of the code:

{code}
builder
    .stream(Serdes.Long(), userSpaceBroadcastSerde, "positron-db-user_space_broadcasts")
    .map((id, broadcast) -> KeyValue.pair(broadcast.getUserId().toString(), broadcast))
    .to(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id");

KTable<String, UserSpaceBroadcasts> userSpaceBroadcastsByUserId = builder
    .stream(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id")
    .aggregateByKey(...);
{code}

In this example userSpaceBroadcastSerde is a Serde for a custom Avro type. I'm basically pivoting the first stream onto a foreign key and then creating a KTable off of that output by tapping it and then aggregating. (Given our discussions on other tickets there may be a way to simplify this, but I wanted to capture it as-is for this report.)

The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2]
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}

> Missing topics on startup
> -------------------------
>
>                 Key: KAFKA-3544
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3544
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>              Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores,
> on the first run I get errors due to some of the intermediate topics not
> existing. Subsequent runs work OK.
> My assumption is streams may be creating
> topics lazily, so if downstream tasks are initializing before their parents
> have had a chance to create their necessary topics then the children will
> attempt to start consuming from topics that do not exist yet.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
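To make the suspected race concrete, here is a toy model in plain Java with no Kafka dependencies. Everything in it is hypothetical: `ToyCluster`, `assign` and `startApp` are illustrative stand-ins, not Kafka Streams classes, and the check only loosely mirrors the "External source topic not found" failure in the trace; it is not `StreamPartitionAssignor`'s real logic. The model assumes, as the description suspects, that the intermediate topic only comes into existence as a side effect of the first start, after the assignment-time check has already looked for it.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the suspected startup race; all names are hypothetical,
// not Kafka Streams APIs.
public class LazyTopicRace {

    // Stand-in for broker metadata: the set of topics that currently exist.
    static final class ToyCluster {
        private final Set<String> topics = new HashSet<>();

        void create(String topic) {
            topics.add(topic);
        }

        boolean exists(String topic) {
            return topics.contains(topic);
        }
    }

    // Assignment-time check: every source topic must already exist.
    static void assign(ToyCluster cluster, List<String> sourceTopics) {
        for (String topic : sourceTopics) {
            if (!cluster.exists(topic)) {
                throw new IllegalStateException("External source topic not found: " + topic);
            }
        }
    }

    // One application start; returns true if assignment succeeded.
    static boolean startApp(ToyCluster cluster, List<String> sourceTopics, String intermediateTopic) {
        boolean assigned;
        try {
            assign(cluster, sourceTopics);
            assigned = true;
        } catch (IllegalStateException e) {
            assigned = false; // the stream thread would shut down here
        }
        // Assumed lazy creation: the intermediate topic appears only as a
        // side effect of this start, after the check above has already run.
        cluster.create(intermediateTopic);
        return assigned;
    }

    public static void main(String[] args) {
        String source = "positron-db-user_space_broadcasts";
        String intermediate = "user_space_broadcasts-user_id";
        List<String> sources = List.of(source, intermediate);

        ToyCluster lazy = new ToyCluster();
        lazy.create(source);
        System.out.println("first run ok: " + startApp(lazy, sources, intermediate));
        System.out.println("second run ok: " + startApp(lazy, sources, intermediate));

        // Hypothetical mitigation: create the intermediate topic before the first start.
        ToyCluster eager = new ToyCluster();
        eager.create(source);
        eager.create(intermediate);
        System.out.println("pre-created, first run ok: " + startApp(eager, sources, intermediate));
    }
}
```

Running `main` prints `false` for the first run and `true` for the second, matching "subsequent runs work OK", while the pre-created cluster succeeds on its first start. Whether pre-creating intermediate topics is the right workaround on 0.10.0.0 is an assumption of this sketch, not something established in this ticket.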