I think I figured out what the problem is, though I'm not sure how to fix it.
I've managed to debug through the embedded broker's callback for
TopicChangeListener#handleChildChange() in the PartitionStateMachine class.
The line in that function that's failing looks like this:

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

Inside getReplicaAssignmentForTopics(), it pulls back a JSON blob from the
data of the /brokers/topics/testtopic znode, and the blob appears to have
some extra bytes at the beginning that make it unparseable once pulled from
ZooKeeper.

Any ideas as to what this could be? I'm using 0.8.2.0. This is really what's
holding me back right now from getting my tests functional.

On Thu, May 14, 2015 at 4:29 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I raised the log levels to try to figure out what happens. I see log
> statements on the broker stating:
>
> "New topic creation callback for "
> "New partition creation callback for "
> "Invoking state change to NewPartition for partitions "
> "Invoking state change to OnlinePartitions for partitions "
> "Error while fetching metadata for partition [testtopic, 0]
> kafka.common.LeaderNotAvailableException: No leader exists for partition
> 0..."
>
> I'm not sure what's happening between the time I create my topic and the
> time the broker sees that it needs to add the partition assignment to
> zookeeper with itself as the leader but it's strange that the log messages
> above seem like they are missing the data. "New topic creation callback for
> " seems like it should be listing a topic and not blank.
>
> Any ideas?
>
> On Thu, May 14, 2015 at 1:00 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
>> found on Github) so that I can run an end-to-end test ingesting data
>> through a kafka topic with consumers in Spark Streaming pushing to
>> Accumulo.
>>
>> Thus far, my code is doing this:
>>
>> 1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
>> from the mini Accumulo cluster)
>> 2) Creating a topic using AdminUtils
>> 3) Starting up a Spark streaming context using a Kafka stream that puts
>> all data into Accumulo
>> 4) Creating a producer and sending a message to the Kafka topic.
>>
>>
>> Looking @ the topic metadata in zookeeper after the topic is created,
>> let's say "testtopic", I never see the metadata for a leader in
>> /brokers/topics/testtopic/partitions show up. If I understand correctly,
>> creating a topic does this:
>>
>> 1) Adds a persistent node into Zookeeper with some json data to denote
>> the topic's name as well as the partitions and the list of each broker id
>> for each partition.
>> 2) I am still in the process of digging into this part but I think the
>> first item in the list of replicas for each partition is used to define the
>> initial leader and the leader is notified via a watcher and told to create
>> an ephemeral node so that it can know when that node goes down to assign
>> another.
>>
>> If I'm correct about #2, it seems like that watcher is never being
>> invoked. Any attempt to produce to the topic just returns an error back to
>> the producer that says there was no leader selected.
>>
>> Any advice would be much appreciated. I really would like to get our
>> stack tested fully through automated testing and Kafka is the last piece we
>> need to assemble.
>>
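
For the extra leading bytes described at the top of this message, here is a minimal diagnostic sketch in plain Java (no Kafka or ZooKeeper dependencies). It counts how many bytes precede the first '{' in a payload and prints a hex dump of the first few bytes. The class and method names are hypothetical, and the sample JSON is made up for illustration. One possibility, which is an assumption and not something confirmed in this thread, is that the znode was written by a client using Java object serialization instead of plain UTF-8 strings; the sketch shows what that prefix would look like so it can be compared against the bytes actually read from the znode.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for inspecting a znode payload; not part of Kafka.
class ZnodePrefixCheck {

    // Number of bytes before the first '{', or -1 if the payload has none.
    static int bytesBeforeJson(byte[] data) {
        for (int i = 0; i < data.length; i++) {
            if (data[i] == '{') {
                return i;
            }
        }
        return -1;
    }

    // Lowercase hex dump of the first n bytes (or fewer if the payload is shorter).
    static String hexPrefix(byte[] data, int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(n, data.length); i++) {
            sb.append(String.format("%02x", data[i] & 0xff));
        }
        return sb.toString();
    }

    // Encodes a String with Java object serialization, as a serializer based on
    // ObjectOutputStream would (assumed cause, for comparison only).
    static byte[] javaSerialize(String s) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        // Made-up sample payload, not copied from a real broker.
        String json = "{\"version\":1,\"partitions\":{\"0\":[0]}}";

        byte[] plain = json.getBytes(StandardCharsets.UTF_8);
        byte[] serialized = javaSerialize(json);

        System.out.println("plain:      offset=" + bytesBeforeJson(plain)
                + " hex=" + hexPrefix(plain, 8));
        System.out.println("serialized: offset=" + bytesBeforeJson(serialized)
                + " hex=" + hexPrefix(serialized, 8));
    }
}
```

With a plain UTF-8 payload the JSON starts at offset 0. With the Java-serialized payload the dump begins with the stream header ac ed 00 05 and the JSON starts at offset 7. If the bytes read from /brokers/topics/testtopic begin the same way, the serializer configured on the client that created the topic would be the first thing to check.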