I think I figured out what the problem is, though I'm not sure how to fix
it.


I've managed to debug through the embedded broker's callback for the
TopicChangeListener#handleChildChange() int he PartitionStateMachine class.

The following line from that function that's failing look this:

val addedPartitionReplicaAssignment =
ZKUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

Inside the getReplicaAssignmentForTopics() it is pulling back a json blob
from the /brokers/topics/testtopic znode's data and it appears the json
blob has some extra bytes @ the beginning of it that are making it
unparseable once pulled from zookeeper.

Any ideas to what this could be? I'm using 0.8.2.0- this is really what's
holding me back right now from getting my tests functional.


On Thu, May 14, 2015 at 4:29 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I raised the log levels to try to figure out what happens. I see log
> statements on the broker stating:
>
> "New topic creation callback for "
> "New partition creation callback for "
> "Invoking state change to NewPartition for partitions "
> "Invoking state change to OnlinePartitions for partitions "
> "Error while fetching metadata for partition [testtopic, 0]
> kafka.common.LeaderNotAvailableExzception: No leader exists for partition
> 0..."
>
> I'm not sure what's happening between the time I create my topic and the
> time the broker sees that it needs to add the partition assignment to
> zookeeper with itself as the leader but it's strange that the log messages
> above seem like they are missing the data. "New topic creation callback for
> " seems like it should be listing a topic and not blank.
>
> Any ideas?
>
> On Thu, May 14, 2015 at 1:00 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
>> found on Github) so that I can run an end-to-end test ingesting data
>> through a kafka topic with consumers in Spark Streaming pushing to
>> Accumulo.
>>
>> Thus far, my code is doing this:
>>
>> 1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
>> from the mini Accumulo cluster)
>> 2) Creating a topic using AdminUtil
>> 3) Starting up a Spark streaming context using a Kafka stream that puts
>> all data into Accumulo
>> 4) Creating a producer and sending a message to the Kafka topic.
>>
>>
>> Looking @ the topic metadata in zookeeper after the topic is created,
>> let's say "testtopic", I never see the metadata for a leader in
>> /brokers/topics/testtopic/partitions show up. If I understand correctly,
>> creating a topic does this:
>>
>> 1) Adds a persistent node into Zookeeper with some json data to denote
>> the topic's name as well as the partitions and the list of each broker id
>> for each partition.
>> 2) I am still in the process of digging into this part but I think the
>> first item in the list of replicas for each partition is used to define the
>> initial leader and the leader is notified via a watcher and told to create
>> an ephemeral node so that it can know when that node goes down to assign
>> another.
>>
>> If I'm correct about #2, it seems like that watcher is never being
>> invoked. Any attempt to produce to the topic just returns an error back to
>> the producer that says there was no leader selected.
>>
>> Anything advice would be much appreciated. I really would like to get our
>> stack tested fully through automated testing and Kafka is the last piece we
>> need to assemble.
>>
>>
>>
>

Reply via email to