auto topic creation not working for attempts to consume non-existing topic

2014-10-02 Thread Stevo Slavić
Hello Apache Kafka community,

auto.create.topics.enable configuration option docs state:
"Enable auto creation of topic on the server. If this is set to true then
attempts to produce, consume, or fetch metadata for a non-existent topic
will automatically create it with the default replication factor and number
of partitions."

I read this as meaning that a topic should be created on any attempt to
consume a non-existent topic.

With auto.create.topics.enable left at the default or explicitly set to true,
attempts to consume a non-existent topic, using a blocking consumer or a
non-blocking consumer with a positive consumer.timeout.ms configured, do not
result in topic creation (I cannot see one registered in ZooKeeper).

Additionally, for a non-blocking consumer with a timeout, no offset will be
recorded. This further means that if such a consumer has auto.offset.reset set
to largest, it will miss at least one message (the initial one whose publishing
creates the topic), even though the consumer attempted to read before the
first message was published.
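
For reference, a minimal sketch of the consumer setup described above
(assuming the 0.8.1 high-level consumer API; the ZooKeeper address, group id
and topic name are placeholders):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}

object ConsumeNonExistentTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "127.0.0.1:2181") // placeholder ZooKeeper address
    props.put("group.id", "test-group")              // placeholder group id
    props.put("auto.offset.reset", "largest")
    props.put("consumer.timeout.ms", "5000")         // makes the consumer non-blocking

    val connector = Consumer.create(new ConsumerConfig(props))
    val stream = connector.createMessageStreams(Map("non-existent-topic" -> 1))("non-existent-topic").head
    try {
      // With consumer.timeout.ms set, this throws ConsumerTimeoutException instead
      // of blocking forever; either way the topic does not get auto-created.
      val message = stream.iterator().next()
      println(new String(message.message(), "UTF-8"))
    } catch {
      case _: ConsumerTimeoutException =>
        println("No message within timeout; topic still not created")
    } finally {
      connector.shutdown()
    }
  }
}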

I'm using Kafka 0.8.1.1, but I see the same issue exists in current trunk.

Is this a known issue? Or are my expectations/assumptions wrong and this is
expected behavior?

Kind regards,
Stevo Slavic.


Re: auto topic creation not working for attempts to consume non-existing topic

2014-10-03 Thread Stevo Slavić
OK, thanks,

Do you agree then that the docs for auto topic creation configuration
parameter are misleading and should be changed?

Another issue is that when topic auto creation is disabled, attempts to
publish a message to a non-existent topic using the high-level API throw a
generic FailedToSendMessageException (even when message.send.max.retries is
0) without at least having UnknownTopicOrPartitionException as the cause. Is
this a feature or a bug, and more importantly, could it be improved?

Kind regards,
Stevo Slavic.
On Oct 3, 2014 6:30 AM, "Jun Rao"  wrote:

> In general, only writers should trigger auto topic creation, but not the
> readers. So, a topic can be auto created by the producer, but not the
> consumer.
>
> Thanks,
>
> Jun
>
> On Thu, Oct 2, 2014 at 2:44 PM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > auto.create.topics.enable configuration option docs state:
> > "Enable auto creation of topic on the server. If this is set to true then
> > attempts to produce, consume, or fetch metadata for a non-existent topic
> > will automatically create it with the default replication factor and
> number
> > of partitions."
> >
> > I read this that topic should be created on any attempt to consume
> > non-existing topic.
> >
> > With auto.create.topics.enable left at default or explicitly set to true,
> > attempts to consume non existing topic, using blocking consumer, or a
> > non-blocking consumer with positive consumer.timeout.ms configured, will
> > not result in topic creation (I cannot see one registered in ZooKeeper).
> >
> > Additionally, for non-blocking consumer with timeout, no offset will be
> > recorded. This further means, if such consumer had auto.offset.reset set
> to
> > largest, that it will miss at least one message (initial one that when
> > published creates the topic), even though consumer attempted to read
> before
> > first message was published.
> >
> > I'm using Kafka 0.8.1.1 but I see same issue exists in current trunk.
> >
> > Is this a known issue? Or are my expectations/assumptions wrong and this
> is
> > expected behavior?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


HL publishing, retries and potential race condition

2014-10-06 Thread Stevo Slavić
Hello Apache Kafka community,

When trying to publish (using the high-level sync producer) a message to a
non-existent topic (with implicit topic creation enabled), and with
message.send.max.retries set to 1, sending fails with
FailedToSendMessageException (with LeaderNotAvailableException swallowed).
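
Roughly what I'm running, as a minimal sketch against the 0.8.1 Scala sync
producer (the broker address, topic name and payload are placeholders):

import java.util.Properties
import kafka.common.FailedToSendMessageException
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object PublishToNewTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "127.0.0.1:9092") // placeholder broker address
    props.put("producer.type", "sync")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("message.send.max.retries", "1")
    props.put("retry.backoff.ms", "100")

    val producer = new Producer[String, String](new ProducerConfig(props))
    try {
      // The first send to a non-existent topic triggers auto creation, but the
      // leader may not be elected yet, so with a single retry this can still
      // fail with FailedToSendMessageException (LeaderNotAvailableException swallowed).
      producer.send(new KeyedMessage[String, String]("non-existent-topic", "key", "value"))
    } catch {
      case e: FailedToSendMessageException => e.printStackTrace()
    } finally {
      producer.close()
    }
  }
}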

Am I doing something wrong, are my expectations wrong that this should work
without an exception being thrown, or is this a known Kafka (with ZooKeeper as
storage) issue?

I'm using Kafka 0.8.1.1.

Kind regards,
Stevo Slavic.


Re: auto topic creation not working for attempts to consume non-existing topic

2014-10-06 Thread Stevo Slavić
Created https://issues.apache.org/jira/browse/KAFKA-1674 for the likely docs
issue.

Is this the umbrella/epic for the new producer API:
https://issues.apache.org/jira/browse/KAFKA-1239 ?
Is it planned to be complete in the 0.9.0 release? If so, then consider
switching 0.9.0 in KAFKA-1239 from "affects version" to "fix version".

Thanks for the fast feedback!

Kind regards,
Stevo Slavic

On Mon, Oct 6, 2014 at 5:17 AM, Jun Rao  wrote:

> Yes, the docs can be improved. Could you file a jira?
>
> For the 2nd issue, the new java producer handles this better.
>
> Thanks,
>
> jun
>
> On Fri, Oct 3, 2014 at 1:31 AM, Stevo Slavić  wrote:
>
> > OK, thanks,
> >
> > Do you agree then that the docs for auto topic creation configuration
> > parameter are misleading and should be changed?
> >
> > Another issue is that when the topic auto creation is disabled, attempts
> to
> > publish a message on a non-existing topic using high-level api will
> throw a
> > generic FailedToSendMessageException (even when message.send.max.retries
> is
> > 0) without having UnknownTopicOrPartitionException at least as cause. Is
> > this feature or a bug, and more importantly could it be improved?
> >
> > Kind regards,
> > Stevo Slavic.
> > On Oct 3, 2014 6:30 AM, "Jun Rao"  wrote:
> >
> > > In general, only writers should trigger auto topic creation, but not
> the
> > > readers. So, a topic can be auto created by the producer, but not the
> > > consumer.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Oct 2, 2014 at 2:44 PM, Stevo Slavić 
> wrote:
> > >
> > > > Hello Apache Kafka community,
> > > >
> > > > auto.create.topics.enable configuration option docs state:
> > > > "Enable auto creation of topic on the server. If this is set to true
> > then
> > > > attempts to produce, consume, or fetch metadata for a non-existent
> > topic
> > > > will automatically create it with the default replication factor and
> > > number
> > > > of partitions."
> > > >
> > > > I read this that topic should be created on any attempt to consume
> > > > non-existing topic.
> > > >
> > > > With auto.create.topics.enable left at default or explicitly set to
> > true,
> > > > attempts to consume non existing topic, using blocking consumer, or a
> > > > non-blocking consumer with positive consumer.timeout.ms configured,
> > will
> > > > not result in topic creation (I cannot see one registered in
> > ZooKeeper).
> > > >
> > > > Additionally, for non-blocking consumer with timeout, no offset will
> be
> > > > recorded. This further means, if such consumer had auto.offset.reset
> > set
> > > to
> > > > largest, that it will miss at least one message (initial one that
> when
> > > > published creates the topic), even though consumer attempted to read
> > > before
> > > > first message was published.
> > > >
> > > > I'm using Kafka 0.8.1.1 but I see same issue exists in current trunk.
> > > >
> > > > Is this a known issue? Or are my expectations/assumptions wrong and
> > this
> > > is
> > > > expected behavior?
> > > >
> > > > Kind regards,
> > > > Stevo Slavic.
> > > >
> > >
> >
>


Re: Scala+Kafka puzzler

2014-10-10 Thread Stevo Slavić
Oh, right, well spotted; I was looking in the wrong branch. In the 0.8.2
branch it's no longer private.

Thanks!

Kind regards,
Stevo Slavic

On Fri, Oct 10, 2014 at 12:21 PM, Sean Owen  wrote:

> It's private to package "kafka":
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/cluster/Broker.scala#L29
>
> On Fri, Oct 10, 2014 at 10:47 AM, Stevo Slavić  wrote:
> > Hello Apache Kafka community,
> >
> > Attached trivial Maven built project with Kafka code fails to compile,
> with
> > error:
> >
> > "class Broker in package cluster cannot be accessed in package
> > kafka.cluster"
> >
> > Does anyone have an idea what's the actual problem here and how to
> > workaround it?
> >
> > Kind regards,
> > Stevo Slavic.
>


Reusable consumer across consumer groups

2014-10-14 Thread Stevo Slavić
Hello Apache Kafka community,

The current (Kafka 0.8.1.1) high-level API's KafkaConsumer is not a lightweight
object: its creation takes some time and resources, and it does not seem
to be thread-safe. Its API also does not support reuse for consuming
messages from different consumer groups.

I see that even in the coming (0.8.2) redesigned API it will not be possible to
reuse a consumer instance to poll messages from different consumer groups.

Can something be done to support this?

Would it help if there were a consumer group as a separate entity from the
consumer, for all the subscription management tasks?

Kind regards,
Stevo Slavic


Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
Hello Apache Kafka users,

Using Kafka 0.8.1.1 (a single instance with a single ZK 3.4.6 running locally),
with auto topic creation disabled, in a test I have a topic created with
AdminUtils.createTopic (AdminUtils.topicExists returns true), but
KafkaProducer on send requests keeps throwing
UnknownTopicOrPartitionException even after 100 retries, both when
topic.metadata.refresh.interval.ms and retry.backoff.ms are left at
defaults and when they are customized.

Am I doing something wrong or is this a known bug?

How long does it typically take for metadata to be refreshed?
How long does it take for a leader to be elected?

Documentation for retry.backoff.ms states:
"Before each retry, the producer refreshes the metadata of relevant topics
to see if a new leader has been elected. Since leader election takes a bit
of time, this property specifies the amount of time that the producer waits
before refreshing the metadata."

Do I understand these docs correctly: on failure to send a message, such as
to an unknown topic, if retries are configured the producer will wait for the
configured retry.backoff.ms, then initiate and wait for a metadata refresh to
complete, and only then retry sending?

Kind regards,
Stevo Slavic.


Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
kafka-topics.sh execution, from latest trunk:

~/git/oss/kafka [trunk|✔]
21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
059915e6-56ef-4b8e-8e95-9f676313a01c --describe
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Error while executing topic command next on empty iterator
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)


Output from the same command on the 0.8.1 branch is cleaner, but ends with
the same exception:

~/git/oss/kafka [0.8.1|✔]
21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
059915e6-56ef-4b8e-8e95-9f676313a01c --describe
Error while executing topic command null
java.util.NoSuchElementException
at scala.collection.IterableLike$class.head(IterableLike.scala:101)
at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang  wrote:

> Hello Stevo,
>
> Your understanding about the configs are correct, and it is indeed wired
> that the producer gets the exception after topic is created. Could you use
> the kafka-topics command to check if the leaders exist?
>
> kafka-topics.sh --zookeeper XXX --topic [topic-name] describe
>
> Guozhang
>
> On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka users,
> >
> > Using Kafka 0.8.1.1 (single instance with single ZK 3.4.6 running
> locally),
> > with auto topic creation disabled, in a test I have topic created with
> > AdminUtils.createTopic (AdminUtils.topicExists returns true) but
> > KafkaProducer on send request keeps throwing
> > UnknownTopicOrPartitionException even after 100 retries, both when
> > topic.metadata.refresh.interval.ms and retry.backoff.ms are left at
> > defaults, and when customized.
> >
> > Am I doing something wrong or is this a known bug?
> >
> > How long does it typically take for metadata to be refreshed?
> > How long does it take for leader to be elected?
> >
> > Documentation for retry.backoff.ms states:
> > "Before each retry, the producer refreshes the metadata of relevant
> topics
> > to see if a new leader has been elected. Since leader election takes a
> bit
> > of time, this property specifies the amount of time that the producer
> waits
> > before refreshing the metadata."
> >
> > Do I understand this docs correctly - on failure to send a message, such
> as
> > unknown topic, if retries are configured producer will wait for
> configured
> > retry.backoff.ms, then it will initiate and wait for metadata refresh to
> > complete, and only then retry sending?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>
>
>
> --
> -- Guozhang
>


Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
Output on trunk is clean too, after a clean build:

~/git/oss/kafka [trunk|✔]
22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
059915e6-56ef-4b8e-8e95-9f676313a01c --describe
Error while executing topic command next on empty iterator
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić  wrote:

> kafka-topics.sh execution, from latest trunk:
>
> ~/git/oss/kafka [trunk|✔]
> 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Error while executing topic command next on empty iterator
> java.util.NoSuchElementException: next on empty iterator
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> at scala.collection.IterableLike$class.head(IterableLike.scala:91)
> at scala.collection.AbstractIterable.head(Iterable.scala:54)
> at
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
> at
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>
>
> Output from same command on 0.8.1 branch is better, but still same
> exception:
>
> ~/git/oss/kafka [0.8.1|✔]
> 21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
> Error while executing topic command null
> java.util.NoSuchElementException
> at scala.collection.IterableLike$class.head(IterableLike.scala:101)
> at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
> at
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
> at
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>
> On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang  wrote:
>
>> Hello Stevo,
>>
>> Your understanding about the configs are correct, and it is indeed wired
>> that the producer gets the exception after topic is created. Could you use
>> the kafka-topics command to check if the leaders exist?
>>
>> kafka-topics.sh --zookeeper XXX --topic [topic-name] describe
>>
>> Guozhang
>>
>> On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić  wrote:
>>
>> > Hello Apache Kafka users,
>> >
>> > Using Kafka 0.8.1.1 (single instance with single ZK 3.4.6 running
>> locally),
>> > with auto topic creation disabled, in a test I have topic created with
>> > AdminUtils.createTopic (AdminUtils.topicExists returns true) but
>> > KafkaProducer on send request keeps throwing
>> > UnknownTopicOrPartitionException even after 100 retries, both when
>> > topic.metadata.refresh.interv

Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
I still have to understand what is going on, but when I set
kafka.utils.ZKStringSerializer as the ZkSerializer for the ZkClient used in
AdminUtils calls, KafkaProducer could see the created topic...
The default ZkSerializer is
org.I0Itec.zkclient.serialize.SerializableSerializer.
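
In other words, roughly this made the difference (a sketch against the 0.8.1
AdminUtils API; the ZooKeeper address, timeouts and topic name are
placeholders):

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object CreateTopicExplicitly {
  def main(args: Array[String]): Unit = {
    // Without ZKStringSerializer, ZkClient's default SerializableSerializer writes
    // Java-serialized bytes to ZooKeeper, which brokers and tools cannot parse,
    // so the producer never sees the topic metadata.
    val zkClient = new ZkClient("127.0.0.1:2181", 30000, 30000, ZKStringSerializer)
    try {
      AdminUtils.createTopic(zkClient, "my-test-topic", 1, 1)
      println("exists: " + AdminUtils.topicExists(zkClient, "my-test-topic"))
    } finally {
      zkClient.close()
    }
  }
}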

Kind regards,
Stevo Slavic.

On Wed, Oct 22, 2014 at 10:03 PM, Stevo Slavić  wrote:

> Output on trunk is clean too, after clean build:
>
> ~/git/oss/kafka [trunk|✔]
> 22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
> Error while executing topic command next on empty iterator
> java.util.NoSuchElementException: next on empty iterator
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> at scala.collection.IterableLike$class.head(IterableLike.scala:91)
> at scala.collection.AbstractIterable.head(Iterable.scala:54)
> at
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
> at
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>
> On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić  wrote:
>
>> kafka-topics.sh execution, from latest trunk:
>>
>> ~/git/oss/kafka [trunk|✔]
>> 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
>> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Error while executing topic command next on empty iterator
>> java.util.NoSuchElementException: next on empty iterator
>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>> at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>> at scala.collection.AbstractIterable.head(Iterable.scala:54)
>> at
>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
>> at
>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
>> at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
>> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>>
>>
>> Output from same command on 0.8.1 branch is better, but still same
>> exception:
>>
>> ~/git/oss/kafka [0.8.1|✔]
>> 21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
>> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
>> Error while executing topic command null
>> java.util.NoSuchElementException
>> at scala.collection.IterableLike$class.head(IterableLike.scala:101)
>> at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
>> at
>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
>> at
>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
>> at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
>> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>>
>> On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang 
>> wrote:
>>
>>> Hello Stevo,
>>>
>>> Your understanding about the configs are correct, and it is indeed wired
>>> that the producer gets the exception after topic is created. Could you
>>> use
>>> the kafka-topics command

Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
It seems that the ZkSerializer used has to be aligned with the KafkaProducer's
configured key.serializer.class.

On Thu, Oct 23, 2014 at 1:13 AM, Stevo Slavić  wrote:

> Still have to understand what is going on, but when I set
> kafka.utils.ZKStringSerializer to be ZkSerializer for ZkClient used in
> AdminUtils calls, KafkaProducer could see created topic...
> Default ZkSerializer is
> org.I0Itec.zkclient.serialize.SerializableSerializer.
>
> Kind regards,
> Stevo Slavic.
>
> On Wed, Oct 22, 2014 at 10:03 PM, Stevo Slavić  wrote:
>
>> Output on trunk is clean too, after clean build:
>>
>> ~/git/oss/kafka [trunk|✔]
>> 22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
>> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
>> Error while executing topic command next on empty iterator
>> java.util.NoSuchElementException: next on empty iterator
>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>> at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>> at scala.collection.AbstractIterable.head(Iterable.scala:54)
>> at
>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
>> at
>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
>> at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
>> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>>
>> On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić  wrote:
>>
>>> kafka-topics.sh execution, from latest trunk:
>>>
>>> ~/git/oss/kafka [trunk|✔]
>>> 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
>>> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Error while executing topic command next on empty iterator
>>> java.util.NoSuchElementException: next on empty iterator
>>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>>> at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>>> at scala.collection.AbstractIterable.head(Iterable.scala:54)
>>> at
>>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
>>> at
>>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
>>> at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
>>> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>>>
>>>
>>> Output from same command on 0.8.1 branch is better, but still same
>>> exception:
>>>
>>> ~/git/oss/kafka [0.8.1|✔]
>>> 21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
>>> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
>>> Error while executing topic command null
>>> java.util.NoSuchElementException
>>> at scala.collection.IterableLike$class.head(IterableLike.scala:101)
>>> at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
>>> at
>>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
>>> at
>>> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>>> at kafka.admin.TopicCommand$.describeTopic(TopicCom

Re: Reusable consumer across consumer groups

2014-10-23 Thread Stevo Slavić
Imagine exposing Kafka over various remoting protocols, where incoming
poll/read requests may come in concurrently for different consumer groups,
especially in a case with lots of different consumer groups.
If you create and destroy a KafkaConsumer for each such request, response
times and throughput will be very poor, and doing that is one of the ways to
reproduce https://issues.apache.org/jira/browse/KAFKA-1716

It would be better if one could reuse a (pool of) Consumer instances, and
specify through a read operation parameter which consumer group the read
should be performed for.

Kind regards,
Stevo Slavic.

On Tue, Oct 14, 2014 at 6:17 PM, Neha Narkhede 
wrote:

> Stevo,
>
> The new consumer API is planned for 0.9, not 0.8.2. You can take a look at
> a detailed javadoc here
> <
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> >
> .
>
> Can you explain why you would like to poll messages across consumer groups
> using just one instance?
>
> Thanks,
> Neha
>
> On Tue, Oct 14, 2014 at 1:03 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > Current (Kafka 0.8.1.1) high-level API's KafkaConsumer is not lightweight
> > object, it's creation takes some time and resources, and it does not seem
> > to be thread-safe. It's API also does not support reuse, for consuming
> > messages from different consumer groups.
> >
> > I see even in the coming (0.8.2) redesigned API it will not be possible
> to
> > reuse consumer instance to poll messages from different consumer groups.
> >
> > Can something be done to support this?
> >
> > Would it help if there was consumer group as a separate entity from
> > consumer, for all the subscription management tasks?
> >
> > Kind regards,
> > Stevo Slavic
> >
>


Re: Delete Specific Message from a Topic (partition)

2014-10-26 Thread Stevo Slavić
Have group 1 act as a filter: publish to a new topic all the messages that
group 2 should process, and then have group 2 actually consume only the new
topic.
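
A rough sketch of that filtering step, assuming the 0.8.1 high-level consumer
and the old Scala producer (topic names, connection settings and the
shouldProcess predicate are placeholders):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object FilterAndRepublish {
  // Hypothetical rule deciding which messages group 2 should see
  def shouldProcess(payload: Array[Byte]): Boolean = payload.nonEmpty

  def main(args: Array[String]): Unit = {
    val consumerProps = new Properties()
    consumerProps.put("zookeeper.connect", "127.0.0.1:2181") // placeholder
    consumerProps.put("group.id", "mygroupID1")
    val connector = Consumer.create(new ConsumerConfig(consumerProps))

    val producerProps = new Properties()
    producerProps.put("metadata.broker.list", "127.0.0.1:9092") // placeholder
    val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))

    // Group 1 consumes everything from the source topic and republishes only
    // the messages that group 2 should process onto the filtered topic.
    val stream = connector.createMessageStreams(Map("source-topic" -> 1))("source-topic").head
    for (msg <- stream) {
      if (shouldProcess(msg.message()))
        producer.send(new KeyedMessage[Array[Byte], Array[Byte]]("filtered-topic", msg.key(), msg.message()))
    }
  }
}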

Kind regards,
Stevo Slavic
On Oct 26, 2014 2:36 AM, "Srinivas Reddy Kancharla" 
wrote:

> Hi,
>
> I have a scenario where I produce messages for a given topic (say having 10
> partitions), and I have consumer group ( say mygroupID1) with 10 threads
> reading those 10 partitions.  After consuming, I would like to delete
> specific messages from that topic (i.e. from a given partition).
>
> How should I restrict other consumer group say mygroupID2 to read messages
> from the same topic?
>
> Thanks for the help,
> Srini
>


Re: Explicit topic creation and topic metadata availability

2014-10-27 Thread Stevo Slavić
OK,

thanks for the heads up!

Is this requirement documented somewhere?
Would it make sense then to have AdminUtils call setZkSerializer on the
zkClient passed to it? Or maybe provide factory method(s) for ZkClient in
AdminUtils, which would ensure the ZkSerializer is appropriate.

Kind regards,
Stevo Slavic.

On Mon, Oct 27, 2014 at 12:43 AM, Guozhang Wang  wrote:

> Hi Stevo,
>
> Sorry for the late reply.
>
> Yes, you have to use ZKStringSerializer when initializing zkClient. However
> this is not related to key.serializer.class of the producer (not the
> KafkaProducer, which is the new producer that does not need any key
> serializer any more).
>
> key.serializer.class and value serializer.class are used to specify how to
> encode / decode key value bytes of the message, while ZKStringSerializer
> specifies how to encode the bytes written in ZK metadata path.
>
> Guozhang
>
> On Wed, Oct 22, 2014 at 5:36 PM, Stevo Slavić  wrote:
>
> > It seems that used ZkSerializer has to be aligned with KafkaProducer
> > configured key.serializer.class.
> >
> > On Thu, Oct 23, 2014 at 1:13 AM, Stevo Slavić  wrote:
> >
> > > Still have to understand what is going on, but when I set
> > > kafka.utils.ZKStringSerializer to be ZkSerializer for ZkClient used in
> > > AdminUtils calls, KafkaProducer could see created topic...
> > > Default ZkSerializer is
> > > org.I0Itec.zkclient.serialize.SerializableSerializer.
> > >
> > > Kind regards,
> > > Stevo Slavic.
> > >
> > > On Wed, Oct 22, 2014 at 10:03 PM, Stevo Slavić 
> > wrote:
> > >
> > >> Output on trunk is clean too, after clean build:
> > >>
> > >> ~/git/oss/kafka [trunk|✔]
> > >> 22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
> > >> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
> > >> Error while executing topic command next on empty iterator
> > >> java.util.NoSuchElementException: next on empty iterator
> > >> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> > >> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> > >> at scala.collection.IterableLike$class.head(IterableLike.scala:91)
> > >> at scala.collection.AbstractIterable.head(Iterable.scala:54)
> > >> at
> > >>
> >
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
> > >> at
> > >>
> >
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
> > >> at
> > >>
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > >> at
> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > >> at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
> > >> at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
> > >> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> > >>
> > >> On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić 
> > wrote:
> > >>
> > >>> kafka-topics.sh execution, from latest trunk:
> > >>>
> > >>> ~/git/oss/kafka [trunk|✔]
> > >>> 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
> > >>> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
> > >>> SLF4J: Class path contains multiple SLF4J bindings.
> > >>> SLF4J: Found binding in
> > >>>
> >
> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > >>> SLF4J: Found binding in
> > >>>
> >
> [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > >>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > >>> explanation.
> > >>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> > >>> Error while executing topic command next on empty iterator
> > >>> java.util.NoSuchElementException: next on empty iterator
> > >>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> > >>> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> > >>> at
> scala.collection.IterableLike$class.head(IterableLike.scala:91)
> > >>> at scala.collection.AbstractIterable.head(Iterable.scala:54)
> > >>> at
&

Re: Delete Specific Message from a Topic (partition)

2014-10-27 Thread Stevo Slavić
You have (at least) two problems there: stuck processing, and blocking on a
processing retry indefinitely.
For the first issue there is no general answer; it depends on what kind
of processing you're doing that gets stuck, and that will influence how you
can terminate/interrupt it.
I assume you use Java or at least the JVM. It will help with both issues to
split reading a message and processing a message into separate threads,
actors, or whatever your favorite concurrent computation abstraction is.
For the second issue, the reading/consuming part can wait, not indefinitely,
but until a configurable timeout for processing to complete, and on timeout
retry the processing, giving the work to a new worker.
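
A minimal sketch of that second part on the JVM, using plain
java.util.concurrent (the process function, pool size, timeout and retry
count are placeholders):

import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

object ProcessWithTimeout {
  // Hypothetical processing step that may hang on a single message
  def process(message: Array[Byte]): Unit = { /* ... */ }

  private val workers = Executors.newFixedThreadPool(4)

  // Called by the reading thread for each consumed message; on timeout it
  // retries with a fresh worker instead of blocking the consumer forever.
  def handle(message: Array[Byte], attemptsLeft: Int = 3): Unit = {
    val future = workers.submit(new Callable[Unit] { def call(): Unit = process(message) })
    try {
      future.get(30, TimeUnit.SECONDS) // configurable processing timeout
    } catch {
      case _: TimeoutException =>
        future.cancel(true) // best-effort interrupt of the stuck worker
        if (attemptsLeft > 1) handle(message, attemptsLeft - 1)
        else println("Giving up on the message after repeated timeouts")
    }
  }
}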

Kind regards,
Stevo Slavic.

On Sun, Oct 26, 2014 at 6:17 PM, Srinivas Reddy Kancharla <
getre...@gmail.com> wrote:

> Thanks for your response. I agree on sending messages to other new topic
> and thats what I am doing now. The reason of asking deleting of messages
> from log :
> Say I have a topic "InitialState_topic1" where one consumer group
> mygroupID1 is reading and is sending messages to new topic
> "InprogressState_topic1" -> which is read by another consumer group
> "mygroupID2". Now one of the consumer thread in mygroupID2 is stuck or hung
> in reading one of the partition, I was under impression that there should
> be expiry rule which I can set on each message so that I can delete such
> message from that partition and put it back to "initialState_topic1" topic
> so that other idle threads can pick it up again.
>
> So if a thread of a consumer group is stuck or hung (with one single
> message) and it holds a partition, how can I make other idle thread (from
> same consumer group) to take control of that partition so that it can read
> other pending messages?
>
> Thanks and regards,
> Srini
>
> On Sun, Oct 26, 2014 at 12:40 AM, Stevo Slavić  wrote:
>
> > Have group 1 act like a filter, publish to a new topic all messages that
> > group 2 should process and then have group 2 actually consume only new
> > topic.
> >
> > Kind regards,
> > Stevo Slavic
> > On Oct 26, 2014 2:36 AM, "Srinivas Reddy Kancharla" 
> > wrote:
> >
> > > Hi,
> > >
> > > I have a scenario where I produce messages for a given topic (say
> having
> > 10
> > > partitions), and I have consumer group ( say mygroupID1) with 10
> threads
> > > reading those 10 partitions.  After consuming, I would like to delete
> > > specific messages from that topic (i.e. from a given partition).
> > >
> > > How should I restrict other consumer group say mygroupID2 to read
> > messages
> > > from the same topic?
> > >
> > > Thanks for the help,
> > > Srini
> > >
> >
>


Re: Explicit topic creation and topic metadata availability

2014-10-29 Thread Stevo Slavić
Created https://issues.apache.org/jira/browse/KAFKA-1737

Thanks for the support!

Kind regards,
Stevo Slavic.

On Tue, Oct 28, 2014 at 2:07 AM, Guozhang Wang  wrote:

> I think AdminUtils just take a zkClient as its parameter, and the zkClient
> should setZkSerializer at the time when it is initialized.
>
> You can take a look at TopicCommand, which triggers AdminUtils.createTopic
> by initializing a zkClient and pass it in. I agree that we probably have to
> make it clearer in AdminUtils' comments (hence in the built scaladoc) since
> it is a public API. Then when scaladoc is built it can h
>
> Stevo, could you file a JIRA for the comments improvements?
>
> Guozhang
>
>
> On Mon, Oct 27, 2014 at 1:55 AM, Stevo Slavić  wrote:
>
> > OK,
> >
> > thanks for heads up!
> >
> > Is this requirement documented somewhere?
> > Would it make sense then to have AdminUtils, call setZkSerializer on
> > zkClient passed to it? Or maybe provide a factory method(s) for ZkClient
> in
> > AdminUtils, which would ensure  ZkSerializer is appropriate.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Mon, Oct 27, 2014 at 12:43 AM, Guozhang Wang 
> > wrote:
> >
> > > Hi Stevo,
> > >
> > > Sorry for the late reply.
> > >
> > > Yes, you have to use ZKStringSerializer when initializing zkClient.
> > However
> > > this is not related to key.serializer.class of the producer (not the
> > > KafkaProducer, which is the new producer that does not need any key
> > > serializer any more).
> > >
> > > key.serializer.class and value serializer.class are used to specify how
> > to
> > > encode / decode key value bytes of the message, while
> ZKStringSerializer
> > > specifies how to encode the bytes written in ZK metadata path.
> > >
> > > Guozhang
> > >
> > > On Wed, Oct 22, 2014 at 5:36 PM, Stevo Slavić 
> wrote:
> > >
> > > > It seems that used ZkSerializer has to be aligned with KafkaProducer
> > > > configured key.serializer.class.
> > > >
> > > > On Thu, Oct 23, 2014 at 1:13 AM, Stevo Slavić 
> > wrote:
> > > >
> > > > > Still have to understand what is going on, but when I set
> > > > > kafka.utils.ZKStringSerializer to be ZkSerializer for ZkClient used
> > in
> > > > > AdminUtils calls, KafkaProducer could see created topic...
> > > > > Default ZkSerializer is
> > > > > org.I0Itec.zkclient.serialize.SerializableSerializer.
> > > > >
> > > > > Kind regards,
> > > > > Stevo Slavic.
> > > > >
> > > > > On Wed, Oct 22, 2014 at 10:03 PM, Stevo Slavić 
> > > > wrote:
> > > > >
> > > > >> Output on trunk is clean too, after clean build:
> > > > >>
> > > > >> ~/git/oss/kafka [trunk|✔]
> > > > >> 22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
> > > > >> 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
> > > > >> Error while executing topic command next on empty iterator
> > > > >> java.util.NoSuchElementException: next on empty iterator
> > > > >> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> > > > >> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> > > > >> at
> > scala.collection.IterableLike$class.head(IterableLike.scala:91)
> > > > >> at scala.collection.AbstractIterable.head(Iterable.scala:54)
> > > > >> at
> > > > >>
> > > >
> > >
> >
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
> > > > >> at
> > > > >>
> > > >
> > >
> >
> kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
> > > > >> at
> > > > >>
> > > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > > > >> at
> > > > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > > > >> at
> > kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
> > > > >> at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
> > > > >> at kafka.admin.TopicCommand.main(TopicCommand.scala)
> > > > >

Plugable metadata store

2014-11-14 Thread Stevo Slavić
Hello Apache Kafka community,

Is it already possible to configure/use a different metadata store (for topics,
consumer groups, consumer-to-partition assignments, etc.) instead of
ZooKeeper?
If not, are there any plans to make it pluggable in the future?

Kind regards,
Stevo Slavic


Re: Plugable metadata store

2014-11-14 Thread Stevo Slavić
Isn't it only for offset management?

Kind regards,
Stevo

On Fri, Nov 14, 2014 at 2:16 PM, Sharninder  wrote:

> I haven't been following closely but getting rid of zookeeper is in the
> pipeline. Look up 0.9 plans. They're somewhere on the wiki.
>
> Sent from my iPhone
>
> > On 14-Nov-2014, at 5:18 pm, Stevo Slavić  wrote:
> >
> > Hello Apache Kafka community,
> >
> > Is it already possible to configure/use a different metadata store
> (topics,
> > consumer groups, consumer to partition assignments, etc.) instead of
> > ZooKeeper?
> > If not, are there any plans to make it plugable in future?
> >
> > Kind regards,
> > Stevo Slavic
>


Re: Client Offset Storage

2014-12-15 Thread Stevo Slavić
I have no experience with it, but https://github.com/gerritjvv/kafka-fast
seems to fit your description.

Kind regards,
Stevo Slavic

On Fri, Dec 12, 2014 at 7:18 PM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:
>
> Basically, don't want to use ZK, for the reasons driving the new client
> offset features in 0.8.2.
>
> So we are looking for a library we could use that provides an alternative
> client offset implementation (broker-side, redis, local file, anything
> else). We could roll our own and put it behind the client offset api but I
> wanted to see if anyone had an implementation.
>
> -Suren
>
>
> On Fri, Dec 12, 2014 at 12:41 PM, Steve Morin 
> wrote:
> >
> > Suren,
> >   Like out of the box storage or roll your own?
> > -Steve
> >
> > On Fri, Dec 12, 2014 at 6:33 AM, Surendranauth Hiraman <
> > suren.hira...@velos.io> wrote:
> >
> > > My team is using Kafka 0.8.1 and we may not be able to upgrade to 0.8.2
> > to
> > > take advantage of the broker-side commit of client offsets.
> > >
> > > Is anyone aware of a Java/Scala library for client offset storage
> outside
> > > of ZK?
> > >
> > >
> > > -- Suren
> > >
> > >
> > > SUREN HIRAMAN, VP TECHNOLOGY
> > > Velos
> > > Accelerating Machine Learning
> > >
> > > 440 NINTH AVENUE, 11TH FLOOR
> > > NEW YORK, NY 10001
> > > O: (917) 525-2466 ext. 105
> > > F: 646.349.4063
> > > E: suren.hiraman@v elos.io
> > > W: www.velos.io
> > >
> >
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>


Current vote - 0.8.2.0-RC1 or 0.8.2.0?

2015-01-14 Thread Stevo Slavić
Hello Apache Kafka community,

Is the currently active vote for 0.8.2.0-RC1 or 0.8.2.0?

If the vote is for 0.8.2.0-RC1, why isn't that reflected in the artifact
metadata? The version should be 0.8.2.0-RC1, 0.8.2-RC1 or something similar
(the 0.8.2 beta release had a "-beta" suffix and no ".0" - see
http://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2-beta/ ).
If it stays like this and the final gets released later with the same 0.8.2.0
version but different content, repositories, both local and remote, will
get polluted with junk.

If the vote is for the 0.8.2.0 final GA release, why call it release candidate 1?

Also, version related - none of the previous 0.8.x releases had a ".0"
release, i.e. 0.8.x.0. Is this change in version numbering intentional?

Kind regards,
Stevo Slavic.


Re: Current vote - 0.8.2.0-RC1 or 0.8.2.0?

2015-02-09 Thread Stevo Slavić
Thanks for the heads up!

Please consider updating the versions in JIRA (0.8.2 --> 0.8.2.0) and labeling
0.8.2.0 as released.

Kind regards,
Stevo Slavic.

On Wed, Jan 14, 2015 at 6:54 PM, Jun Rao  wrote:

> About the versioning, we had released 0.8.1 and 0.8.1.1 before, which is a
> bit inconsistent in terms of versioning format. So picking 0.8.2.0 is
> intended to fix that inconsistency.
>
> Thanks,
>
> Jun
>
> On Wed, Jan 14, 2015 at 9:12 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > Is currently active vote for 0.8.2.0-RC1 or 0.8.2.0?
> >
> > If the vote is for 0.8.2.0-RC1 why isn't that reflected in artifact
> > metadata? Version should be 0.8.2.0-RC1, 0.8.2-RC1 or something similar
> > (0.8.2 beta release had "-beta" and no ".0" suffix - see
> > http://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2-beta/ )
> > If it stays like this, and final gets released later with same 0.8.2.0
> > version, but different content - repositories, both local and remote will
> > get polluted with junk.
> >
> > If the vote is for 0.8.2.0 final GA release, why call the vote candidate
> 1?
> >
> > Also, version related - none of the previous 0.8.x releases had ".0"
> > release i.e. 0.8.x.0. Is this change in version numbering intentional?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Kafka metrics: high level vs simple consumer

2015-02-26 Thread Stevo Slavić
Hello Apache Kafka community,

In Kafka 0.8.1.1, are Kafka metrics updated/tracked/marked by the simple
consumer implementation or only by the high-level one?

Kind regards,
Stevo Slavic.


Re: Database Replication Question

2015-03-03 Thread Stevo Slavić
Have you considered including ordering information in the messages that are
sent to Kafka, and then restoring order in the logic that processes the
messages consumed from Kafka?
http://www.enterpriseintegrationpatterns.com/Resequencer.html
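
A tiny sketch of such a resequencer on the consuming side, assuming every
replicated change carries a monotonically increasing sequence number assigned
at the source (the Change type and applyChange step are placeholders):

import scala.collection.mutable

// Hypothetical replicated change, carrying the source-side sequence number
case class Change(sequence: Long, payload: Array[Byte])

// Buffers out-of-order changes (e.g. arriving from different partitions) and
// applies them strictly in sequence order.
class Resequencer(applyChange: Change => Unit, firstSequence: Long = 0L) {
  private var nextExpected = firstSequence
  private val pending = mutable.HashMap.empty[Long, Change]

  def onMessage(change: Change): Unit = synchronized {
    pending.put(change.sequence, change)
    while (pending.contains(nextExpected)) {
      applyChange(pending.remove(nextExpected).get)
      nextExpected += 1
    }
  }
}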

Kind regards,
Stevo Slavic.

On Wed, Mar 4, 2015 at 12:15 AM, Josh Rader  wrote:

> Hi Kafka Experts,
>
>
>
> We have a use case around RDBMS replication where we are investigating
> Kafka.  In this case ordering is very important.  Our understanding is
> ordering is only preserved within a single partition.  This makes sense as
> a single thread will consume these messages, but our question is can we
> somehow parallelize this for better performance?   Is there maybe some
> partition key strategy trick to have your cake and eat it too in terms of
> keeping ordering, but also able to parallelize the processing?
>
>
>
> I am sorry if this has already been asked, but we tried to search through
> the archives and couldn’t find this response.
>
>
>
> Thanks,
>
> Josh
>


Kafka elastic no downtime scalability

2015-03-13 Thread Stevo Slavić
Hello Apache Kafka community,

On Apache Kafka website home page http://kafka.apache.org/ it is stated
that Kafka "can be elastically and transparently expanded without downtime."
Is that really true? More specifically, can one just add one more broker,
have another partition added for the topic, have new broker assigned to be
the leader for new partition, have producers correctly write to the new
partition, and consumers read from it, with no broker, consumer or producer
downtime, no data loss, no manual action to move data from existing
partitions to new partition?

Kind regards,
Stevo Slavic.


Re: Kafka elastic no downtime scalability

2015-03-13 Thread Stevo Slavić
OK, thanks for the heads up.

When reading the Apache Kafka docs, I expect what Apache Kafka "can" do to
already be available in the latest general availability release, not to be
merely planned as part of some other project.

Kind regards,
Stevo Slavic.

On Fri, Mar 13, 2015 at 2:32 PM, Joe Stein  wrote:

> Hey Stevo, "can be elastically and transparently expanded without
> downtime." is
> the goal of Kafka on Mesos https://github.com/mesos/kafka as Kafka as the
> ability (knobs/levers) to-do this but has to be made to-do this out of the
> box.
>
> e.g. in Kafka on Mesos when a broker fails, after the configurable max fail
> over timeout (meaning it is truly deemed hard failure) then a broker (with
> the same id) will automatically be started on a another machine, data
> replicated and back in action once that is done, automatically. Lots more
> features already in there... we are also in progress to auto balance
> partitions when increasing/decreasing the size of the cluster and some more
> goodies too.
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - -
>
>   http://www.stealth.ly
> - - - - - - - - - - - - - - - - -
>
> On Fri, Mar 13, 2015 at 8:43 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > On Apache Kafka website home page http://kafka.apache.org/ it is stated
> > that Kafka "can be elastically and transparently expanded without
> > downtime."
> > Is that really true? More specifically, can one just add one more broker,
> > have another partition added for the topic, have new broker assigned to
> be
> > the leader for new partition, have producers correctly write to the new
> > partition, and consumers read from it, with no broker, consumer or
> producer
> > downtime, no data loss, no manual action to move data from existing
> > partitions to new partition?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Re: Kafka elastic no downtime scalability

2015-03-13 Thread Stevo Slavić
These features are all nice if one adds new brokers to support additional
topics, or to move existing partitions or whole topics to new brokers.
The referenced sentence is in the paragraph named scalability. When I read
"expanded" I was thinking of scaling out, extending parallelization
capabilities, and parallelism in Kafka is achieved with partitions. So I
understood that sentence to mean that it is possible to add more partitions
to existing topics at runtime, with no downtime.

I just found in the source that there is an API for adding new partitions to
existing topics (see
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/AdminUtils.scala#L101
). I have to try it. I guess it should work at runtime, causing no downtime,
no data loss, and no moving of data from existing to new partitions.
Producers and consumers will eventually start writing to and reading from the
new partition, and consumers should be able to read previously published
messages from the old partitions, even messages which, if they were sent
again, would end up assigned/written to the new partition.
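
A minimal sketch of that call, assuming the AdminUtils API linked above (the
ZooKeeper address, timeouts, topic name and target partition count are
placeholders):

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object ExpandTopic {
  def main(args: Array[String]): Unit = {
    // ZKStringSerializer is needed so metadata is written in the format brokers expect
    val zkClient = new ZkClient("127.0.0.1:2181", 30000, 30000, ZKStringSerializer)
    try {
      // Grow an existing topic to 4 partitions in total. Existing data stays
      // where it is; only newly produced messages can land on the new
      // partitions, and key-based partition assignment changes.
      AdminUtils.addPartitions(zkClient, "existing-topic", 4)
    } finally {
      zkClient.close()
    }
  }
}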


Kind regards,
Stevo Slavic.

On Fri, Mar 13, 2015 at 8:27 PM, Joe Stein  wrote:

> https://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - -
>
>   http://www.stealth.ly
> - - - - - - - - - - - - - - - - -
>
> On Fri, Mar 13, 2015 at 3:05 PM, sunil kalva  wrote:
>
> > Joe
> >
> > "Well, I know it is semantic but right now it "can" be elastically scaled
> > without down time but you have to integrate into your environment for
> what
> > that means it has been that way since 0.8.0 imho"
> >
> > here what do you mean "you have to integrate into your environment", how
> do
> > i achieve elastically scaled cluster seamlessly ?
> >
> > SunilKalva
> >
> > On Fri, Mar 13, 2015 at 10:27 PM, Joe Stein 
> wrote:
> >
> > > Well, I know it is semantic but right now it "can" be elastically
> scaled
> > > without down time but you have to integrate into your environment for
> > what
> > > that means it has been that way since 0.8.0 imho.
> > >
> > > My point was just another way to-do that out of the box... folks do
> this
> > > elastic scailing today with AWS CloudFormation and internal systems
> they
> > > built too.
> > >
> > > So, it can be done... you just have todo it.
> > >
> > > ~ Joe Stein
> > > - - - - - - - - - - - - - - - - -
> > >
> > >   http://www.stealth.ly
> > > - - - - - - - - - - - - - - - - -
> > >
> > > On Fri, Mar 13, 2015 at 12:39 PM, Stevo Slavić 
> > wrote:
> > >
> > > > OK, thanks for heads up.
> > > >
> > > > When reading Apache Kafka docs, and reading what Apache Kafka "can" I
> > > > expect it to already be available in latest general availability
> > release,
> > > > not what's planned as part of some other project.
> > > >
> > > > Kind regards,
> > > > Stevo Slavic.
> > > >
> > > > On Fri, Mar 13, 2015 at 2:32 PM, Joe Stein 
> > wrote:
> > > >
> > > > > Hey Stevo, "can be elastically and transparently expanded without
> > > > > downtime." is
> > > > > the goal of Kafka on Mesos https://github.com/mesos/kafka as Kafka
> > as
> > > > the
> > > > > ability (knobs/levers) to-do this but has to be made to-do this out
> > of
> > > > the
> > > > > box.
> > > > >
> > > > > e.g. in Kafka on Mesos when a broker fails, after the configurable
> > max
> > > > fail
> > > > > over timeout (meaning it is truly deemed hard failure) then a
> broker
> > > > (with
> > > > > the same id) will automatically be started on a another machine,
> data
> > > > > replicated and back in action once that is done, automatically.
> Lots
> > > more
> > > > > features already in there... we are also in progress to auto
> balance
> > > > > partitions when increasing/decreasing the size of the cluster and
> > some
> > > > more
> > > > > goodies too.
> > > > >
> > > > > ~ Joe Stein
> > > > > - - - - - - - - - - - - - - - - -
> > > > >
> > > > >   http://www.stealth.ly
> > > > > - - - - - - - - - - - - - - - - -
> > > > >
> > > > > On Fri, Mar 13, 2015 at 8:43 AM, Stevo Slavić 
> > > wrote:
> > > > >
> > > > > > Hello Apache Kafka community,
> > > > > >
> > > > > > On Apache Kafka website home page http://kafka.apache.org/ it is
> > > > stated
> > > > > > that Kafka "can be elastically and transparently expanded without
> > > > > > downtime."
> > > > > > Is that really true? More specifically, can one just add one more
> > > > broker,
> > > > > > have another partition added for the topic, have new broker
> > assigned
> > > to
> > > > > be
> > > > > > the leader for new partition, have producers correctly write to
> the
> > > new
> > > > > > partition, and consumers read from it, with no broker, consumer
> or
> > > > > producer
> > > > > > downtime, no data loss, no manual action to move data from
> existing
> > > > > > partitions to new partition?
> > > > > >
> > > > > > Kind regards,
> > > > > > Stevo Slavic.
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > SunilKalva
> >
>


Re: Reusable consumer across consumer groups

2015-03-13 Thread Stevo Slavić
Sorry for late reply. Not sure what more details you need.
Here's example http://confluent.io/docs/current/kafka-rest/docs/intro.html
of exposing Kafka through remoting (http/rest) :-)
One can without looking into kafka rest proxy code see based on its
limitations that it's using HL consumer, with all its deficiencies.
E.g. commit "request *must* be made to the specific REST proxy instance
holding the consumer instance" (see
http://confluent.io/docs/current/kafka-rest/docs/api.html#post--consumers-%28string-group_name%29-instances-%28string-instance%29-offsets
). Also "because consumers are stateful, any consumer instances created
with the REST API are tied to a specific REST proxy instance", and
"consumers may not change the set of topics they are subscribed to once
they have started consuming messages" (see
http://confluent.io/docs/current/kafka-rest/docs/api.html#consumers )

One of the things making high level consumer objects heavy is that each one
starts many threads, so a limited number of HL consumer instances can be
created per node (before OOM is thrown, not because there's not enough
memory, but because there are too many threads started).

With 0.8.2.x not much has changed in the ability to reuse HL consumer
instances to poll on behalf of different consumer groups; consumer instances
are stateful - most importantly the offsets and lock(s) that an active
consumer holds. Luckily, there's the simple consumer API.
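
To illustrate, here is a minimal sketch of a single, reusable fetcher built
on the simple consumer API and shared across requests coming in for different
consumer groups; the broker host, fetch sizes and per-group offset bookkeeping
are placeholders/assumptions, not a complete implementation:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;

public class SharedFetcher {
    // one long-lived instance, reused for requests on behalf of any group
    private final SimpleConsumer consumer =
            new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "shared-fetcher");

    // the caller passes the offset it tracks for its own consumer group;
    // the simple consumer itself knows nothing about groups or offsets
    public ByteBufferMessageSet fetch(String topic, int partition, long offsetForGroup) {
        FetchRequest request = new FetchRequestBuilder()
                .clientId("shared-fetcher")
                .addFetch(topic, partition, offsetForGroup, 1024 * 1024)
                .build();
        FetchResponse response = consumer.fetch(request);
        return response.messageSet(topic, partition);
    }
}

Partition leader discovery, error handling and per-group offset storage are
deliberately left out of this sketch.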

Kind regards,
Stevo Slavic.

On Thu, Oct 23, 2014 at 6:36 PM, Neha Narkhede 
wrote:

> I'm wondering how much of this can be done using careful system design vs
> building it within the consumer itself. You could distribute the several
> consumer instances across machines since it is built for distributed load
> balancing. That will sufficiently isolate the resources required to run the
> various consumers. But probably you have a specific use case in mind for
> running several consumer groups on the same machine. Would you mind giving
> more details?
>
> On Thu, Oct 23, 2014 at 12:55 AM, Stevo Slavić  wrote:
>
> > Imagine exposing Kafka over various remoting protocols, where incoming
> > poll/read requests may come in concurrently for different consumer
> groups,
> > especially in a case with lots of different consumer groups.
> > If you create and destroy KafkaConsumer for each such request, response
> > times and throughput will be very low, and doing that is one of the ways
> to
> > reproduce https://issues.apache.org/jira/browse/KAFKA-1716
> >
> > It would be better if one could reuse a (pool of) Consumer instances, and
> > through a read operation parameter specify for which consumer group
> should
> > read be performed.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Tue, Oct 14, 2014 at 6:17 PM, Neha Narkhede 
> > wrote:
> >
> > > Stevo,
> > >
> > > The new consumer API is planned for 0.9, not 0.8.2. You can take a look
> > at
> > > a detailed javadoc here
> > > <
> > >
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > >
> > > .
> > >
> > > Can you explain why you would like to poll messages across consumer
> > groups
> > > using just one instance?
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Tue, Oct 14, 2014 at 1:03 AM, Stevo Slavić 
> wrote:
> > >
> > > > Hello Apache Kafka community,
> > > >
> > > > Current (Kafka 0.8.1.1) high-level API's KafkaConsumer is not a
> > > > lightweight object, its creation takes some time and resources, and it
> > > > does not seem to be thread-safe. Its API also does not support reuse
> > > > for consuming messages from different consumer groups.
> > > >
> > > > I see even in the coming (0.8.2) redesigned API it will not be
> possible
> > > to
> > > > reuse consumer instance to poll messages from different consumer
> > groups.
> > > >
> > > > Can something be done to support this?
> > > >
> > > > Would it help if there was consumer group as a separate entity from
> > > > consumer, for all the subscription management tasks?
> > > >
> > > > Kind regards,
> > > > Stevo Slavic
> > > >
> > >
> >
>


Re: Kafka metrics: high level vs simple consumer

2015-03-14 Thread Stevo Slavić
Thanks Otis,

It works well, had a bug in reporter configuration.

Kind regards,
Stevo Slavic.
On Feb 27, 2015 5:19 PM, "Otis Gospodnetic" 
wrote:

> Hi Stevo,
>
> Simple as well, if I'm not mistaken.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Fri, Feb 27, 2015 at 2:35 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > In Kafka 0.8.1.1, are Kafka metrics updated/tracked/marked by simple
> > consumer implementation or only by high level one?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


kafka.admin as separate module

2015-03-25 Thread Stevo Slavić
Hello Apache Kafka community,

I like that kafka-clients is now a separate module, and even has no Scala
dependency. I'd like to propose that the kafka.admin package gets published
as a separate module too.

I'm writing some tests, and to be able to use the kafka.admin tools/utils in
them I have to bring in the much larger kafka (core) module, with server
stuff and all its dependencies, like netty. The test framework happens to use
netty too, but a different version - classpath hell.

Any thoughts? Is proposal sound enough for a JIRA ticket?

Kind regards,
Stevo Slavic.


Re: kafka.admin as separate module

2015-03-25 Thread Stevo Slavić
Ah, great, thanks for heads up Andrii!
On Mar 25, 2015 5:39 PM, "Andrii Biletskyi" 
wrote:

> Hi Stevo,
>
> JFYI: we are working now on new centralized API for Admin commands.
> This will include:
> - Public API to perform TopicCommands (Create/Alter/Delete/List/Describe)
> - Out-of-box java client for Admin API: AdminClient - will be part of
> /clients
> - Interactive cli for admin commands
> The plan was to have all mentioned above in 0.9 and probably remove
> existing
> tools at that point.
>
> You can check for details:
> Confluence -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations
> JIRA - https://issues.apache.org/jira/browse/KAFKA-1694
> Mailing list - thread "[DISCUSS] KIP-4 - Command line and centralized
> administrative operations"
>
> Thanks,
> Andrii Biletskyi
>
>
> On Wed, Mar 25, 2015 at 5:26 PM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > I like that kafka-clients is now separate module, and has no scala
> > dependency even. I'd like to propose that kafka.admin package gets
> > published as separate module too.
> >
> > I'm writing some tests, and to be able to use kafka.admin tools/utils in
> > them I have to bring in too large kafka module, with server stuff and all
> > dependencies, like netty. Test framework happens to use netty too but
> > different version - classpath hell.
> >
> > Any thoughts? Is proposal sound enough for a JIRA ticket?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Re: Dropping support for Scala 2.9.x

2015-03-27 Thread Stevo Slavić
+1 for dropping 2.9.x support

Kind regards,
Stevo Slavic.

On Fri, Mar 27, 2015 at 3:20 PM, Ismael Juma  wrote:

> Hi all,
>
> The Kafka build currently includes support for Scala 2.9, which means that
> it cannot take advantage of features introduced in Scala 2.10 or depend on
> libraries that require it.
>
> This restricts the solutions available while trying to solve existing
> issues. I was browsing JIRA looking for areas to contribute and I quickly
> ran into two issues where this is the case:
>
> * KAFKA-1351: "String.format is very expensive in Scala" could be solved
> nicely by using the String interpolation feature introduced in Scala 2.10.
>
> * KAFKA-1595: "Remove deprecated and slower scala JSON parser from
> kafka.consumer.TopicCount" could be solved by using an existing JSON
> library, but both jackson-scala and play-json require 2.10 (argonaut
> supports Scala 2.9, but it brings other dependencies like scalaz). We can
> workaround this by writing our own code instead of using libraries, of
> course, but it's not ideal.
>
> Other features like Scala Futures and value classes would also be useful in
> some situations, I would think (for a more extensive list of new features,
> see
>
> http://scala-language.1934581.n4.nabble.com/Scala-2-10-0-now-available-td4634126.html
> ).
>
> Another pain point of supporting 2.9.x is that it doubles the number of
> build and test configurations required from 2 to 4 (because the 2.9.x
> series was not necessarily binary compatible).
>
> A strong argument for maintaining support for 2.9.x was the client library,
> but that has been rewritten in Java.
>
> It's also worth mentioning that Scala 2.9.1 was released in August 2011
> (more than 3.5 years ago) and the 2.9.x series hasn't received updates of
> any sort since early 2013. Scala 2.10.0, in turn, was released in January
> 2013 (over 2 years ago) and 2.10.5, the last planned release in the 2.10.x
> series, has been recently released (so even 2.10.x won't be receiving
> updates any longer).
>
> All in all, I think it would not be unreasonable to drop support for Scala
> 2.9.x in a future release, but I may be missing something. What do others
> think?
>
> Ismael
>


Offset management: client vs broker side responsibility

2015-04-21 Thread Stevo Slavić
Hello Apache Kafka community,

Please correct me if I'm wrong, but AFAIK currently (Kafka 0.8.2.x) offset
management is mainly a client/consumer side responsibility.

Wouldn't it be better if it was a broker side only responsibility?

E.g. if one uses custom offset management, none of the Kafka monitoring
tools can see the offsets - they would need to use the same custom client
implementation, which is practically not possible.

Kind regards,
Stevo Slavic.


Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread Stevo Slavić
Please correct me if I'm wrong, but I think it is not really a hard
constraint that one cannot have more consumers (from the same group) than
partitions of a single topic - all the surplus consumers will simply not be
assigned any partition to consume, but they can be there, and as soon as one
active consumer from the same group goes offline (its connection to ZK is
dropped), the consumers in the group will be rebalanced so that one passively
waiting consumer becomes active.
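
As a small illustration, a sketch (assuming a hypothetical topic "events"
with 4 partitions) of requesting more streams than there are partitions; the
surplus streams simply receive nothing until a rebalance assigns them one:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SurplusConsumersDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "scaling-demo");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // 6 streams requested for a 4-partition topic: 4 streams each get one
        // partition, the remaining 2 stay idle until a rebalance hands them one
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("events", 6);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);
    }
}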

Kind regards,
Stevo Slavic.

On Wed, Apr 29, 2015 at 2:25 PM, David Corley  wrote:

> If the 100 partitions are all for the same topic, you can have up to 100
> consumers working as part of a single consumer group for that topic.
> You cannot have more consumers than there are partitions within a given
> consumer group.
>
> On 29 April 2015 at 08:41, Nimi Wariboko Jr  wrote:
>
> > Hi,
> >
> > I was wondering what options there are for horizontally scaling kafka
> > consumers? Basically if I have 100 partitions and 10 consumers, and want
> to
> > temporarily scale up to 50 consumers, what options do I have?
> >
> > So far I've thought of just simply tracking consumer membership somehow
> > (either through Raft or zookeeper's znodes) on the consumers.
> >
>


Re: Empty Partitions Impact on Performance

2015-05-06 Thread Stevo Slavić
I noticed similar behavior on a similarly small cluster with 3 Kafka brokers
and 3 ZooKeeper nodes, Kafka 0.8.1.1 and ZooKeeper 3.4.6, with ~5K topics,
most of them with a single partition, replication factor of 1, and most of
them unused for a long time, yet the brokers are busy and performance,
especially producer performance, is poor. I plan to investigate the root
cause. It would be nice if it was documented how many topics/partitions can,
by design, be handled by a single Kafka broker node, and also what the
scalability limits/constraints are. Both Kafka and ZooKeeper need to be
checked for limits.

Kind regards,
Stevo Slavic.

On Wed, May 6, 2015 at 1:32 PM, Pavel Sýkora  wrote:

> Hi,
>
>
>
> I've started playing with Kafka_2.10-0.8.2.1 (a cluster of 3 VM nodes). I
> found a strange behaviour: Inbound throughput was about 60 thousand
> messages
> per second for a partition with 50 partitions. But after adding topics with
> 950 partitions total, the performance dropped under 25 thousand messages
> per
> second, even if the 950 new partitions were unused and empty. In other
> words: a simple existence of empty unused partitions can drop Kafka
> performance to less than 50 %.
>
>
>
>
> Has anybody else observed such Kafka behaviour? Does anybody have an
> explanation for this?
>
>
>
>
> Regards,
>
>
>
>
> Pavel
>


Re: [Announcement] Hermes - pub / sub broker built on top of Kafka

2015-05-16 Thread Stevo Slavić
Nice, thanks for sharing!

Is 30k msgs/sec publishing or push throughput? Will check, hopefully
performance tests are included in the sources.

Does Hermes have the same max-number-of-topics limitation as Kafka, or does
it include a solution to make that aspect scalable as well?
On May 16, 2015 8:02 AM, "Marcin Kuthan"  wrote:

> Hi Everyone
>
> Hermes is an asynchronous message broker built on top of Kafka. It
> provides reliable, fault tolerant REST interface for message
> publishing and adaptive push mechanisms for message sending. Hermes is
> used as a message broker for asynchronous communication between
> microservices.
>
> Some of the main features:
>
> - Performance and scalability - Hermes in production handling up to
> 30.000 msgs/sec with 99.9th percentile latency below 100 ms on a
> 4-node cluster.
>
> - Reliability - Hermes is used for publishing sensitive data, such as
> billing events, user account changes etc. Hermes allows to define more
> reliable policy for those important events - require acknowledge from
> all Kafka replicas and increase request timeouts.
>
> - Push model - It makes receiving messages from Hermes dead simple:
> you just have to write one HTTP endpoint in your service. It’s up to
> Hermes to create Kafka consumer, redeliver messages, keep eye on
> throughput limits etc.
>
> Feedback and comments are welcome, you can find Hermes documentation at:
> http://hermes-pubsub.readthedocs.org/en/latest/index.html
>
> Hermes is published under Apache Licence:
> https://github.com/allegro/hermes
>
> Best Regards,
> Marcin
>


Re: Offset management: client vs broker side responsibility

2015-05-27 Thread Stevo Slavić
It could be a separate server component; it does not have to be
monolithic/coupled with the broker.
Such a solution would have benefits - a single API, pluggable implementations.

On Wed, May 27, 2015 at 8:57 AM, Shady Xu  wrote:

> Storing and managing offsets by broker will leave high pressure on the
> brokers which will affect the performance of the cluster.
>
> You can use the advanced consumer APIs, then you can get the offsets either
> from zookeeper or the __consumer_offsets__ topic. On the other hand, if you
> use the simple consumer APIs, you mean to manage offsets yourself, then you
> should monitor them yourself, simple and plain, right?
>
> 2015-04-22 14:36 GMT+08:00 Stevo Slavić :
>
> > Hello Apache Kafka community,
> >
> > Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x) offset
> > management responsibility is mainly client/consumer side responsibility.
> >
> > Wouldn't it be better if it was broker side only responsibility?
> >
> > E.g. now if one wants to use custom offset management, any of the Kafka
> > monitoring tools cannot see the offsets - they would need to use same
> > custom client implementation which is practically not possible.
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


NoSuchElementException while retrieving logStartOffset

2015-05-29 Thread Stevo Slavić
Hello Kafka community,

We had a ton of test topics, and deleted them using Kafka admin scripts -
then our metrics error log started filling up with exceptions.
Kafka metric reporter is trying to read LogStartOffset gauge value, and
that throws NoSuchElementException.

java.util.NoSuchElementException
at
java.util.concurrent.ConcurrentSkipListMap$Iter.advance(ConcurrentSkipListMap.java:2299)
at
java.util.concurrent.ConcurrentSkipListMap$ValueIterator.next(ConcurrentSkipListMap.java:2326)
at
scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at kafka.log.Log.logStartOffset(Log.scala:502)
at kafka.log.Log$$anon$2.value(Log.scala:86)
at kafka.log.Log$$anon$2.value(Log.scala:85)

Kafka version is 0.8.2.1 and this is logStartOffset method implementation:

def logStartOffset: Long = logSegments.head.baseOffset

It assumes logSegments is non-empty. I see that in a few places kafka.log.Log
methods try to ensure that there is always at least one segment.

I guess not all possible cases are covered, so it can happen that a Log has 0
logSegments.
Not even a restart of all Kafka brokers in the cluster helped.

Any suggestions how to workaround this are welcome.

Kind regards,
Stevo Slavic.


Re: NoSuchElementException while retrieving logStartOffset

2015-05-29 Thread Stevo Slavić
Correction - the restart did help, those exceptions are not logged anymore.
Still, it seems there is a bug in topic deletion: some kafka.log.Log
references continue to live after a topic is deleted, until the broker is
restarted.
Will try to create a reproducible failing test.

On Fri, May 29, 2015 at 11:50 PM, Stevo Slavić  wrote:

> Hello Kafka community,
>
> We had a ton of test topics, and deleted them using Kafka admin scripts -
> then our metrics error log started filling up with exceptions.
> Kafka metric reporter is trying to read LogStartOffset gauge value, and
> that throws NoSuchElementException.
>
> java.util.NoSuchElementException
> at
> java.util.concurrent.ConcurrentSkipListMap$Iter.advance(ConcurrentSkipListMap.java:2299)
> at
> java.util.concurrent.ConcurrentSkipListMap$ValueIterator.next(ConcurrentSkipListMap.java:2326)
> at
> scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
> at scala.collection.IterableLike$class.head(IterableLike.scala:107)
> at scala.collection.AbstractIterable.head(Iterable.scala:54)
> at kafka.log.Log.logStartOffset(Log.scala:502)
> at kafka.log.Log$$anon$2.value(Log.scala:86)
> at kafka.log.Log$$anon$2.value(Log.scala:85)
>
> Kafka version is 0.8.2.1 and this is logStartOffset method implementation:
>
> def logStartOffset: Long = logSegments.head.baseOffset
>
> It assumes logSegments is non empty. I see in few places in kafka.log.Log
> methods are trying to ensure that there is always at least one segment.
>
> I guess not all possible cases are covered, so it can happen that Log has
> 0 logSegments.
> Not even a restart of all Kafka brokers in the cluster helped.
>
> Any suggestions how to workaround this are welcome.
>
> Kind regards,
> Stevo Slavic.
>
>


KafkaMetricsConfig not documented

2015-05-31 Thread Stevo Slavić
Hello Apache Kafka community,

In current (v0.8.2.1) documentation at
http://kafka.apache.org/documentation.html#configuration I cannot find
anything about two configuration properties used in
kafka.metrics.KafkaMetricsConfig, namely "kafka.metrics.reporters" and
"kafka.metrics.polling.interval.secs". I'd expect them in broker
configuration section. Am I missing something? Or are those two
intentionally left out of configuration documentation?

Kind regards,
Stevo Slavic.


Re: KafkaMetricsConfig not documented

2015-06-02 Thread Stevo Slavić
Created https://issues.apache.org/jira/browse/KAFKA-2244

On Mon, Jun 1, 2015 at 7:18 AM, Aditya Auradkar <
aaurad...@linkedin.com.invalid> wrote:

> Yeah, they aren't included in KafkaConfig for some reason but I think they
> should. Can you file a jira?
>
> Aditya
>
> ____
> From: Stevo Slavić [ssla...@gmail.com]
> Sent: Sunday, May 31, 2015 3:57 PM
> To: users@kafka.apache.org
> Subject: KafkaMetricsConfig not documented
>
> Hello Apache Kafka community,
>
> In current (v0.8.2.1) documentation at
> http://kafka.apache.org/documentation.html#configuration I cannot find
> anything about two configuration properties used in
> kafka.metrics.KafkaMetricsConfig, namely "kafka.metrics.reporters" and
> "kafka.metrics.polling.interval.secs". I'd expect them in broker
> configuration section. Am I missing something? Or are those two
> intentionally left out of configuration documentation?
>
> Kind regards,
> Stevo Slavic.
>


Re: backup(dump) and restore environment

2015-06-09 Thread Stevo Slavić
Hello Jakub,

Maybe it will work for you to combine MirrorMaker
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
and Burrow: https://github.com/linkedin/Burrow
See recent announcement for Burrow
http://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCACrdVJpS3k1ZxCVHGqs0H5d7gfUUsQXdZ66DMRUEAccPrCOzvg%40mail.gmail.com%3E
which mentions how it helps with monitoring MirrorMaker.

Kind regards,
Stevo Slavic.

On Tue, Jun 9, 2015 at 11:00 AM, Jakub Muszynski  wrote:

> Hi
>
> I'm looking for the best way to "dump" current system state, and recreate
> it on the new, autonomic environment.
> Lets say I'd like to create a copy of Production, and based on that, create
> new, separate environment for testing.
>
> Can You suggest some solutions?
>
> greetings
> Jakub
>


Re: Offset management: client vs broker side responsibility

2015-06-16 Thread Stevo Slavić
Found out that there is standard API for retrieving and committing offsets
(see
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
)

The problem is that the server/broker side is not extensible (see
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L142
) - i.e. there is no API one can implement and deploy/configure together
with the Kafka binary to handle unsupported
offsetCommitRequest.versionId/offsetFetchRequest.versionId values, or to
override the handling of already supported ones.

This does not prevent one from implementing custom offset management on the
client side (instead of using the standard API to commit and retrieve
offsets, one can talk directly to a custom offset store), but then the
problem arises that no commercial or FOSS Kafka monitoring solution supports
it out of the box.

I know I would, but the question to the Apache Kafka community is: would you
like to have Kafka broker offset commit/fetch handling extensible, and what
do the committers think about this?

Kind regards,
Stevo Slavic.


On Tue, Jun 2, 2015 at 7:11 PM, Otis Gospodnetic  wrote:

> Hi,
>
> I haven't followed the changes to offset tracking closely, other than that
> storing them in ZK is not the only option any more.
> I think what Stevo is asking about/suggesting is that there there be a
> single API from which offset information can be retrieved (e.g. by
> monitoring tools), so that monitoring tools work regardless of where one
> chose to store offsets.
> I know we'd love to have this for SPM's Kafka monitoring and can tell you
> that adding support for N different APIs for N different offset storage
> systems would be hard/time-consuming/expensive.
> But maybe this single API already exists?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Jun 1, 2015 at 4:41 PM, Jason Rosenberg  wrote:
>
> > Stevo,
> >
> > Both of the main solutions used by the high-level consumer are
> standardized
> > and supported directly by the kafka client libraries (e.g. maintaining
> > offsets in zookeeper or in kafka itself).  And for the zk case, there is
> > the consumer offset checker (which is good for monitoring).  Consumer
> > offset checker still needs to be extended for offsets stored in kafka
> > _consumer_offset topics though.
> >
> > Anyway, I'm not sure I understand your question, you want something for
> > better monitoring of all possible clients (some of which might choose to
> > manage offsets in their own way)?
> >
> > It's just not part of the kafka design to directly track individual
> > consumers.
> >
> > Jason
> >
> > On Wed, May 27, 2015 at 7:42 AM, Shady Xu  wrote:
> >
> > > I guess adding a new component will increase the complexity of the
> system
> > > structure. And if the new component consists of one or a few nodes, it
> > may
> > > becomes the bottleneck of the whole system, if it consists of many
> nodes,
> > > it will make the system even more complex.
> > >
> > > Although every solution has its downsides, I think the current one is
> > > decent.
> > >
> > > 2015-05-27 17:10 GMT+08:00 Stevo Slavić :
> > >
> > > > It could be a separate server component, does not have to be
> > > > monolith/coupled with broker.
> > > > Such solution would have benefits - single API, pluggable
> > > implementations.
> > > >
> > > > On Wed, May 27, 2015 at 8:57 AM, Shady Xu  wrote:
> > > >
> > > > > Storing and managing offsets by broker will leave high pressure on
> > the
> > > > > brokers which will affect the performance of the cluster.
> > > > >
> > > > > You can use the advanced consumer APIs, then you can get the
> offsets
> > > > either
> > > > > from zookeeper or the __consumer_offsets__ topic. On the other
> hand,
> > if
> > > > you
> > > > > use the simple consumer APIs, you mean to manage offsets yourself,
> > then
> > > > you
> > > > > should monitor them yourself, simple and plain, right?
> > > > >
> > > > > 2015-04-22 14:36 GMT+08:00 Stevo Slavić :
> > > > >
> > > > > > Hello Apache Kafka community,
> > > > > >
> > > > > > Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x)
> offset
> > > > > > management responsibility is mainly client/consumer side
> > > > responsibility.
> > > > > >
> > > > > > Wouldn't it be better if it was broker side only responsibility?
> > > > > >
> > > > > > E.g. now if one wants to use custom offset management, any of the
> > > Kafka
> > > > > > monitoring tools cannot see the offsets - they would need to use
> > same
> > > > > > custom client implementation which is practically not possible.
> > > > > >
> > > > > > Kind regards,
> > > > > > Stevo Slavic.
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: At-least-once guarantees with high-level consumer

2015-06-17 Thread Stevo Slavić
With auto-commit one can only have an at-most-once delivery guarantee - after
a commit but before the message is delivered for processing, or even after it
is delivered but before it is processed, things can fail, causing the event
not to be processed, which is basically the same outcome as if it was not
delivered.
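
For comparison, a minimal sketch of the usual at-least-once pattern with the
current high-level consumer, assuming auto-commit is disabled and offsets are
committed only after processing (group, topic and the process() call are
placeholders):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "demo-group");
        props.put("auto.commit.enable", "false"); // commit manually, after processing
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("demo-topic", 1);
        KafkaStream<byte[], byte[]> stream =
                connector.createMessageStreams(topicCountMap).get("demo-topic").get(0);

        for (MessageAndMetadata<byte[], byte[]> record : stream) {
            process(record.message());   // may be redelivered after a crash -> duplicates
            connector.commitOffsets();   // commit only once processing succeeded
        }
    }

    private static void process(byte[] message) { /* placeholder */ }
}

A crash between process() and commitOffsets() leads to redelivery, so this is
at-least-once, not exactly-once; with multiple streams per connector the
commit also covers partitions whose latest messages may still be in flight.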

On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann  wrote:

> Hi
>
> ** Disclaimer: I know there's a new consumer API on the way, this mail is
> about the currently available API. I also apologise if the below has
> already been discussed previously. I did try to check previous discussions
> on ConsumerIterator **
>
> It seems to me that the high-level consumer would be able to support
> at-least-once messaging, even if one uses auto-commit, by changing
> kafka.consumer.ConsumerIterator.next() to call
> currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a
> consumer thread for a KafkaStream could just loop:
>
> while (true) {
> MyMessage message = iterator.next().message();
> process(message);
> }
>
> Each call to "iterator.next()" then updates the offset to commit to the end
> of the message that was just processed. When offsets are committed for the
> ConsumerConnector (either automatically or manually), the commit will not
> include offsets of messages that haven't been fully processed.
>
> I've tested the following ConsumerIterator.next(), and it seems to work as
> I expect:
>
>   override def next(): MessageAndMetadata[K, V] = {
> // New code: reset consumer offset to the end of the previously
> consumed message:
> if (consumedOffset > -1L && currentTopicInfo != null) {
> currentTopicInfo.resetConsumeOffset(consumedOffset)
> val topic = currentTopicInfo.topic
> trace("Setting %s consumed offset to %d".format(topic,
> consumedOffset))
> }
>
> // Old code, excluding reset:
> val item = super.next()
> if(consumedOffset < 0)
>   throw new KafkaException("Offset returned by the message set is
> invalid %d".format(consumedOffset))
> val topic = currentTopicInfo.topic
> consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
> consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
> item
>   }
>
> I've seen several people asking about managing commit offsets manually with
> the high level consumer. I suspect that this approach (the modified
> ConsumerIterator) would scale better than having a separate
> ConsumerConnecter per stream just so that you can commit offsets with
> at-least-once semantics. The downside of this approach is more duplicate
> deliveries after recovery from hard failure (but this is "at least once",
> right, not "exactly once").
>
> I don't propose that the code necessarily be changed like this in trunk, I
> just want to know if the approach seems reasonable.
>
> Regards
> Carl Heymann
>


Re: how to modify offsets stored in Kafka in 0.8.2.1 version?

2015-06-19 Thread Stevo Slavić
Hello Marina,

There's a Kafka API to fetch and commit offsets
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
- maybe it will work for you.

Kind regards,
Stevo Slavic.

On Fri, Jun 19, 2015 at 3:23 PM, Marina  wrote:

> Hi,
>
> in older Kafka versions where offsets were stored in Zookeeper - I could
> manually update the value of the Zookeeper's node:
>
> /consumers//offsets///.
>
> In 0.8.2.1 - there are no values in offsets anymore, but there is a new
> topic,
> __consumer_offsets, where as I understand offsets are tracked now.
>
> the ConsumerOffsetChecker tool seems to be able to get the offsets values
> from this topic , since I see correct value running it.
> So, how do I access this info myself?
>
>
> I tried:
>
> ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic
> __consumer_offsets --from-beginning
>
> but it does not show anything
> Also, how would I change the offset? I need to do this sometimes if I want
> to skip/ignore some messages and just advance offset manually.
>
> thanks,
> Marina
>


Re: How can I unsubscribe from this mailing list

2015-06-25 Thread Stevo Slavić
I believe it's: users-unsubscr...@kafka.apache.org

Maybe http://kafka.apache.org/contact.html could be updated with
unsubscribe info.

Kind regards,
Stevo Slavic.

On Thu, Jun 25, 2015 at 9:25 AM, Monika Garg  wrote:

> Hi
>
> I want to unsubscribe from this mailing list.
>
> I have sent the unsubscription mail on below two ids :
>
> kafka-users-unsubscr...@incubator.apache.org
> unsubscr...@kafka.apache.org
>
> But delivery to these two addresses getting failed.
>
> Please help me how can I subscribe from this mailing list.
>
>
> --
> *Moniii*
>


Indication in FetchResponse that fetch size is too small

2015-07-02 Thread Stevo Slavić
Hello Apache Kafka community,

Couldn't the broker return a special error code in the FetchResponse for a
given partition when it detects that there was something to return/read from
the partition, but the actual FetchResponse contains no messages for that
partition because the fetch size in the FetchRequest for that partition is
too small?

I don't see it already in the existing Kafka wire protocol docs
(https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol).
It doesn't seem to be supported, since the docs for broker and consumer
configuration mention that the maximums should be in sync, otherwise a too
large message could block the consumer.

If it were available, this feature would be useful for simple consumer API
users, in cases where the range of possible message sizes on the same topic
is relatively large (e.g. mostly very small messages, but a small fraction of
relatively large messages). It would help with being able to use soft and
hard maximums, so most of the time one could read with the soft max, and only
when this error code is set, attempt a read using the hard max limit.
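
In the meantime, a minimal sketch of that soft/hard maximum idea with the
existing simple consumer API, assuming one can use the partition's high
watermark from the FetchResponse to detect that data exists past the offset
but did not fit the soft fetch size (host, sizes and names are placeholders):

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;

public class SoftHardFetch {
    private static final int SOFT_MAX = 64 * 1024;        // covers most (small) messages
    private static final int HARD_MAX = 8 * 1024 * 1024;  // covers the occasional large one

    public static ByteBufferMessageSet fetch(SimpleConsumer consumer,
                                             String topic, int partition, long offset) {
        FetchResponse response = consumer.fetch(request(topic, partition, offset, SOFT_MAX));
        ByteBufferMessageSet messages = response.messageSet(topic, partition);
        // no messages returned, yet the high watermark says there is data past
        // our offset: the next message is likely bigger than SOFT_MAX, so retry
        // with the hard limit
        if (!messages.iterator().hasNext()
                && response.highWatermark(topic, partition) > offset) {
            response = consumer.fetch(request(topic, partition, offset, HARD_MAX));
            messages = response.messageSet(topic, partition);
        }
        return messages;
    }

    private static FetchRequest request(String topic, int partition, long offset, int maxBytes) {
        return new FetchRequestBuilder()
                .clientId("soft-hard-fetch")
                .addFetch(topic, partition, offset, maxBytes)
                .build();
    }
}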

Assuming this feature is not there, I guess what would need to be done at
minimum is:
- minimal change in protocol and docs, just one new error code
- accompanying detection of this condition on broker side, so extend
handling of FetchRequest
- maybe in high level consumer make use of this error code in a special way
(e.g. just log a warning/error that there is potential misconfiguration).

Kind regards,
Stevo Slavic.


Re: Indication in FetchResponse that fetch size is too small

2015-07-03 Thread Stevo Slavić
Thanks for heads up Joel!

Kind regards,
Stevo Slavic.

On Thu, Jul 2, 2015 at 10:17 PM, Joel Koshy  wrote:

> A somewhat related request came up in another thread and I think it is
> reasonable to provide this. However, there are already some indicators
> that you can use:
> - The consumer iterator throws a MessageSizeTooLargeException if it
>   cannot extract any messages out of the next chunk.
> - If you are using the simple consumer, the fetch response includes
>   the high watermark - so if the HW > your fetch offset but there are
>   no iterable messages, then you will know that your fetch size needs
>   to increase.
>
> Thanks,
>
> Joel
>
> On Thu, Jul 02, 2015 at 05:32:20PM +0200, Stevo Slavić wrote:
> > Hello Apache Kafka community,
> >
> > Couldn't broker return a special error code in FetchResponse for a given
> > partition(s) where it detects that there was something to return/read
> from
> > partition but actual FetchResponse contains no messages for that
> partition
> > since fetch size in FetchRequest for that partition is too small?
> >
> > I don't see it already in existing Kafka wire protocol docs
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >.
> > It doesn't seem to be supported, since docs for broker and consumer
> > configuration mention that the maximums should be in sync, otherwise too
> > large message could block consumer.
> >
> > If it was available this feature would be useful for simple consumer API
> > users, in the cases when range of possible message sizes on same topic is
> > relatively large (e.g. most very small messages, but some small fraction
> of
> > relatively large messages). Would help with being able to  soft and hard
> > maximums, so most of the time one could read with soft max, and only in
> > case of this error code is on, attempt a read using hard max limit.
> >
> > Assuming this feature is not there, I guess what would need to be done at
> > minimum is:
> > - minimal change in protocol and docs, just one new error code
> > - accompanying detection of this condition on broker side, so extend
> > handling of FetchRequest
> > - maybe in high level consumer make use of this error code in a special
> way
> > (e.g. just log a warning/error that there is potential misconfiguration).
> >
> > Kind regards,
> > Stevo Slavic.
>
>


Re: failing to fetch topic metadata

2015-07-05 Thread Stevo Slavić
Hello Emanuele,

From the logs it seems that auto.create.topics.enable is not overridden for
the embedded broker.
It also seems that the test is explicitly creating the topic before
publishing a message to it.
Consider commenting out the explicit topic creation and relying on implicit
topic creation.
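
For example, a minimal sketch of an embedded broker with implicit topic
creation explicitly enabled, assuming KafkaServerStartable is used as in your
test (ports and paths are placeholders):

import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class EmbeddedBroker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("broker.id", "0");
        props.put("port", "9092");
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("advertised.host.name", "127.0.0.1");
        props.put("log.dirs", "/tmp/embedded-kafka-logs");   // placeholder path
        props.put("auto.create.topics.enable", "true");      // first produce creates the topic

        KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(props));
        broker.startup();
    }
}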

Kind regards,
Stevo Slavic.

On Sun, Jul 5, 2015 at 11:32 AM, Emanuele Blanco 
wrote:

> Hi,
>
> When trying to send a message to a local Kafka (created via
> KafkaServerStartable), I'm getting loads of
>
>  (Logging.scala:83) - Error while fetching metadata [{TopicMetadata for
> topic test_topic ->
> No partition metadata for topic test_topic due to
> kafka.common.LeaderNotAvailableException}] for topic [test_topic]: class
> kafka.common.LeaderNotAvailableException
>
> and I'm failing to send the message. I started ZooKeeper using
> NIOServerCnxnFactory and a standalone ZooKeeperServer. I set
> advertised.host.name in the Kafka properties and I've been using 127.0.0.1
> everywhere instead of localhost to avoid ipv6 problems.
>
> From what I can see from the logs though - available at
> https://gist.github.com/manub/06b5beedf6ece9701655 - the topic is created,
> but somehow I'm not able to get the metadata while sending a message.
>
> Any suggestions in what I'm doing wrong?
>
> Many thanks
>
> Emanu
> ​​
> ele Blanco
> Twitter - @manub 
>


Kafka settings for (more) reliable/durable messaging

2015-07-07 Thread Stevo Slavić
Hello Apache Kafka community,

Documentation for min.insync.replicas in
http://kafka.apache.org/documentation.html#brokerconfigs states:

"When used together, min.insync.replicas and request.required.acks allow
you to enforce greater durability guarantees. A typical scenario would be
to create a topic with a replication factor of 3, set min.insync.replicas
to 2, and produce with request.required.acks of -1. This will ensure that
the producer raises an exception if a majority of replicas do not receive a
write."

Correct me if I'm wrong (is there a doc reference?), but I assume
min.insync.replicas includes the lead, so with min.insync.replicas=2, the
lead and one more replica besides the lead will have to ACK writes.
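
For reference, a minimal sketch of those settings in code, assuming the 0.8.2
producer config keys and AdminUtils for topic creation (broker list, topic
name and counts are placeholders):

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class DurableSetup {
    public static void main(String[] args) {
        // topic with replication factor 3 and min.insync.replicas=2
        ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000,
                ZKStringSerializer$.MODULE$);
        Properties topicConfig = new Properties();
        topicConfig.put("min.insync.replicas", "2");
        AdminUtils.createTopic(zkClient, "durable-topic", 3, 3, topicConfig);

        // producer that requires an ACK from all in-sync replicas
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list",
                "broker1:9092,broker2:9092,broker3:9092");
        producerProps.put("request.required.acks", "-1");
        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(producerProps);
    }
}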

In such a setup, with a minimalistic 3-broker cluster, given that
- all 3 replicas are in sync
- a batch of messages is written, ends up on the lead, and one replica ACKs
- another batch of messages ends up on the lead and a different replica ACKs

Is it possible that when the lead crashes, while the replicas didn't catch
up, (part of) one batch of messages could be lost (since one replica becomes
the new lead, only it serves all read and write requests, and replication is
one way)?

Kind regards,
Stevo Slavic.


Re: Kafka settings for (more) reliable/durable messaging

2015-07-07 Thread Stevo Slavić
Hello Gwen,

Thanks for fast response!

Btw, congrats on officially becoming a Kafka committer and thanks, among
other things, for great "Intro to Kafka" video
http://shop.oreilly.com/product/0636920038603.do !

I have to read more docs and/or source. I thought this scenario was possible
because a replica can fall behind (replica.lag.max.messages) and still be
considered in the ISR. Then I assumed a write can also be ACKed by any ISR
replica, and then why not by one which has fallen further behind.

Kind regards,
Stevo Slavic.

On Tue, Jul 7, 2015 at 4:47 PM, Gwen Shapira  wrote:

> I am not sure "different replica" can ACK the second back of messages
> while not having the first - from what I can see, it will need to be
> up-to-date on the latest messages (i.e. correct HWM) in order to ACK.
>
> On Tue, Jul 7, 2015 at 7:13 AM, Stevo Slavić  wrote:
> > Hello Apache Kafka community,
> >
> > Documentation for min.insync.replicas in
> > http://kafka.apache.org/documentation.html#brokerconfigs states:
> >
> > "When used together, min.insync.replicas and request.required.acks allow
> > you to enforce greater durability guarantees. A typical scenario would be
> > to create a topic with a replication factor of 3, set min.insync.replicas
> > to 2, and produce with request.required.acks of -1. This will ensure that
> > the producer raises an exception if a majority of replicas do not
> receive a
> > write."
> >
> > Correct me if wrong (doc reference?), I assume min.insync.replicas
> includes
> > lead, so with min.insync.replicas=2, lead and one more replica besides
> lead
> > will have to ACK writes.
> >
> > In such setup, with minimalistic 3 brokers cluster, given that
> > - all 3 replicas are insync
> > - a batch of messages is written and ends up on lead and one replica ACKs
> > - another batch of messages ends up on lead and different replica ACKs
> >
> > Is it possible that when lead crashes, while replicas didn't catch up,
> > (part of) one batch of messages could be lost (since one replica becomes
> a
> > new lead, and it's only serving all reads and requests, and replication
> is
> > one way)?
> >
> > Kind regards,
> > Stevo Slavic.
>


Re: Kafka settings for (more) reliable/durable messaging

2015-07-07 Thread Stevo Slavić
Thanks for heads up and code reference!

Traced back required offset to
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L303

I have to investigate more, but from an initial check I was expecting to see
a reference there to "replica.lag.max.messages" (so that even a replica
between 0 and maxLagMessages behind the required offset would be considered
in sync). Searching through trunk I cannot find where in the main code the
"replica.lag.max.messages" configuration property is used.

Used search query
https://github.com/apache/kafka/search?utf8=%E2%9C%93&q=%22replica.lag.max.messages%22&type=Code

Maybe it's going to be removed in next release?!

Time based lag is still there.

Anyway, if I understood correctly, with request.required.acks=-1, when a
message/batch is published, it's first written to the lead, then the other
partition replicas either continuously poll and get in sync with the lead, or
get notified through ZooKeeper that they are behind and then poll and get in
sync with the lead, and as soon as enough (min.insync.replicas - 1) replicas
are detected to be fully in sync with the lead, an ACK is sent to the
producer (unless a timeout occurs first).

On Tue, Jul 7, 2015 at 5:15 PM, Gwen Shapira  wrote:

> Ah, I think I see the confusion: Replicas don't actually ACK at all.
> What happens is that the replica manager waits for enough ISR replicas
> to reach the correct offset
> Partition.checkEnoughReplicasReachOffset(...) has this logic. A
> replica can't reach offset of second batch, without first having
> written the first batch. So I believe we are safe in this scenario.
>
> Gwen
>
> On Tue, Jul 7, 2015 at 8:01 AM, Stevo Slavić  wrote:
> > Hello Gwen,
> >
> > Thanks for fast response!
> >
> > Btw, congrats on officially becoming a Kafka committer and thanks, among
> > other things, for great "Intro to Kafka" video
> > http://shop.oreilly.com/product/0636920038603.do !
> >
> > Have to read more docs and/or source. I thought this scenario is possible
> > because replica can fall behind (replica.lag.max.messages) and still be
> > considered ISR. Then I assumed also write can be ACKed by any ISR, and
> then
> > why not by one which has fallen more behind.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Tue, Jul 7, 2015 at 4:47 PM, Gwen Shapira 
> wrote:
> >
> >> I am not sure "different replica" can ACK the second back of messages
> >> while not having the first - from what I can see, it will need to be
> >> up-to-date on the latest messages (i.e. correct HWM) in order to ACK.
> >>
> >> On Tue, Jul 7, 2015 at 7:13 AM, Stevo Slavić  wrote:
> >> > Hello Apache Kafka community,
> >> >
> >> > Documentation for min.insync.replicas in
> >> > http://kafka.apache.org/documentation.html#brokerconfigs states:
> >> >
> >> > "When used together, min.insync.replicas and request.required.acks
> allow
> >> > you to enforce greater durability guarantees. A typical scenario
> would be
> >> > to create a topic with a replication factor of 3, set
> min.insync.replicas
> >> > to 2, and produce with request.required.acks of -1. This will ensure
> that
> >> > the producer raises an exception if a majority of replicas do not
> >> receive a
> >> > write."
> >> >
> >> > Correct me if wrong (doc reference?), I assume min.insync.replicas
> >> includes
> >> > lead, so with min.insync.replicas=2, lead and one more replica besides
> >> lead
> >> > will have to ACK writes.
> >> >
> >> > In such setup, with minimalistic 3 brokers cluster, given that
> >> > - all 3 replicas are insync
> >> > - a batch of messages is written and ends up on lead and one replica
> ACKs
> >> > - another batch of messages ends up on lead and different replica ACKs
> >> >
> >> > Is it possible that when lead crashes, while replicas didn't catch up,
> >> > (part of) one batch of messages could be lost (since one replica
> becomes
> >> a
> >> > new lead, and it's only serving all reads and requests, and
> replication
> >> is
> >> > one way)?
> >> >
> >> > Kind regards,
> >> > Stevo Slavic.
> >>
>


Re: Kafka settings for (more) reliable/durable messaging

2015-07-07 Thread Stevo Slavić
Great feedback, thank you very much to both!

Kind regards,
Stevo Slavic.

On Tue, Jul 7, 2015 at 7:33 PM, Jiangjie Qin 
wrote:

> The replica lag definition now is time based, so as long as a replica can
> catch up with leader in replica.lag.time.max.ms, it is in ISR, no matter
> how many messages it is behind.
>
> And yes, your understanding is correct - ACK is sent back either when all
> replica in ISR got the message or the request timeout.
>
> I had some related slides here might help a bit.
> http://www.slideshare.net/JiangjieQin/no-data-loss-pipeline-with-apache-kaf
> ka-49753844
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On 7/7/15, 9:28 AM, "Stevo Slavić"  wrote:
>
> >Thanks for heads up and code reference!
> >
> >Traced back required offset to
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/serve
> >r/ReplicaManager.scala#L303
> >
> >Have to investigate more, but from initial check was expecting to see
> >there
> >reference to "replica.lag.max.messages" (so even when replica is between 0
> >and maxLagMessages behind to be considered on required offset to be
> >considered as insync). Searching through trunk cannot find where in main
> >code is "replica.lag.max.messages" configuration property used.
> >
> >Used search query
> >
> https://github.com/apache/kafka/search?utf8=%E2%9C%93&q=%22replica.lag.max
> >.messages%22&type=Code
> >
> >Maybe it's going to be removed in next release?!
> >
> >Time based lag is still there.
> >
> >Anyway, if I understood correctly, with request.required.acks=-1, when a
> >message/batch is published, it's first written to lead, then other
> >partition replicas either continuously poll and get in sync with lead, or
> >through zookeeper get notified that they are behind and poll and get in
> >sync with lead, and as soon as enough (min.insync.replicas - 1) replicas
> >are detected to be fully in sync with lead, ACK is sent to producer
> >(unless
> >timeout occurs first).
> >
> >On Tue, Jul 7, 2015 at 5:15 PM, Gwen Shapira 
> >wrote:
> >
> >> Ah, I think I see the confusion: Replicas don't actually ACK at all.
> >> What happens is that the replica manager waits for enough ISR replicas
> >> to reach the correct offset
> >> Partition.checkEnoughReplicasReachOffset(...) has this logic. A
> >> replica can't reach offset of second batch, without first having
> >> written the first batch. So I believe we are safe in this scenario.
> >>
> >> Gwen
> >>
> >> On Tue, Jul 7, 2015 at 8:01 AM, Stevo Slavić  wrote:
> >> > Hello Gwen,
> >> >
> >> > Thanks for fast response!
> >> >
> >> > Btw, congrats on officially becoming a Kafka committer and thanks,
> >>among
> >> > other things, for great "Intro to Kafka" video
> >> > http://shop.oreilly.com/product/0636920038603.do !
> >> >
> >> > Have to read more docs and/or source. I thought this scenario is
> >>possible
> >> > because replica can fall behind (replica.lag.max.messages) and still
> >>be
> >> > considered ISR. Then I assumed also write can be ACKed by any ISR, and
> >> then
> >> > why not by one which has fallen more behind.
> >> >
> >> > Kind regards,
> >> > Stevo Slavic.
> >> >
> >> > On Tue, Jul 7, 2015 at 4:47 PM, Gwen Shapira 
> >> wrote:
> >> >
> >> >> I am not sure "different replica" can ACK the second back of messages
> >> >> while not having the first - from what I can see, it will need to be
> >> >> up-to-date on the latest messages (i.e. correct HWM) in order to ACK.
> >> >>
> >> >> On Tue, Jul 7, 2015 at 7:13 AM, Stevo Slavić 
> >>wrote:
> >> >> > Hello Apache Kafka community,
> >> >> >
> >> >> > Documentation for min.insync.replicas in
> >> >> > http://kafka.apache.org/documentation.html#brokerconfigs states:
> >> >> >
> >> >> > "When used together, min.insync.replicas and request.required.acks
> >> allow
> >> >> > you to enforce greater durability guarantees. A typical scenario
> >> would be
> >> >> > to create a topic with a replication factor of 3, set
> >> min.insync.replicas
> >> >> > to 2, and produce with request.required.acks of -1. This wi

Re: kafka connection with zookeeper

2015-07-08 Thread Stevo Slavić
Hello Gaurav,

There are several ZooKeeper client connection configuration properties you
can tune for Kafka brokers. They are documented at
http://kafka.apache.org/documentation.html#brokerconfigs (all begin with the
"zookeeper." prefix).

Did you change any already? I'd start with defaults (they can and sometimes
do change from version to version of Kafka).

Kind regards,
Stevo Slavic.

On Wed, Jul 8, 2015 at 11:59 AM, Gaurav Agarwal 
wrote:

> Can we find, from some API in Kafka, how many connections the Kafka broker
> has to ZooKeeper? My Kafka is going down again and again.
>


Custom topic metadata

2015-07-12 Thread Stevo Slavić
Hello Apache Kafka Community,

Is it possible to store and retrieve additional custom topic metadata along
with the existing Kafka-managed metadata, using some Kafka API? If not, would
it be a problem (e.g. for the Kafka broker or some client APIs) if I were to
store/retrieve additional custom topic metadata using the ZooKeeper API?
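
For what it's worth, a minimal sketch of the ZooKeeper route, assuming the
custom metadata lives under a separate, application-owned path (the
hypothetical /custom-topic-metadata here) rather than under Kafka's own
nodes:

import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class CustomTopicMetadata {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000,
                ZKStringSerializer$.MODULE$);

        // hypothetical path, deliberately outside /brokers, /admin and /config
        String path = "/custom-topic-metadata/demo-topic";
        String metadata = "{\"owner\":\"team-a\",\"notes\":\"audit stream\"}";

        zkClient.createPersistent(path, true);   // create the path (and parents), no data yet
        zkClient.writeData(path, metadata);      // store the custom metadata
        String stored = zkClient.readData(path); // read it back
    }
}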

Kind regards,
Stevo Slavic.


Re: Delete topic using Admintools is not working

2015-07-16 Thread Stevo Slavić
Hello Sivananda,

Calling AdminUtils.deleteTopic just requests that the topic be deleted - it
does not actually delete the topic immediately. Requests for topic deletion
get saved in ZooKeeper as a node (named after the topic), under the
/admin/delete_topics node.

If the brokers in the cluster are configured with topic deletion enabled, and
if they are running, they will notice requests to delete the topic, perform
the actual deletion (all partitions on all brokers, lead and replicas, as
well as deletion of the topic metadata in ZooKeeper), and also delete the
saved request for topic deletion (that node under /admin/delete_topics). This
broker side part of the topic deletion process only works in Kafka 0.8.2.x.

From your stack trace it seems you've requested topic deletion at least
twice while the topic has not been deleted yet, so the zk client used by
admin utils throws an exception signalling that the node (the request for
topic deletion) already exists.
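
To avoid the exception while a deletion is still pending, a minimal sketch
that checks for the pending request first (same connection settings as in
your snippet):

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class SafeTopicDelete {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000,
                ZKStringSerializer$.MODULE$);
        String topic = "testTopic17";

        // skip the request if a deletion for this topic is already pending
        if (!zkClient.exists("/admin/delete_topics/" + topic)) {
            AdminUtils.deleteTopic(zkClient, topic);
        }
    }
}

Alternatively, simply catch the ZkNodeExistsException and treat it as
"deletion already requested".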

If topic is not getting deleted even after a while I'd suggest that you
check version, configuration and running status of your Kafka cluster.

Kind regards,
Stevo Slavic.

On Fri, Jul 17, 2015 at 12:40 AM, Sivananda Reddy 
wrote:

> Hi,
>
> As per the jira https://issues.apache.org/jira/browse/KAFKA-1737, I have
> set ZKStringSerializer, while creating the ZkClient and still the delete
> topic function call is not working:
>
> Version: kafka_2.11-0.8.2.1
>
> *My code looks like this*:
> ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000,
> ZKStringSerializer$.MODULE$);
> AdminUtils.deleteTopic(zkClient, "testTopic17");
>
> *I am getting the following exception's*:
> Exception in thread "main"
> org.I0Itec.zkclient.exception.ZkNodeExistsException:
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode =
> NodeExists for /admin/delete_topics/testTopic17
> at
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:55)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
> at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:243)
> at kafka.utils.ZkUtils$.createPersistentPath(ZkUtils.scala:306)
> at kafka.admin.AdminUtils$.deleteTopic(AdminUtils.scala:159)
> at kafka.admin.AdminUtils.deleteTopic(AdminUtils.scala)
> at Test.deleteTopic(Test.java:98)
> at Test.main(Test.java:76)
> Caused by: org.apache.zookeeper.KeeperException$NodeExistsException:
> KeeperErrorCode = NodeExists for /admin/delete_topics/testTopic17
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
> at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
> at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
> at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> ... 7 more
>
> *The corresponding zookeeper logs*:
> [2015-07-16 15:31:14,383] INFO Accepted socket connection from /
> 127.0.0.1:23363 (org.apache.zookeeper.server.NIOServerCnxnFactory)
> [2015-07-16 15:31:14,385] INFO Client attempting to establish new session
> at /127.0.0.1:23363 (org.apache.zookeeper.server.ZooKeeperServer)
> [2015-07-16 15:31:14,391] INFO Established session 0x14e98d2ce98000c with
> negotiated timeout 6000 for client /127.0.0.1:23363
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2015-07-16 15:31:14,510] INFO Got user-level KeeperException when
> processing sessionid:0x14e98d2ce98000c type:create cxid:0x1 zxid:0x963
> txntype:-1 reqpath:n/a Error Path:/admin/delete_topics/testTopic17
> Error:KeeperErrorCode = NodeExists for /admin/delete_topics/testTopic17
> (org.apache.zookeeper.server.PrepRequestProcessor)
> [2015-07-16 15:31:14,842] WARN caught end of stream exception
> (org.apache.zookeeper.server.NIOServerCnxn)
> EndOfStreamException: Unable to read additional data from client sessionid
> 0x14e98d2ce98000c, likely client has closed socket
> at
> org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
> at
>
> org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
> at java.lang.Thread.run(Thread.java:744)
> [2015-07-16 15:31:14,843] INFO Closed socket connection for client /
> 127.0.0.1:23363 which had sessionid 0x14e98d2ce98000c
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2015-07-16 15:31:21,000] INFO Expiring session 0x14e98d2ce98000c, timeout
> of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> [2015-07-16 15:31:21,001] INFO Processed session termination for sessionid:
> 0x14e98d2ce98000c (org.apache.zookeeper.server.PrepRequestProcessor)
>
> Could someone please let me know if I am missing anything?
>
> Thank you,
> Siva.
>


New consumer - consumer group init

2015-07-17 Thread Stevo Slavić
Hello Apache Kafka community,

In the new KafkaConsumer API on trunk, it seems it's only possible to define
the consumer group id at KafkaConsumer construction time, through the
property with the group.id key.

Would it make sense and be possible to support setting/changing the consumer
group id after construction, but before the consumer is actually used for the
first time, similar to how subscription is supported through "public void
subscribe(String... topics)"?

Maybe this can be done through an additional method "public void
subscribe(String consumerGroupId, String... topics)" which would first set
the provided consumer group id in the coordinator and then call "public void
subscribe(String... topics)".
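
For context, a minimal sketch of how it works today on trunk, with the group
fixed at construction (config values are placeholders); the proposed overload
would move that choice to subscription time:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupAtConstruction {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group"); // fixed here, for the lifetime of the instance
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe("demo-topic");    // topics can be chosen later, the group cannot
    }
}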

Kind regards,
Stevo Slavic.


Re: New consumer - consumer group init

2015-07-19 Thread Stevo Slavić
Hello Guozhang,

It would be enough if the consumer group could, besides at construction time,
be set only once after construction. I have to retest, but the high level
consumer in 0.8.1 used to be a very heavyweight object (lots of threads
started, and it would block and take time to construct). That's
understandable, considering all of the high level features it has, and
since it's supposed to be a long-lived object. What would improve with this
change is that the construction penalty could be paid upfront, while topic
subscription and joining the consumer group ensemble could be done on first
use, so that first use does not have to suffer from both the init and
subscription penalties.

It would also be nice if the consumer group, just like the subscription,
could be changed later, even multiple times throughout the lifetime of a high
level consumer instance, to avoid constructing a new consumer when the
instance's purpose changes.

After looking more into the HLC API, I thought maybe this is not needed,
since there is "public void subscribe(TopicPartition... partitions)" which
does not use consumer group management, but the problem is that there is no
matching explicit commit where one could pass a consumer group parameter as
well, to label for which consumer group the offset(s) should be committed.

Seems like the new HLC has a split personality. Maybe (at least) two APIs
could have been provided instead of one with such differing behaviors.

Kind regards,
Stevo Slavic.

On Sun, Jul 19, 2015 at 12:01 AM, Guozhang Wang  wrote:

> Hi Stevo,
>
> Hmm this is interesting, do you have any use cases in mind that need
> dynamic group changing?
>
> Guozhang
>
> On Fri, Jul 17, 2015 at 11:13 PM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > In new KafkaConsumer API on trunk, it seems it's only possible to define
> > consumer group id at construction time of KafkaConsumer, through property
> > with group.id key.
> >
> > Would it make sense and be possible to support setting/changing consumer
> > group id after construction, but before it's actually used for the first
> > time, similar to how subscription is supported through "public void
> > subscribe(String... topics)"?
> >
> > Maybe this can be done through additional method "public void
> > subscribe(String consumerGroupId, String... topics)" which would first
> set
> > provided consumer group id in coordinator and then call "public void
> > subscribe(String... topics)".
> >
> > Kind regards,
> > Stevo Slavic.
> >
>
>
>
> --
> -- Guozhang
>


Re: New consumer - consumer group init

2015-07-21 Thread Stevo Slavić
Thanks all for fast feedback!

It's great news if that aspect is improved as well in new HLC. I will test
and get back with any related findings.

Kind regards,
Stevo Slavic.

On Mon, Jul 20, 2015 at 9:57 PM, Guozhang Wang  wrote:

> Hi Stevo,
>
> I am still not very clear on your point yet, I guess I was trying to figure
> out under which circumstances would users prefer to re-set the group id at
> an existing consumer rather than creating a new instance. As Jason
> mentioned, since the new consumer is single threaded it should usually be
> cheap to construct.
>
> Guozhang
>
> On Mon, Jul 20, 2015 at 11:06 AM, Jason Gustafson 
> wrote:
>
> > Hey Stevo,
> >
> > The new consumer doesn't have any threads of its own, so I think
> > construction should be fairly cheap.
> >
> > -Jason
> >
> > On Sun, Jul 19, 2015 at 2:13 PM, Stevo Slavić  wrote:
> >
> > > Hello Guozhang,
> > >
> > > It would be enough if consumer group could, besides at construction
> time,
> > > be set once only after construction. Have to retest, but high level
> > > consumer in 0.8.1 used to be very heavy weight object (lots of threads
> > > started, and it would block and take time to construct it). It's
> > > understandable, considering all of the high level features it has, and
> > > since it's supposed to be long living object. What would improve with
> > this
> > > change is that construction penalty could be paid upfront, while topic
> > > subscription and joining consumer group ensemble could be done on first
> > > use, so that first use does not have to suffer from both init and
> > > subscription penalties.
> > >
> > > It would be nice also if consumer group just as subscription could be
> > > changed later even, so multiple times throughout lifetime of high level
> > > consumer instance, to avoid constructing new consumer when instance
> > purpose
> > > changes.
> > >
> > > After looking more into the HLC API, thought maybe this is not needed,
> > > since there is "public void subscribe(TopicPartition... partitions)"
> > which
> > > does not use consumer group management, but problem is that there is no
> > > matching explicit commit where one could pass consumer group parameter
> as
> > > well, to label for which consumer group should offset(s) be committed.
> > >
> > > Seems like new HLC has split personality. Maybe (at least) two APIs
> could
> > > have been provided instead of one with such differing behaviors.
> > >
> > > Kind regards,
> > > Stevo Slavic.
> > >
> > > On Sun, Jul 19, 2015 at 12:01 AM, Guozhang Wang 
> > > wrote:
> > >
> > > > Hi Stevo,
> > > >
> > > > Hmm this is interesting, do you have any use cases in mind that need
> > > > dynamic group changing?
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Jul 17, 2015 at 11:13 PM, Stevo Slavić 
> > > wrote:
> > > >
> > > > > Hello Apache Kafka community,
> > > > >
> > > > > In new KafkaConsumer API on trunk, it seems it's only possible to
> > > define
> > > > > consumer group id at construction time of KafkaConsumer, through
> > > property
> > > > > with group.id key.
> > > > >
> > > > > Would it make sense and be possible to support setting/changing
> > > consumer
> > > > > group id after construction, but before it's actually used for the
> > > first
> > > > > time, similar to how subscription is supported through "public void
> > > > > subscribe(String... topics)"?
> > > > >
> > > > > Maybe this can be done through additional method "public void
> > > > > subscribe(String consumerGroupId, String... topics)" which would
> > first
> > > > set
> > > > > provided consumer group id in coordinator and then call "public
> void
> > > > > subscribe(String... topics)".
> > > > >
> > > > > Kind regards,
> > > > > Stevo Slavic.
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


New consumer - ConsumerRecords partitions

2015-07-21 Thread Stevo Slavić
Hello Apache Kafka community,

New HLC poll returns ConsumerRecords.

Do ConsumerRecords contain records for every partition that the HLC is actively
subscribed to on every poll request, or do they contain only records for
partitions which had messages retrieved in that poll request?

If the latter, then please consider adding a method to the ConsumerRecords
class, "public Iterable<TopicPartition> getPartitions()", returning the
partitions that the ConsumerRecords instance has. I could provide a PR.
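
For illustration, a sketch of how such an accessor could enable per-partition
processing; both the proposed getPartitions() accessor and the per-partition
records(...) lookup used here are illustrative assumptions, not an existing
API:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

static void drainByPartition(KafkaConsumer<byte[], byte[]> consumer) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    // getPartitions() is the accessor proposed above (hypothetical at this point)
    for (TopicPartition tp : records.getPartitions()) {
        for (ConsumerRecord<byte[], byte[]> record : records.records(tp)) {
            System.out.println(tp + " @ " + record.offset());
        }
    }
}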

Kind regards,
Stevo Slavic.


New consumer - poll/seek javadoc confusing, need clarification

2015-07-21 Thread Stevo Slavić
Hello Apache Kafka community,

I find the new consumer poll/seek javadoc a bit confusing. Just by reading the
docs I'm not sure what the outcome will be, i.e. what is expected in the
following scenario:

- kafkaConsumer is instantiated with auto-commit off
- kafkaConsumer.subscribe(someTopic)
- kafkaConsumer.position is called for every TopicPartition HLC is actively
subscribed on

and then, when doing multiple poll calls in succession (without calling
commit), does seek have to be called in between poll calls to position the HLC
to skip what was read in the previous poll, or does the HLC keep that state
(position after poll) in memory, so that the next poll (without a seek in
between the two poll calls) will continue from where the last poll stopped?

It could be that it's just me not understanding this from the javadoc. If not,
maybe the javadoc can be improved to make this (even) more obvious.

Kind regards,
Stevo Slavic.


New producer hangs infinitely when it loses connection to Kafka cluster

2015-07-21 Thread Stevo Slavić
Hello Apache Kafka community,

Just noticed that:
- a message is successfully published using the new 0.8.2.1 producer
- and then Kafka is stopped

The next attempt to publish a message using the same instance of the new
producer hangs forever, and the following stack trace gets logged repeatedly:

[WARN ] [o.a.kafka.common.network.Selector] [] Error in I/O with localhost/
127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_31]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
~[na:1.8.0_31]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
[kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]


I expect the producer to respect timeout settings even in this connection-lost
scenario.

Is this a known bug? Is there something I can do/configure as a workaround?

Kind regards,
Stevo Slavic.


New consumer - offset storage options

2015-07-21 Thread Stevo Slavić
Hello Apache Kafka community,

It seems the new high-level consumer coming in 0.8.3 will support only offset
storage in a Kafka topic.
Can somebody please confirm/comment?

Kind regards,
Stevo Slavic.


New consumer - partitions auto assigned only on poll

2015-07-22 Thread Stevo Slavić
Hello Apache Kafka community,

In the new consumer I encountered unexpected behavior. After constructing a
KafkaConsumer instance with a configured consumer rebalance callback handler,
and subscribing to a topic with "consumer.subscribe(topic)", retrieving
subscriptions would return an empty set and the callback handler would never
get called (no partitions ever assigned or revoked), no matter how long the
instance was up.

Then I found, by inspecting the KafkaConsumer code, that partition assignment
will only be triggered on the first poll; pollOnce has:

// ensure we have partitions assigned if we expect to
if (subscriptions.partitionsAutoAssigned())
    coordinator.ensurePartitionAssignment();

Would it make sense to include this fragment in the KafkaConsumer.subscriptions
accessor as well?

Kind regards,
Stevo Slavic.


Re: New consumer - ConsumerRecords partitions

2015-07-22 Thread Stevo Slavić
Hello Jason,

Thanks for feedback. I've created JIRA ticket for feature request:
https://issues.apache.org/jira/browse/KAFKA-2356

Kind regards,
Stevo Slavic.

On Tue, Jul 21, 2015 at 6:17 PM, Jason Gustafson  wrote:

> Hey Stevo,
>
> I think ConsumerRecords only contains the partitions which had messages.
> Would you mind creating a jira for the feature request? You're welcome to
> submit a patch as well.
>
> -Jason
>
> On Tue, Jul 21, 2015 at 2:27 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > New HLC poll returns ConsumerRecords.
> >
> > Do ConsumerRecords contain records for every partition that HLC is
> actively
> > subscribed on for every poll request, or does it contain only records for
> > partitions which had messages and which were retrieved in poll request?
> >
> > If latter, then please consider adding a method to ConsumerRecords class,
> > "public Iterable getPartitions()" that the
> ConsumerRecords
> > has. I could provide a PR.
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Re: High Level Consumer use case

2015-07-22 Thread Stevo Slavić
Hello Sreenivasulu,

It's automatic: just start them. As each HLC starts, it registers in ZK and a
rebalance of the HLC-to-partition assignments happens.

Be gentle when starting consumers; there is a reported bug that if multiple
HLCs are started in a short time, some of them may end up without any
partition assigned, while others could have more than one (see
https://issues.apache.org/jira/browse/KAFKA-2329).
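
For illustration, a minimal sketch of one such consumer process using the
0.8.x high-level consumer API with auto-commit disabled (the topic name,
ZooKeeper address, group id, timeouts and batch handling are assumptions):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BillingBatchConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "billing-batch");    // same group for all 20 processes
        props.put("auto.commit.enable", "false");  // commit manually when the batch is done
        props.put("consumer.timeout.ms", "10000"); // stop iterating when idle for 10s

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream (thread) per process; with 20 processes in the same group the
        // rebalance spreads the 20 partitions across them automatically.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("billing-topic", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("billing-topic").get(0).iterator();
        try {
            while (it.hasNext()) {
                byte[] message = it.next().message();
                // process the billing message ...
            }
        } catch (ConsumerTimeoutException e) {
            // no new messages within consumer.timeout.ms - end of this batch
        }
        connector.commitOffsets(); // commit once the 15-minute batch completes
        connector.shutdown();
    }
}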

Kind regards,
Stevo Slavic.

On Wed, Jul 22, 2015 at 6:02 AM, Sreenivasulu NALLAPATI <
sreenivasulu.nallap...@amadeus.com> wrote:

>  Hello,
>
>
>
> I would like to introduce myself first. I am Sreeni working as a technical
> lead with Amadeus. We planned to use Kafka for our billing messages.
>
>
>
> Here is my use case:
>
> We wanted to run a batch job every 15min with High Level Consumer by
> > changing auto.commit.enable=false.
>
> For now, we have a topic with 20 partitions and we would like to start 20
> consumers one per topic. In the future the partition count may go up and
> for scalability we want to use High Level Consumer.
>
> We were unable to find a best way to do this with High Level Consumer.
>
> Can you please suggest how to map one consumer to one/more partitions? We
> are using Java Consumer API for this activity.
>
>
>
> Thanks
> > Sreenivasulu Nallapati
> > Senior Member Technical Staff
> R&D-ALB-AQG-DUI-LBD
> Amadeus Software Labs, India Pvt. Ltd
> T: 4914 6827
> M: +919902065578
> sreenivasulu.nallap...@amadeus.com
>
>
>


Re: New consumer - poll/seek javadoc confusing, need clarification

2015-07-23 Thread Stevo Slavić
Thanks Ewen for the heads up.

It's great that seek is not needed in between polls when business goes on as
usual.

In the edge case when my logic detects it needs to go back and reread events
from a given position in history, I use seek. I found out that the next poll
after seek will not respect the offset used in seek. It is strange that even
Consumer.position returns the same offset that seek has set for the consumer
instance, but poll still does not return messages starting from that offset.

E.g. say there are 5 messages published to a single partition of a single
topic. The consumer subscribes to that topic partition, with the
smallest/earliest offset reset strategy configured, and consumer.position
confirms that the consumer is at position 0.
Then poll is issued and it returns all 5 messages. The logic processes the
messages, detects it needs to go back in history to position 0, does not
commit anything but calls seek to 0, and position confirms the consumer is at
offset 0. The next poll does not return any messages.
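
A minimal sketch of that repro, assuming the trunk consumer API at the time
(subscribe(TopicPartition...) was later renamed to assign(...); the topic
name and config values are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "seek-test");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("test-topic", 0);
consumer.subscribe(tp);                              // manual partition subscription

System.out.println(consumer.position(tp));           // 0
ConsumerRecords<byte[], byte[]> first = consumer.poll(1000);   // returns all 5 messages

consumer.seek(tp, 0);                                // rewind without committing
System.out.println(consumer.position(tp));           // 0 again, as expected
ConsumerRecords<byte[], byte[]> second = consumer.poll(1000);  // unexpectedly empty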

So seek is not really doing what it should, according to its javadoc:

/**
 * Overrides the fetch offsets that the consumer will use on the next
 * {@link #poll(long) poll(timeout)}. If this API is invoked for the same
 * partition more than once, the latest offset will be used on the next
 * poll(). Note that you may lose data if this API is arbitrarily used in
 * the middle of consumption, to reset the fetch offsets
 */

I've also checked that calling seek multiple times does not help to get poll
to use the offset set with the last seek.
It could be that something is wrong with the poll implementation, making it
not respect the offset set with seek.

Kind regards,
Stevo Slavic.


On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava 
wrote:

> It should just continue consuming using the existing offsets. It'll have to
> refresh metadata to pick up the leadership change, but once it does it can
> just pick up where consumption from the previous leader stopped -- all the
> ISRs should have the same data, so the new leader will have all the same
> data the previous leader had (assuming unclean leader election is not
> enabled).
>
> On Tue, Jul 21, 2015 at 9:11 PM, James Cheng  wrote:
>
> >
> > > On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava 
> > wrote:
> > >
> > > On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić 
> wrote:
> > >
> > >> Hello Apache Kafka community,
> > >>
> > >> I find new consumer poll/seek javadoc a bit confusing. Just by reading
> > docs
> > >> I'm not sure what the outcome will be, what is expected in following
> > >> scenario:
> > >>
> > >> - kafkaConsumer is instantiated with auto-commit off
> > >> - kafkaConsumer.subscribe(someTopic)
> > >> - kafkaConsumer.position is called for every TopicPartition HLC is
> > actively
> > >> subscribed on
> > >>
> > >> and then when doing multiple poll calls in succession (without calling
> > >> commit), does seek have to be called in between poll calls to position
> > HLC
> > >> to skip what was read in previous poll, or does HLC keep that state
> > >> (position after poll) in memory, so that next poll (without seek in
> > between
> > >> two poll calls) will continue from where last poll stopped?
> > >>
> > >
> > > The position is tracked in-memory within the consumer, so as long as
> > there
> > > isn't a consumer rebalance, consumption will just proceed with
> subsequent
> > > messages (i.e. the behavior I think most people would find intuitive).
> > > However, if a rebalance occurs (another consumer instance joins the
> group
> > > or some leave), then a partition may be assigned to an different
> consumer
> > > instance that has no idea about the current position and will restart
> > based
> > > on the offset reset setting (because attempting to fetch the committed
> > > offset will fail since no offsets have been committed).
> > >
> >
> > Ewen,
> >
> > What happens if there is a broker failure and a new broker becomes the
> > partition leader? Does the high level consumer start listening to the new
> > partition leader at the in-memory position, or does it restart based on
> > saved offsets?
> >
> > Thanks,
> > -James
> >
> > > -Ewen
> > >
> > >
> > >> Could be it's just me not understanding this from javadoc. If not,
> maybe
> > >> javadoc can be improved to make this (even) more obvious.
> > >>
> > >> Kind regards,
> > >> Stevo Slavic.
> > >>
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> >
> >
>
>
> --
> Thanks,
> Ewen
>


Re: New consumer - poll/seek javadoc confusing, need clarification

2015-07-23 Thread Stevo Slavić
Strange: if after seek I make several poll requests, eventually it will
read/return messages from the offset that seek set.

On Thu, Jul 23, 2015 at 11:03 AM, Stevo Slavić  wrote:

> Thanks Ewen for heads up.
>
> It's great that seek is not needed in between poll when business goes as
> usual.
>
> In edge case, when my logic detects it needs to go back and reread events
> from given position in history, I use seek. I found out that next poll
> after seek will not respect offset used in seek. It is strange that event
> Consumer.position returns same offset that seek has set for the consumer
> instance, but poll still does not return messages starting from that offset.
>
> E.g. say there are 5 messages published to a single partition of a single
> topic. Consumer subscribes to that topic partition, with smallest/earliest
> offset reset strategy configured, and consumer.position confirms that the
> consumer is at position 0.
> Then poll is issued and it returns all 5 messages. Logic processes
> messages, detects it needs to go back in history to position 0, it does not
> commit messages but calls seek to 0, position confirms consumer is at
> offset 0. Next poll does not return any messages.
>
> So seek is not really working what it should do, according to its javadoc:
>
> /**
>  * Overrides the fetch offsets that the consumer will use on the next
> {@link #poll(long) poll(timeout)}. If this API
>  * is invoked for the same partition more than once, the latest offset
> will be used on the next poll(). Note that
>  * you may lose data if this API is arbitrarily used in the middle of
> consumption, to reset the fetch offsets
>  */
>
> I've checked also, calling seek multiple times does not help to get poll
> to use offset set with last seek.
> Could be something is wrong with poll implementation, making it not
> respect offset set with seek.
>
> Kind regards,
> Stevo Slavic.
>
>
> On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava 
> wrote:
>
>> It should just continue consuming using the existing offsets. It'll have
>> to
>> refresh metadata to pick up the leadership change, but once it does it can
>> just pick up where consumption from the previous leader stopped -- all the
>> ISRs should have the same data, so the new leader will have all the same
>> data the previous leader had (assuming unclean leader election is not
>> enabled).
>>
>> On Tue, Jul 21, 2015 at 9:11 PM, James Cheng  wrote:
>>
>> >
>> > > On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava > >
>> > wrote:
>>
>> > >
>> > > On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić 
>> wrote:
>> > >
>> > >> Hello Apache Kafka community,
>> > >>
>> > >> I find new consumer poll/seek javadoc a bit confusing. Just by
>> reading
>> > docs
>> > >> I'm not sure what the outcome will be, what is expected in following
>> > >> scenario:
>> > >>
>> > >> - kafkaConsumer is instantiated with auto-commit off
>> > >> - kafkaConsumer.subscribe(someTopic)
>> > >> - kafkaConsumer.position is called for every TopicPartition HLC is
>> > actively
>> > >> subscribed on
>> > >>
>> > >> and then when doing multiple poll calls in succession (without
>> calling
>> > >> commit), does seek have to be called in between poll calls to
>> position
>> > HLC
>> > >> to skip what was read in previous poll, or does HLC keep that state
>> > >> (position after poll) in memory, so that next poll (without seek in
>> > between
>> > >> two poll calls) will continue from where last poll stopped?
>> > >>
>> > >
>> > > The position is tracked in-memory within the consumer, so as long as
>> > there
>> > > isn't a consumer rebalance, consumption will just proceed with
>> subsequent
>> > > messages (i.e. the behavior I think most people would find intuitive).
>> > > However, if a rebalance occurs (another consumer instance joins the
>> group
>> > > or some leave), then a partition may be assigned to an different
>> consumer
>> > > instance that has no idea about the current position and will restart
>> > based
>> > > on the offset reset setting (because attempting to fetch the committed
>> > > offset will fail since no offsets have been committed).
>> > >
>> >
>> > Ewen,
>> >
>> > What happens if there is a broker failure and a new broker becomes the
>> > partition leader? Does the high level consumer start listening to the
>> new
>> > partition leader at the in-memory position, or does it restart based on
>> > saved offsets?
>> >
>> > Thanks,
>> > -James
>> >
>> > > -Ewen
>> > >
>> > >
>> > >> Could be it's just me not understanding this from javadoc. If not,
>> maybe
>> > >> javadoc can be improved to make this (even) more obvious.
>> > >>
>> > >> Kind regards,
>> > >> Stevo Slavic.
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > Thanks,
>> > > Ewen
>> >
>> >
>>
>>
>> --
>> Thanks,
>> Ewen
>>
>
>


Re: New consumer - partitions auto assigned only on poll

2015-07-24 Thread Stevo Slavić
Hello Jason,

Thanks for feedback. I've created ticket for this
https://issues.apache.org/jira/browse/KAFKA-2359

Kind regards,
Stevo Slavic.

On Wed, Jul 22, 2015 at 6:18 PM, Jason Gustafson  wrote:

> Hey Stevo,
>
> That's a good point. I think the javadoc is pretty clear that this could
> return no partitions when the consumer has no active assignment, but it may
> be a little unintuitive to have to call poll() after subscribing before you
> can get the assigned partitions. I can't think of a strong reason not to go
> ahead with the assignment in subscriptions() other than to keep it
> non-blocking. Perhaps you can open a ticket and we can get feedback from
> some other devs?
>
> Thanks,
> Jason
>
> On Wed, Jul 22, 2015 at 2:09 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > In the new consumer I encountered unexpected behavior. After constructing
> > KafkaConsumer instance with configured consumer rebalance callback
> handler,
> > and subscribing to a topic with "consumer.subscribe(topic)", retrieving
> > subscriptions would return empty set and callback handler would not get
> > called (no partitions ever assigned or revoked), no matter how long
> > instance was up.
> >
> > Then I found by inspecting KafkaConsumer code that partition assignment
> > will only be triggered on first poll, pollOnce has:
> >
> > // ensure we have partitions assigned if we expect to
> > if (subscriptions.partitionsAutoAssigned())
> > coordinator.ensurePartitionAssignment();
> >
> > Would it make sense to include this fragment in
> KafkaConsumer.subscriptions
> > accessor as well?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


New consumer - offset one gets in poll is not offset one is supposed to commit

2015-07-24 Thread Stevo Slavić
Hello Apache Kafka community,

Say there is only one topic with a single partition and a single message on
it.
Calling poll with the new consumer will return a ConsumerRecord for that
message, and it will have an offset of 0.

After processing the message, the current KafkaConsumer implementation expects
one to commit not offset 0 as processed, but offset 1 - the next
offset/position one would like to consume.

Does this sound strange to you as well?

I'm wondering whether this offset+1 handling for the next position to read
couldn't have been done in one place, in the KafkaConsumer implementation or
the broker or wherever, instead of every user of KafkaConsumer having to do
it.
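
For illustration, a minimal sketch of the convention, using the commit API as
it eventually shipped in the released new consumer (commitSync taking a map of
partition to offset); the poll timeout is an assumption and 'consumer' is an
already constructed and subscribed KafkaConsumer:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (ConsumerRecord<byte[], byte[]> record : records) {
    // process the record ...
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    // the committed value is the NEXT position to read, hence offset + 1
    consumer.commitSync(
        Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
}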

Kind regards,
Stevo Slavic.


Re: New consumer - offset one gets in poll is not offset one is supposed to commit

2015-07-28 Thread Stevo Slavić
Hello Jason,

Thanks for reply!

About your proposal: in the general case it might be helpful. In my case it
will not help much - I'm allowing each ConsumerRecord, or subset of
ConsumerRecords, to be processed and ACKed independently, outside of the HLC
process/thread (so as not to block the partition), and then committing the
largest consecutive ACKed offset (+1) since the current last committed offset,
per partition.
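
A minimal sketch of that bookkeeping for a single partition (plain Java, no
Kafka API involved; the class and method names are made up for illustration):

import java.util.concurrent.ConcurrentSkipListSet;

// Tracks out-of-order ACKs and exposes the next offset that is safe to commit,
// i.e. the last committed position advanced over the longest consecutive run
// of ACKed offsets.
class PartitionAckTracker {
    private final ConcurrentSkipListSet<Long> acked = new ConcurrentSkipListSet<>();
    private long nextToCommit; // next position to read; this is the value that gets committed

    PartitionAckTracker(long committedPosition) {
        this.nextToCommit = committedPosition;
    }

    void ack(long offset) {         // called from processing threads, in any order
        acked.add(offset);
    }

    synchronized long committableOffset() {  // value to pass to the offset commit
        while (acked.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }
}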

Kind regards,
Stevo Slavic.

On Mon, Jul 27, 2015 at 6:52 PM, Jason Gustafson  wrote:

> Hey Stevo,
>
> I agree that it's a little unintuitive that what you are committing is the
> next offset that should be read from and not the one that has already been
> read. We're probably constrained in that we already have a consumer which
> implements this behavior. Would it help if we added a method on
> ConsumerRecords to get the next offset (e.g. nextOffset(partition))?
>
> Thanks,
> Jason
>
> On Fri, Jul 24, 2015 at 10:11 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > Say there is only one topic with single partition and a single message on
> > it.
> > Result of calling a poll with new consumer will return ConsumerRecord for
> > that message and it will have offset of 0.
> >
> > After processing message, current KafkaConsumer implementation expects
> one
> > to commit not offset 0 as processed, but to commit offset 1 - next
> > offset/position one would like to consume.
> >
> > Does this sound strange to you as well?
> >
> > Wondering couldn't this offset+1 handling for next position to read been
> > done in one place, in KafkaConsumer implementation or broker or whatever,
> > instead of every user of KafkaConsumer having to do it.
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Re: 0.8.2 Producer sends blocking when brokers are unavailable

2015-07-31 Thread Stevo Slavić
Hello David,

It's a known issue, see https://issues.apache.org/jira/browse/KAFKA-1788
and https://issues.apache.org/jira/browse/KAFKA-2120
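
Until those are fixed, one possible mitigation (not a fix) is to hand the send
off to a separate thread and bound how long the caller waits; the background
thread itself can still block inside send() while the cluster is unreachable.
A rough sketch, where the producer, topic, payload and timeout are
assumptions:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

static RecordMetadata sendWithTimeout(KafkaProducer<String, byte[]> producer,
                                      ExecutorService sendExecutor,
                                      byte[] payload) throws Exception {
    Future<RecordMetadata> result = sendExecutor.submit(
        () -> producer.send(new ProducerRecord<>("some-topic", payload)).get());
    try {
        return result.get(5, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        result.cancel(true); // give up from the caller's perspective
        return null;
    }
}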

Kind regards,
Stevo Slavic.

On Fri, Jul 31, 2015 at 10:15 AM, David KOCH  wrote:

> Hello,
>
> The new producer org.apache.kafka.clients.producer.KafkaProducer seems to
> block on send calls when the cluster is unavailable.
>
> How do I disable this and make send calls fast fail in the event of
> unavailable brokers? I know the 0.8.1 Scala-based producer did not block in
> that case.
>
> Regards,
>
> /David
>
>
>
> The polling errors I get periodically for each unavailable broker:
> <156>1 2015-07-31T07:12:22.026Z  - Audit [ezakus@18060 level="WARN"
> logger="org.apache.kafka.common.network.Selector"] Error in I/O with
> stag--002/xx.xxx.xx.xxx
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> at java.lang.Thread.run(Thread.java:745)
>


0.8.3 ETA?

2015-08-03 Thread Stevo Slavić
Hello Apache Kafka community,

If I recall correctly, two weeks ago it was mentioned in a discussion that
Kafka 0.8.3 might be released in a month's time.

Is it still the Kafka dev team's goal to have Kafka 0.8.3 released in a few
weeks' time? Or is more (re)work (e.g. more new consumer API changes) planned
for 0.8.3 than is already in JIRA, which would further delay the 0.8.3 release?

Btw, Kafka JIRA has quite a lot of unresolved tickets targeting 0.8.3 as fix
version (see here
<https://issues.apache.org/jira/browse/KAFKA-1853?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.3%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC>
for the complete list).

Kind regards,
Stevo Slavic.


Re: 0.8.3 ETA?

2015-08-04 Thread Stevo Slavić
Thanks Jun for heads up!

On Mon, Aug 3, 2015 at 7:17 PM, Jun Rao  wrote:

> Hi, Stevo,
>
> Yes, we are still iterating on the new consumer a bit and are waiting for
> some of the security jiras to be committed. So now, we are shooting for
> releasing 0.8.3 in Oct (just updated
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).
>
> As we are getting closer, we will clean up the 0.8.3 jiras and push
> non-critical ones to future releases.
>
> Thanks,
>
> Jun
>
> On Mon, Aug 3, 2015 at 5:52 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > If I recall well, two weeks ago it was mentioned in a discussion that
> Kafka
> > 0.8.3 might be released in a month time.
> >
> > Is this still Kafka dev team goal, in few weeks time to have Kafka 0.8.3
> > released? Or is more (re)work (e.g. more new consumer API changes)
> planned
> > for 0.8.3 than already in JIRA, which would further delay 0.8.3 release?
> >
> > Btw, Kafka JIRA has quite a lot unresolved tickets targeting 0.8.3 as fix
> > version (see here
> > <
> >
> https://issues.apache.org/jira/browse/KAFKA-1853?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.3%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
> > >
> > complete list).
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Re: [DISCUSSION] Kafka 0.8.2.2 release?

2015-08-16 Thread Stevo Slavić
+1 (non-binding) for 0.8.2.2 release

Would be nice to include in that release new producer resiliency bug fixes
https://issues.apache.org/jira/browse/KAFKA-1788 and
https://issues.apache.org/jira/browse/KAFKA-2120

On Fri, Aug 14, 2015 at 4:03 PM, Gwen Shapira  wrote:

> Will be nice to include Kafka-2308 and fix two critical snappy issues in
> the maintenance release.
>
> Gwen
> On Aug 14, 2015 6:16 AM, "Grant Henke"  wrote:
>
> > Just to clarify. Will KAFKA-2189 be the only patch in the release?
> >
> > On Fri, Aug 14, 2015 at 7:35 AM, Manikumar Reddy 
> > wrote:
> >
> > > +1  for 0.8.2.2 release
> > >
> > > On Fri, Aug 14, 2015 at 5:49 PM, Ismael Juma 
> wrote:
> > >
> > > > I think this is a good idea as the change is minimal on our side and
> it
> > > has
> > > > been tested in production for some time by the reporter.
> > > >
> > > > Best,
> > > > Ismael
> > > >
> > > > On Fri, Aug 14, 2015 at 1:15 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Everyone,
> > > > >
> > > > > Since the release of Kafka 0.8.2.1, a number of people have
> reported
> > an
> > > > > issue with snappy compression (
> > > > > https://issues.apache.org/jira/browse/KAFKA-2189). Basically, if
> > they
> > > > use
> > > > > snappy in 0.8.2.1, they will experience a 2-3X space increase. The
> > > issue
> > > > > has since been fixed in trunk (just a snappy jar upgrade). Since
> > 0.8.3
> > > is
> > > > > still a few months away, it may make sense to do an 0.8.2.2 release
> > > just
> > > > to
> > > > > fix this issue. Any objections?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> >
>


Re: [DISCUSSION] Kafka 0.8.2.2 release?

2015-08-17 Thread Stevo Slavić
> >Record size function of record in mirror maker hit NPE when the
> message
> >value is null.
> >- KAFKA-2101 <https://issues.apache.org/jira/browse/KAFKA-2101>:
> >Metric metadata-age is reset on a failed update
> >- KAFKA-2112 <https://issues.apache.org/jira/browse/KAFKA-2112>: make
> >overflowWheel volatile
> >- KAFKA-2117 <https://issues.apache.org/jira/browse/KAFKA-2117>:
> >OffsetManager uses incorrect field for metadata
> >- KAFKA-2164 <https://issues.apache.org/jira/browse/KAFKA-2164>:
> >ReplicaFetcherThread: suspicious log message on reset offset
> >- KAFKA-1668 <https://issues.apache.org/jira/browse/KAFKA-1668>:
> >TopicCommand doesn't warn if --topic argument doesn't match any topics
> >- KAFKA-2198 <https://issues.apache.org/jira/browse/KAFKA-2198>:
> >kafka-topics.sh exits with 0 status on failures
> >- KAFKA-2235 <https://issues.apache.org/jira/browse/KAFKA-2235>:
> >LogCleaner offset map overflow
> >- KAFKA-2241 <https://issues.apache.org/jira/browse/KAFKA-2241>:
> >AbstractFetcherThread.shutdown() should not block on
> >ReadableByteChannel.read(buffer)
> >- KAFKA-2272 <https://issues.apache.org/jira/browse/KAFKA-2272>:
> >listeners endpoint parsing fails if the hostname has capital letter
> >- KAFKA-2345 <https://issues.apache.org/jira/browse/KAFKA-2345>:
> >Attempt to delete a topic already marked for deletion throws
> >ZkNodeExistsException
> >- KAFKA-2353 <https://issues.apache.org/jira/browse/KAFKA-2353>:
> >SocketServer.Processor should catch exception and close the socket
> properly
> >in configureNewConnections.
> >- KAFKA-1836 <https://issues.apache.org/jira/browse/KAFKA-1836>:
> >metadata.fetch.timeout.ms set to zero blocks forever
> >- KAFKA-2317 <https://issues.apache.org/jira/browse/KAFKA-2317>:
> De-register
> >isrChangeNotificationListener on controller resignation
> >
> > Note: KAFKA-2120 <https://issues.apache.org/jira/browse/KAFKA-2120> &
> > KAFKA-2421 <https://issues.apache.org/jira/browse/KAFKA-2421> were
> > mentioned in previous emails, but are not in the list because they are
> not
> > committed yet.
> >
> > Hope that helps the effort.
> >
> > Thanks,
> > Grant
> >
> > On Mon, Aug 17, 2015 at 12:09 AM, Grant Henke 
> wrote:
> >
> >> +1 to that suggestion. Though I suspect that requires a committer to do.
> >> Making it part of the standard commit process could work too.
> >> On Aug 16, 2015 11:01 PM, "Gwen Shapira"  wrote:
> >>
> >>> BTW. I think it will be great for Apache Kafka to have a 0.8.2 "release
> >>> manager" who's role is to cherrypick low-risk bug-fixes into the 0.8.2
> >>> branch and once enough bug fixes happened (or if sufficiently critical
> >>> fixes happened) to roll out a new maintenance release (with every 3
> month
> >>> as a reasonable bugfix release target).
> >>>
> >>> This will add some predictability regarding how fast we release fixes
> for
> >>> bugs.
> >>>
> >>> Gwen
> >>>
> >>> On Sun, Aug 16, 2015 at 8:09 PM, Jeff Holoman 
> >>> wrote:
> >>>
> >>> > +1 for the release and also including
> >>> >
> >>> > https://issues.apache.org/jira/browse/KAFKA-2114
> >>> >
> >>> > Thanks
> >>> >
> >>> > Jeff
> >>> >
> >>> > On Sun, Aug 16, 2015 at 2:51 PM, Stevo Slavić 
> >>> wrote:
> >>> >
> >>> > > +1 (non-binding) for 0.8.2.2 release
> >>> > >
> >>> > > Would be nice to include in that release new producer resiliency
> bug
> >>> > fixes
> >>> > > https://issues.apache.org/jira/browse/KAFKA-1788 and
> >>> > > https://issues.apache.org/jira/browse/KAFKA-2120
> >>> > >
> >>> > > On Fri, Aug 14, 2015 at 4:03 PM, Gwen Shapira 
> >>> wrote:
> >>> > >
> >>> > > > Will be nice to include Kafka-2308 and fix two critical snappy
> >>> issues
> >>> > in
> >>> > > > the maintenance release.
> >>> > > >
> >>> > > > Gwen
> >>> > > > On Aug 14, 2015 6:16 AM, "Grant Henke" 
>

Unclean leader election docs outdated

2015-09-11 Thread Stevo Slavić
Hello Apache Kafka community,

Current unclean leader election docs state:
"In the future, we would like to make this configurable to better support
use cases where downtime is preferable to inconsistency. "

If I'm not mistaken, since 0.8.2, unclean leader election strategy (whether
to allow it or not) is already configurable via
unclean.leader.election.enable broker config property.

Kind regards,
Stevo Slavic.


Re: Unclean leader election docs outdated

2015-09-11 Thread Stevo Slavić
That sentence is in both
https://svn.apache.org/repos/asf/kafka/site/083/design.html and
https://svn.apache.org/repos/asf/kafka/site/082/design.html near the end of
the "Unclean leader election: What if they all die?" section. The next
section, "Availability and Durability Guarantees", mentions the ability to
disable unclean leader election, so likely just this one reference needs to be
updated.

On Sat, Sep 12, 2015 at 1:05 AM, Guozhang Wang  wrote:

> Hi Stevo,
>
> Could you point me to the link of the docs?
>
> Guozhang
>
> On Fri, Sep 11, 2015 at 5:47 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > Current unclean leader election docs state:
> > "In the future, we would like to make this configurable to better support
> > use cases where downtime is preferable to inconsistency. "
> >
> > If I'm not mistaken, since 0.8.2, unclean leader election strategy
> (whether
> > to allow it or not) is already configurable via
> > unclean.leader.election.enable broker config property.
> >
> > Kind regards,
> > Stevo Slavic.
> >
>
>
>
> --
> -- Guozhang
>


Re: open source projects based on kafka

2015-09-13 Thread Stevo Slavić
Have a look at https://github.com/allegro/hermes

On Mon, Sep 14, 2015, 01:28 David Luu  wrote:

> The toy project idea is good. Another option I think could be to look at
> the various Kafka client langague bindings and/or utilities (like
> kafkacat). And from there, another option is to build a client language
> binding for a language that's kind of lacking Kafka support, some have
> better support than others.
>
> On Sun, Sep 13, 2015 at 11:46 AM, Hisham Mardam-Bey 
> wrote:
>
> > Hi Li,
> >
> > You can take a look at mypipe.
> >
> > "MySQL binary log consumer with the ability to act on changed rows and
> > publish changes to different systems with emphasis on Apache Kafka."
> >
> > https://github.com/mardambey/mypipe
> >
> > It uses Kafka as well as some related concepts and technologies like
> Avro,
> > a schema repository for data, and Akka.
> >
> > Hope this helps (= Happy hacking!
> >
> > hmb.
> >
> >
> > On Sun, Sep 13, 2015 at 4:45 AM, Manasvi Gupta 
> > wrote:
> >
> > > Here's a toy project - analyzing twitter stream.
> > >
> > > 1) Create dev. account on twitter
> > > 2) Using your dev credentials, connect to twitter stream api to
> retrieve
> > > stream of tweets
> > > 3) Store tweets in Kafka (using Kafka producer)
> > > 4) Retrieve tweets (using Kafka consumer)
> > > 5) For each tweet (or group of tweets), compute some analysis either
> > using
> > > custom java OR use storm/samza/spark.  e.g. country of origin of tweet,
> > > sentiment analysis etc.
> > >
> > > Its very simple to do this and should not take you more than 1-2 days
> to
> > > implement.
> > >
> > > Thanks
> > > Manasvi
> > >
> > >
> > > On Sun, Sep 13, 2015 at 1:11 PM, Li Tao 
> wrote:
> > >
> > > > Hi Roger,
> > > >
> > > > Thanks for your recommendation. I just got to know Samza. and checked
> > its
> > > > code base. It is a little too huge for me.
> > > >
> > > > Maybe for now, I need to start a small project/application which
> > utilize
> > > > kafka as its infrastructure, so that I can use Kafka's API a lot and
> > know
> > > > Kafka better.
> > > >
> > > > It's hard for me to initiate such project(small, useful/meaningful,
> > kafka
> > > > based). Anyone has better idea?
> > > >
> > > > On Sun, Sep 13, 2015 at 2:21 PM, Roger Hoover <
> roger.hoo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Li,
> > > > >
> > > > > You might take a look at Apache Samza.  It's conceptually simple
> but
> > > > > powerful and makes heavy use of Kafka.
> > > > >
> > > > > Best,
> > > > >
> > > > > Roger
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > > > On Sep 12, 2015, at 10:34 PM, Li Tao 
> > wrote:
> > > > > >
> > > > > > Hi Hackers,
> > > > > >
> > > > > > This is Lee, a learner of kafka, i have read the original paper
> on
> > > > kafka,
> > > > > > and walked through the document.
> > > > > >
> > > > > > I think the best way to learn sth is to write and read code about
> > > it. I
> > > > > am
> > > > > > wondering is there any open source code / system which is based
> on
> > > > kafka
> > > > > so
> > > > > > that i can read or contribute to? Not too complex, not too
> simple.
> > > > > >
> > > > > > Thanks a lot!
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Hisham Mardam-Bey
> > -=[ CTO ]-=-[ Mate1 Inc. ]=-
> >
> > A: Because it messes up the order in which people normally read text.
> > Q: Why is top-posting such a bad thing?
> > A: Top-posting.
> > Q: What is the most annoying thing in e-mail?
> >
> > -=[ Codito Ergo Sum ]=-
> >
>


Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Stevo Slavić
Hello Jason,

Maybe this answers your question:
http://mail-archives.apache.org/mod_mbox/kafka-dev/201509.mbox/%3CCAFc58G-UScVKrSF1kdsowQ8Y96OAaZEdiZsk40G8fwf7iToFaw%40mail.gmail.com%3E

Kind regards,
Stevo Slavic.


On Mon, Sep 14, 2015 at 8:56 AM, Jason Rosenberg  wrote:

> Hi Jun,
>
> Can you clarify, will there not be a 0.8.3.0 (and instead we move straight
> to 0.9.0.0)?
>
> Also, can you outline the main new features/updates for 0.9.0.0?
>
> Thanks,
>
> Jason
>
> On Sat, Sep 12, 2015 at 12:40 PM, Jun Rao  wrote:
>
> > The following is a candidate list of jiras that we want to complete in
> the
> > upcoming release (0.9.0.0). Our goal is to finish at least all the
> blockers
> > and as many as the non-blockers possible in that list.
> >
> >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
> >
> > Anything should be added/removed from this list?
> >
> > We are shooting to cut an 0.9.0.0 release branch in early October.
> >
> > Thanks,
> >
> > Jun
> >
>


Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Stevo Slavić
Jun,

It would be nice to have https://issues.apache.org/jira/browse/KAFKA-2106 (or,
if not that, then the related https://issues.apache.org/jira/browse/KAFKA-1792)
in the 0.9 release. Both have a patch provided. If KAFKA-2106 is delivered,
KAFKA-1792 may be redundant and not needed.

For some reason KAFKA-2106 has "Affects Version/s" set to 0.9; maybe "Fix
Version/s" should be set to that value instead.

Kind regards,
Stevo Slavic.



On Mon, Sep 14, 2015 at 9:43 AM, Stevo Slavić  wrote:

> Hello Jason,
>
> Maybe this answers your question:
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201509.mbox/%3CCAFc58G-UScVKrSF1kdsowQ8Y96OAaZEdiZsk40G8fwf7iToFaw%40mail.gmail.com%3E
>
> Kind regards,
> Stevo Slavic.
>
>
> On Mon, Sep 14, 2015 at 8:56 AM, Jason Rosenberg  wrote:
>
>> Hi Jun,
>>
>> Can you clarify, will there not be a 0.8.3.0 (and instead we move straight
>> to 0.9.0.0)?
>>
>> Also, can you outline the main new features/updates for 0.9.0.0?
>>
>> Thanks,
>>
>> Jason
>>
>> On Sat, Sep 12, 2015 at 12:40 PM, Jun Rao  wrote:
>>
>> > The following is a candidate list of jiras that we want to complete in
>> the
>> > upcoming release (0.9.0.0). Our goal is to finish at least all the
>> blockers
>> > and as many as the non-blockers possible in that list.
>> >
>> >
>> >
>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
>> >
>> > Anything should be added/removed from this list?
>> >
>> > We are shooting to cut an 0.9.0.0 release branch in early October.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>>
>
>


Re: Unclean leader election docs outdated

2015-09-15 Thread Stevo Slavić
Created https://issues.apache.org/jira/browse/KAFKA-2551

On Mon, Sep 14, 2015 at 7:22 PM, Guozhang Wang  wrote:

> Yes you are right. Could you file a JIRA to edit the documents?
>
> Guozhang
>
> On Fri, Sep 11, 2015 at 4:41 PM, Stevo Slavić  wrote:
>
> > That sentence is in both
> > https://svn.apache.org/repos/asf/kafka/site/083/design.html and
> > https://svn.apache.org/repos/asf/kafka/site/082/design.html near the end
> > of
> > "Unclean leader election: What if they all die?" section. Next one,
> > "Availability and Durability Guarantees", mentions ability to disable
> > unclean leader election, so likely just this one reference needs to be
> > updated.
> >
> > On Sat, Sep 12, 2015 at 1:05 AM, Guozhang Wang 
> wrote:
> >
> > > Hi Stevo,
> > >
> > > Could you point me to the link of the docs?
> > >
> > > Guozhang
> > >
> > > On Fri, Sep 11, 2015 at 5:47 AM, Stevo Slavić 
> wrote:
> > >
> > > > Hello Apache Kafka community,
> > > >
> > > > Current unclean leader election docs state:
> > > > "In the future, we would like to make this configurable to better
> > support
> > > > use cases where downtime is preferable to inconsistency. "
> > > >
> > > > If I'm not mistaken, since 0.8.2, unclean leader election strategy
> > > (whether
> > > > to allow it or not) is already configurable via
> > > > unclean.leader.election.enable broker config property.
> > > >
> > > > Kind regards,
> > > > Stevo Slavic.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: New Consumer & committed offsets

2015-09-15 Thread Stevo Slavić
Hello Damian,

Yes, there's a +1 difference. See related discussion
http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3CCAOeJiJh2SMzVn23JsoWiNk3sfsw82Jr_-kRLcNRd-oZ7pR1yWg%40mail.gmail.com%3E

Kind regards,
Stevo Slavic.

On Tue, Sep 15, 2015 at 3:56 PM, Damian Guy  wrote:

> I turned off compression and still get duplicates, but only 1 from each
> topic.
> Should the initial fetch offset for a partition be committed offset +1 ?
>
> Thanks,
> Damian
>
> On 15 September 2015 at 14:07, Damian Guy  wrote:
>
> > Hi,
> >
> > I've been trying out the new consumer and have noticed that i get
> > duplicate messages when i stop the consumer and then restart (different
> > processes, same consumer group).
> >
> > I consume all of the messages on the topic and commit the offsets for
> each
> > partition and stop the consumer. On the next run i expect to get 0
> > messages, however i get a batch of records from each partition - in this
> > case works out 1020 messages. Run it again and i get the same batch of
> > records.
> >
> > My logging shows that i've received messages with offsets lower than were
> > previously committed.
> >
> > committed:{damian_test_one-2=137669}
> > committed:{damian_test_one-0=139161}
> > committed:{damian_test_one-1=137663}
> >
> > min offsets received: {damian_test_one-0=138824,
> damian_test_one-1=137321,
> > damian_test_one-2=137331}
> >
> > I've debugged the initial fetch requests for offsets and the offsets
> match
> > up with what has been committed. Is this expected behaviour? Something to
> > do with batching of compression of message sets?
> >
> > TIA,
> > Damian
> >
>


How to verify offsets topic exists?

2015-10-05 Thread Stevo Slavić
Hello Apache Kafka community,

In my integration tests, with a single 0.8.2.2 broker and a newly created
topic with a single partition, after determining through a topic metadata
request that the partition has a lead broker assigned, I try to reset the
offset for a given consumer group. I first try to discover the offset
coordinator, and that lookup is throwing
ConsumerCoordinatorNotAvailableException.

On
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
it is documented that the broker returns ConsumerCoordinatorNotAvailableCode
for consumer metadata requests or offset commit requests if the offsets
topic has not yet been created.

I wonder if this is really the case, that the offsets topic has not been
created. Any tips on how to ensure/verify that the offsets topic exists?

Kind regards,

Stevo Slavic.


Re: How to verify offsets topic exists?

2015-10-06 Thread Stevo Slavić
Thanks Grant for the quick reply!

I've used an AdminUtils.topicExists("__consumer_offsets") check, and even 10
seconds after Kafka broker startup the check fails.

When, i.e. on which event, does this internal topic get created? Is there some
broker config property preventing it from being created? Does one have to use
the high-level consumer or make some special request (JoinGroup?) using the
simple consumer API to trigger consumer offsets topic init on the broker?

I'm using the simple consumer API - I assume exclude.internal.topics,
offsets.storage or dual.commit.enabled, however configured, shouldn't affect
me, since I'm passing an OffsetCommitRequest with version id 1. Moreover, I do
not even reach the point where the commit is done, since the lookup of the
consumer coordinator is throwing ConsumerCoordinatorNotAvailableException.

Kind regards,
Stevo Slavic.

On Mon, Oct 5, 2015 at 5:59 PM, Grant Henke  wrote:

> Hi Stevo,
>
> There are a couple of options to verify the topic exists:
>
>1. Consume from a topic with "offsets.storage=kafka". If its not created
>already, this should create it.
>2. List and describe the topic using the Kafka topics script. Ex:
>
> bin/kafka-topics.sh --zookeeper localhost:2181 --list
>
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic
> __consumer_offsets
>
>
>1. Check the ZNode exists in Zookeeper. Ex:
>
> bin/zookeeper-shell.sh localhost:2181
> ls /brokers/topics/__consumer_offsets
>
> get /brokers/topics/__consumer_offsets
>
>
> Thanks,
> Grant
>
> On Mon, Oct 5, 2015 at 10:44 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > In my integration tests, with single 0.8.2.2 broker, for newly created
> > topic with single partition, after determining through topic metadata
> > request that partition has lead broker assigned, when I try to reset
> offset
> > for given consumer group, I first try to discover offset coordinator and
> > that lookup is throwing ConsumerCoordinatorNotAvailableException
> >
> > On
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> > it is documented that broker returns ConsumerCoordinatorNotAvailableCode
> > for consumer metadata requests or offset commit requests if the offsets
> > topic has not yet been created.
> >
> > I wonder if this is really the case, that the offsets topic has not been
> > created. Any tips how to ensure/verify that offsets topic exists?
> >
> > Kind regards,
> >
> > Stevo Slavic.
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>


Re: How to verify offsets topic exists?

2015-10-06 Thread Stevo Slavić
I debugged and found in KafkaApis.handleConsumerMetadataRequest that the
consumer offsets topic gets created on the first lookup of the offsets topic
metadata, even when auto topic creation is disabled.

In that method there is the following call:

// get metadata (and create the topic if necessary)
val offsetsTopicMetadata =
getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head

and on the first call it returns offsetsTopicMetadata with an empty
partitionsMetadata sequence and errorCode 5.

By the docs this means:
LeaderNotAvailable => This error is thrown if we are in the middle of a
leadership election and there is currently no leader for this partition and
hence it is unavailable for writes.

Since leader election takes some time, especially for the default 50
partitions of the consumer offsets topic, the first consumer metadata lookup
request will always return an error. This sounds like a bug, especially if it
is not documented as expected behavior. I'd prefer it if the Kafka
broker/controller did this init of the consumer offsets topic on startup.

As a workaround, as an init step of my integration test, I will look up the
offsets topic metadata until it succeeds or times out.
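
A minimal sketch of that workaround using the 0.8.x simple consumer API (the
broker host/port, client id, retry count and sleep are assumptions):

import java.util.Collections;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Poll the broker for __consumer_offsets metadata until all partitions have a
// leader (error code 0), or give up after a fixed number of attempts.
static void awaitOffsetsTopic() throws InterruptedException {
    SimpleConsumer consumer =
        new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "offsets-topic-probe");
    try {
        for (int attempt = 0; attempt < 30; attempt++) {
            TopicMetadataResponse response = consumer.send(
                new TopicMetadataRequest(Collections.singletonList("__consumer_offsets")));
            TopicMetadata metadata = response.topicsMetadata().get(0);
            boolean ready = metadata.errorCode() == 0
                && !metadata.partitionsMetadata().isEmpty()
                && metadata.partitionsMetadata().stream().allMatch(p -> p.leader() != null);
            if (ready) {
                return;
            }
            Thread.sleep(1000); // leader election for 50 partitions can take a while
        }
    } finally {
        consumer.close();
    }
}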

Kind regards,
Stevo Slavic.

On Tue, Oct 6, 2015 at 10:02 AM, Stevo Slavić  wrote:

> Thanks Grant for quick reply!
>
> I've used AdminUtils.topicExists("__consumer_offsets") check and even
> 10sec after Kafka broker startup, the check fails.
>
> When, on which event, does this internal topic get created? Is there some
> broker config property preventing it from being created? Does one have to
> use high level consumer or make some special request (JoingGroup?) using
> simple consumer API to trigger consumer offsets topic init on broker?
>
> I'm using simple consumer API - I assume exclude.internal.topics,
> offsets.storage or dual.commit.enabled however configured shouldn't affect
> me, since I'm passing OffsetCommitRequest with version id 1, and even more
> I do not even reach point where commit is done, since lookup of consumer
> coordinator is throwing ConsumerCoordinatorNotAvailableException.
>
> Kind regards,
> Stevo Slavic.
>
> On Mon, Oct 5, 2015 at 5:59 PM, Grant Henke  wrote:
>
>> Hi Stevo,
>>
>> There are a couple of options to verify the topic exists:
>>
>>1. Consume from a topic with "offsets.storage=kafka". If its not
>> created
>>already, this should create it.
>>2. List and describe the topic using the Kafka topics script. Ex:
>>
>> bin/kafka-topics.sh --zookeeper localhost:2181 --list
>>
>> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic
>> __consumer_offsets
>>
>>
>>1. Check the ZNode exists in Zookeeper. Ex:
>>
>> bin/zookeeper-shell.sh localhost:2181
>> ls /brokers/topics/__consumer_offsets
>>
>> get /brokers/topics/__consumer_offsets
>>
>>
>> Thanks,
>> Grant
>>
>> On Mon, Oct 5, 2015 at 10:44 AM, Stevo Slavić  wrote:
>>
>> > Hello Apache Kafka community,
>> >
>> > In my integration tests, with single 0.8.2.2 broker, for newly created
>> > topic with single partition, after determining through topic metadata
>> > request that partition has lead broker assigned, when I try to reset
>> offset
>> > for given consumer group, I first try to discover offset coordinator and
>> > that lookup is throwing ConsumerCoordinatorNotAvailableException
>> >
>> > On
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
>> > it is documented that broker returns ConsumerCoordinatorNotAvailableCode
>> > for consumer metadata requests or offset commit requests if the offsets
>> > topic has not yet been created.
>> >
>> > I wonder if this is really the case, that the offsets topic has not been
>> > created. Any tips how to ensure/verify that offsets topic exists?
>> >
>> > Kind regards,
>> >
>> > Stevo Slavic.
>> >
>>
>>
>>
>> --
>> Grant Henke
>> Software Engineer | Cloudera
>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>>
>
>


Re: How to verify offsets topic exists?

2015-10-08 Thread Stevo Slavić
There's another related bug - triggering offsets topic creation by requesting
metadata about that topic does not work in the case of a single-broker, clean
(no topics created yet) Kafka cluster. In that case the sequence returned by
KafkaApis.getAliveBrokers is empty, and KafkaApis.getTopicMetadata will issue
an AdminUtils.createTopic request with an offsetsTopicReplicationFactor of 3,
which will fail with:

ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest;
Version: 0; CorrelationId: 0; ClientId: ; Topics: __consumer_offsets
(kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than
available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:513)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at
scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at
kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)

So this code in KafkaApis.getTopicMetadata

val offsetsTopicReplicationFactor =
  if (aliveBrokers.length > 0)
    Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
  else
    config.offsetsTopicReplicationFactor

should probably look like

val offsetsTopicReplicationFactor =
  if (aliveBrokers.length > 0)
    Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
  else
    Math.min(config.offsetsTopicReplicationFactor,
             ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).length)

Rationale: aliveBrokers is actually information from the (metadata) cache - if
the broker has cached info that there are 0 brokers alive, that surely means
the cache has never been set/updated. In that case, do not assume that there
are enough (3) brokers up; instead, look up in ZK how many brokers are alive,
and based on that determine the replication factor for the offsets topic.

Does this make sense?

As a workaround, I guess I will have to resort to explicitly creating the
offsets topic if it doesn't already exist.

Kind regards,
Stevo Slavic.

On Tue, Oct 6, 2015 at 11:34 AM, Stevo Slavić  wrote:

> Debugged, and found in KafkaApis.handleConsumerMetadataRequest that
> consumer offsets topic gets created on first lookup of offsets topic
> metadata, even when auto topic creation is disabled.
>
> In that method there is following call:
>
> // get metadata (and create the topic if necessary)
> val offsetsTopicMetadata =
> getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
>
> and it returns on first call offsetsTopicMetadata with empty
> partitionsMetadata sequence and errorCode 5.
>
> By the docs this means:
> LeaderNotAvailable => This error is thrown if we are in the middle of a
> leadership election and there is currently no leader for this partition and
> hence it is unavailable for writes.
>
> Since leadership election takes some time, especially for default 50
> partitions of consumer offsets topic, first call to lookup consumer
> metadata request will always return an error. This sounds like a bug,
> especially if it's not documented as to be expected behavior. I'd prefer if
> Kafka broker/controller did this init of consumer offsets topic on startup.
>
> As workaround, as init step of my integration test, I will lookup offsets
> topic metadata, until successful or timeout.
>
> Kind regards,
> Stevo Slavic.
>
> On Tue, Oct 6, 2015 at 10:02 AM, Stevo Slavić  wrote:
>
>> Thanks Grant for quick reply!
>>
>> I've used AdminUtils.topicExists("__consumer_offsets") check and even
>> 10sec after Kafka broker startup, the check fails.
>>
>> When, on which event, does this internal topic get created? Is there some
>> broker config property preventing it from being created? Does one have to
>> use high level consumer or make some special request (JoingGroup?) using
>> simple consumer API to trigger consumer offsets topic init on broker?
>>
>> I'm using simple consumer API - I assume exclude.internal.topics,
>

Re: How to verify offsets topic exists?

2015-10-09 Thread Stevo Slavić
If I'm not mistaken, the replication factor of a topic does not get stored in
ZK - on creation, the replication factor gets translated to a topic replica
assignment, and that assignment is what gets stored in ZK.

Again, please correct me if I'm wrong - it seems that only during topic
creation is it verified that all replicas in the replica assignment are
actually live brokers. Once the topic is created, it can become
under-replicated - brokers from the topic replica assignment can go down, and
even in that case the topic may still be usable (online) depending on other
settings.

Would it make sense, as an exception for the consumer offsets topic only, or
as a rule for all topics, to allow a topic to be created with a replica
assignment which references brokers that do not exist (just relying on the
monotonically increasing broker id convention) or are simply not alive? None
of the brokers referenced in the replica assignment would have to be alive,
or perhaps just one referenced broker would be required to be alive; or maybe
make it a configuration option - a switch for whether to enforce that brokers
referenced in the replica assignment are alive, or a minimum number of alive
brokers in the replica assignment, or both.

Kind regards,
Stevo Slavic.

On Thu, Oct 8, 2015 at 3:32 PM, Stevo Slavić  wrote:

> There's another related bug - triggering offsets topic creation through
> requesting metadata about that topic does not work in the case of a clean
> (no topics created yet) single-broker Kafka cluster. In that case the sequence
> returned by KafkaApis.getAliveBrokers is empty, and
> KafkaApis.getTopicMetadata will issue an AdminUtils.createTopic request
> with offsetsTopicReplicationFactor of 3, which will fail with
>
> ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest;
> Version: 0; CorrelationId: 0; ClientId: ; Topics: __consumer_offsets
> (kafka.server.KafkaApis)
> kafka.admin.AdminOperationException: replication factor: 3 larger than
> available brokers: 1
> at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
> at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
> at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:513)
> at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> at
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> at scala.collection.SetLike$class.map(SetLike.scala:92)
> at scala.collection.AbstractSet.map(Set.scala:47)
> at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
> at
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> at java.lang.Thread.run(Thread.java:745)
>
> So this code in KafkaApis.getTopicMetadata
>
> val offsetsTopicReplicationFactor =
>   if (aliveBrokers.length > 0)
> Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
>   else
> config.offsetsTopicReplicationFactor
>
> should probably look like
>
> val offsetsTopicReplicationFactor =
>   if (aliveBrokers.length > 0)
> Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
>   else
> Math.min(config.offsetsTopicReplicationFactor,
> ZkUtils.getChildrenParentMayNotExist(zkClient,
> ZkUtils.BrokerIdsPath).length)
>
> Rationale: aliveBrokers is information from the (metadata) cache. If the
> broker's cache says there are 0 brokers alive, that almost certainly means the
> cache has never been set/updated, so in that case do not assume that there are
> enough (3) brokers up; instead look up in ZooKeeper how many brokers are
> actually alive, and determine the replication factor for the offsets topic
> from that.
>
> Does this make sense?
>
> As workaround, I guess I will have to resort to explicitly creating
> offsets topic if it doesn't exist already.
>
> Kind regards,
> Stevo Slavic.
>
> On Tue, Oct 6, 2015 at 11:34 AM, Stevo Slavić  wrote:
>
>> Debugged, and found in KafkaApis.handleConsumerMetadataRequest that
>> consumer offsets topic gets created on first lookup of offsets topic
>> metadata, even when auto topic creation is disabled.
>>
>> In that method there is following call:
>>
>> // get metadata (and create the topic if necessary)
>> val offsetsTopicMetadata =
>> getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
>>
>> and it returns on fi

Re: ("offsets.storage","kafka") not working with Consumer API

2015-10-15 Thread Stevo Slavić
Hello Kiran,

Check how many brokers you have in the cluster. The consumer offsets topic by
default requires at least 3. In a dev environment you can lower the replication
factor for that topic (see the broker config options).
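
For a single-broker dev setup that would be, in the broker's server.properties
(it has to be set before the offsets topic is first created):

offsets.topic.replication.factor=1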

Kind regards,
Stevo Slavic.

On Fri, Oct 16, 2015, 07:31 Kiran Singh  wrote:

> Hello
>
> I am trying to use the ConsumerConnector API. But if I use the
> ("offsets.storage","kafka") property, the
> "createJavaConsumerConnector" function will hang. When I remove this, the API
> works fine.
>
> I am using kafka_2.10-0.8.2.2.jar
>
> Is there any issue or I am missing any thing.
>
> Thanks
> Kiran Singh
>


New producer and storing offsets in Kafka - previously committed offsets fetched as uncommitted

2015-10-30 Thread Stevo Slavić
Hello Apache Kafka community,

I'm trying to use the new producer from kafka-clients 0.8.2.2 together with the
simple consumer, fetching and committing offsets stored in Kafka, and I'm seeing
strange behavior - a committed offset/message gets read multiple times, offset
fetch requests do not always see committed offsets as committed, and the
likelihood of a message being read and committed multiple times seems to
increase with higher load.

Before trying the new producer I was using the old one in sync mode, with the
same consumer code fetching/committing offsets stored in Kafka - all was well,
and offsets/messages once committed were never read again. To get the equivalent
behavior (the old sync producer guarantees) but with the benefit of batching
multiple produce requests together, I'm trying to use the new producer's async
batching support while blocking each produce until the response is received or
the produce fails, waiting for all ISRs.
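
For reference, the produce side looks roughly like this (a sketch; broker
address, topic, key/value and serializers are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks", "all") // wait for all in-sync replicas before the ack
props.put("key.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

val record = new ProducerRecord[Array[Byte], Array[Byte]](
  "my-topic", "key".getBytes("UTF-8"), "value".getBytes("UTF-8"))
// send() batches asynchronously; get() blocks until the broker responds
// (or the send fails), which is what gives the old sync-producer behavior
val metadata = producer.send(record).get()
producer.close()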

Is this a (known) bug or is it by design?

Kind regards,
Stevo Slavic.


Re: Problems getting offsets

2015-11-01 Thread Stevo Slavić
Hello David,

In short, the problem is not with your topic, it is with consumer offsets topic
initialization.

You could modify your code to just retry fetching offsets (until successful,
where successful also includes a return of offset -1, or until timeout), or
alternatively you could trigger consumer offsets topic init (by fetching
consumer offsets topic metadata from the broker) before issuing the offset
fetch request.

For the init option you have (at least) two alternatives: do it after or
before request to create your topic.

If you choose to do consumer offsets topic init before the request to create
your (first) topic, make sure to configure the broker with a replication factor
of 1 for the consumer offsets topic (offsets.topic.replication.factor config
property), otherwise consumer offsets topic init will fail. Do this config
change only for an integration test with a single broker, not for a production
cluster, which should have 3 or more brokers anyway.
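
A sketch of the init option with the simple consumer API (host, port, timeouts
and client id below are placeholders):

import kafka.api.TopicMetadataRequest
import kafka.common.ErrorMapping
import kafka.consumer.SimpleConsumer

val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024,
  "offsets-topic-init")
val deadline = System.currentTimeMillis() + 30000
var ready = false
while (!ready && System.currentTimeMillis() < deadline) {
  // the metadata request triggers creation of __consumer_offsets if missing
  val response = consumer.send(new TopicMetadataRequest(Seq("__consumer_offsets"), 0))
  ready = response.topicsMetadata.exists(tm =>
    tm.errorCode == ErrorMapping.NoError &&
      tm.partitionsMetadata.nonEmpty &&
      tm.partitionsMetadata.forall(_.leader.isDefined))
  if (!ready) Thread.sleep(500)
}
consumer.close()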

Kind regards,
Stevo Slavic.



On Sun, Nov 1, 2015 at 9:38 PM, David Corbin  wrote:

> Yes.  I know all of that.  I guess my original message was not clear.
>
> The topic metadata indicates that there is a leader.
>
> Please see my comments interspersed below.
> Thanks
> David Corbin
>
> On 10/29/15, 17:53, "Mayuresh Gharat"  wrote:
>
> > NotCoordinatorForConsumerException is for consumer side.
>
> I am attempting to consume messages, so this means I'm getting an
> exception that might be reasonable for what I'm doing.
>
> >I think if you
> >are using simpleConsumer, then you have to do your own offset management.
>
> I am attempting to manage my offsets myself; that's why I called
> SimpleConsumer#fetchOffsets() which is throwing the exception.  I assume
> you are not suggesting that my offset management should be done entirely
> "outside the scope" of Kafka.  If that is true, why do 4 of 3 methods on
> SimpleConsumer refer to offsets?
>
>
> >
> >TopicMetadata tells you whether there is a leader for the topic
> >partitions,
> >replicas, ISR.
>
>
> Yes.  After I create the topic, I "poll" the metadata until the one
> topic/partition has a leader, before moving on.  If the metadata says
> there is a leader, I don't understand why I would get
> NotCoordinatorForConsumerException
>
> >
> >Thanks,
> >
> >Mayuresh
> >
> >On Wed, Oct 28, 2015 at 8:23 AM, David Corbin 
> wrote:
> >
> >> I'm working on a project that I hope to use the SimpleConsumer on.  I'm
> >> trying to write some test code around our code that is wrapping the
> >> SimpleConsumer.
> >>
> >> The environment is 1 kafka broker, and 1 zookeeper
> >> The test goes like this:
> >>
> >> Create a topic ("foo", 1 partition, 1 replica)
> >> Create a Producer, and send a message
> >> Create a SimpleConsumer, and try to read the offset
> >>
> >> Failure with: NotCoordinatorForConsumerException
> >>
> >> If I continue to require for an extended period, I continue to get that
> >> exception.
> >>
> >> As near as I can tell, the topic metadata says there is a leader, but
> >>the
> >> the broker thinks it's being rebalanced.
> >> I've done the above test immediately after stop, clean out old data, and
> >> restart both zookeeper and kafka.
> >>
> >> Suggestions welcome.
> >>
> >>
> >
> >
> >--
> >-Regards,
> >Mayuresh R. Gharat
> >(862) 250-7125
>
>


Re: Problems getting offsets

2015-11-02 Thread Stevo Slavić
 (consumer offsets) topic
creation not fail if there are not enough of live brokers to meet
configured replication factor, but even better Kafka cluster controller
broker could do this initialization of consumer offsets topic if it doesn't
already exist.

Btw, looking up topic metadata in ZooKeeper and checking that all of its
partitions have a leader assigned is not enough for topic partitions to be
writable/readable/online - if a broker is still not aware that it is the
leader for a given partition (e.g. the broker's metadata cache is outdated),
that partition would still not be writable/readable. One could look up topic
metadata from the Kafka brokers, and more specifically from the lead brokers of
the partitions, to check whether they are aware that they are lead brokers for
those partitions. Not even that guarantees that publishing will be successful,
since leadership can change at any moment and other things can fail (e.g.
unreliable network), so one has to handle errors. But, as you see, the problem
is not with your topic, but with the internal consumer offsets topic.


On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić  wrote:

> Hello David,
>
> In short, problem is not with your topic, it is with consumer offsets
> topic initialization.
>
> You could modify your code to just retry fetching offsets (until
> successful where successful is also return of -1 offset, or timeout), or
> alternatively you could trigger consumer offsets topic init (by fetching
> consumer offsets topic metadata from broker) before issuing offset fetch
> request.
>
> For the init option you have (at least) two alternatives: do it after or
> before request to create your topic.
>
> If you chose to do consumer offsets topic init before request to create
> your (first) topic, make sure to configure broker with replication factor
> of 1 for consumer offsets topic (offsets.topic.replication.factor config
> property) otherwise consumer offsets topic init will fail. Do this config
> change only for integration test with single broker, but not for production
> cluster which should have 3 or more brokers anyway.
>
> Kind regards,
> Stevo Slavic.
>
>
>
> On Sun, Nov 1, 2015 at 9:38 PM, David Corbin  wrote:
>
>> Yes.  I know all of that.  I guess my original message was not clear.
>>
>> The topic metadata indicates that there is a leader.
>>
>> Please see my comments interspersed below.
>> Thanks
>> David Corbin
>>
>> On 10/29/15, 17:53, "Mayuresh Gharat"  wrote:
>>
>> > NotCoordinatorForConsumerException is for consumer side.
>>
>> I am attempting to consume messages, so this means I'm getting an
>> exception that might be reasonable for what I'm doing.
>>
>> >I think if you
>> >are using simpleConsumer, then you have to do your own offset management.
>>
>> I am attempting to manage my offsets myself; that's why I called
>> SimpleConsumer#fetchOffsets() which is throwing the exception.  I assume
>> you are not suggesting that my offset management should be done entirely
>> "outside the scope" of Kafka.  If that is true, why do 4 of 3 methods on
>> SimpleConsumer refer to offsets?
>>
>>
>> >
>> >TopicMetadata tells you whether there is a leader for the topic
>> >partitions,
>> >replicas, ISR.
>>
>>
>> Yes.  After I create the topic, I "poll" the metadata until the one
>> topic/partition has a leader, before moving on.  If the metadata says
>> there is a leader, I don't understand why I would get
>> NotCoordinatorForConsumerException
>>
>> >
>> >Thanks,
>> >
>> >Mayuresh
>> >
>> >On Wed, Oct 28, 2015 at 8:23 AM, David Corbin 
>> wrote:
>> >
>> >> I'm working on a project that I hope to use the SimpleConsumer on.  I'm
>> >> trying to write some test code around our code that is wrapping the
>> >> SimpleConsumer.
>> >>
>> >> The environment is 1 kafka broker, and 1 zookeeper
>> >> The test goes like this:
>> >>
>> >> Create a topic ("foo", 1 partition, 1 replica)
>> >> Create a Producer, and send a message
>> >> Create a SimpleConsumer, and try to read the offset
>> >>
>> >> Failure with: NotCoordinatorForConsumerException
>> >>
>> >> If I continue to require for an extended period, I continue to get that
>> >> exception.
>> >>
>> >> As near as I can tell, the topic metadata says there is a leader, but
>> >>the
>> >> the broker thinks it's being rebalanced.
>> >> I've done the above test immediately after stop, clean out old data,
>> and
>> >> restart both zookeeper and kafka.
>> >>
>> >> Suggestions welcome.
>> >>
>> >>
>> >
>> >
>> >--
>> >-Regards,
>> >Mayuresh R. Gharat
>> >(862) 250-7125
>>
>>
>


Scheduled job in Kafka, every 2h?

2015-11-06 Thread Stevo Slavić
Hello Apache Kafka community,

I'm seeing some cyclic load on the cluster, and I'm trying to determine the cause.

Couldn't determine from docs - is there some internal scheduled job in
Kafka (0.8.2.1) executing on every broker every two hours?

Kind regards,
Stevo Slavic.


Blocked consumer - next offset same as current offset

2015-11-11 Thread Stevo Slavić
Hello Apache Kafka community,


I'm using the simple consumer with Kafka 0.8.2.2 and noticed that under some
conditions the fetch response message set for a partition can contain at least
one MessageAndOffset (if not all of them) whose nextOffset is equal to the
current (committed) offset, i.e. the offset used in the fetch request. Not sure
how it's related, but I noticed this behavior especially often when using the
new async producer, and when the fetch request was able to fetch several
messages all the way to the end of the partition.

Is this a feature or a bug?


Kind regards,

Stevo Slavic.


Re: Blocked consumer - next offset same as current offset

2015-11-11 Thread Stevo Slavić
Not sure if related, but this line in AbstractFetcherThread (Kafka 0.8.2
branch) looks suspicious
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L122

This logic seems to have been changed significantly via
https://github.com/apache/kafka/commit/79f7cca85e9ed6511ad50cb9412bfa5c8e5b9ddb#diff-2d03a5d4349a23e56c09b4e773055249
for https://issues.apache.org/jira/browse/KAFKA-1461
so in Kafka 0.9 branch it looks like this
https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L132

I have a dirty workaround for the moment: not trusting the nextOffset info from
MessageAndOffset, and transforming the response into a new one with custom
logic calculating nextOffset (= offset used in the fetch request + index of the
message in the message set). Will recheck at the latest once 0.9 is fully
released.

The unwanted behavior only seems to get triggered when the consumer lag is of
a suitable size - I guess the async producer made the issue more likely,
because it batch-sends multiple messages to the same partition.

On Wed, Nov 11, 2015 at 9:40 AM, Stevo Slavić  wrote:

> Hello Apache Kafka community,
>
>
> I'm using simple consumer with Kafka 0.8.2.2 and noticed that under some
> conditions fetch response message set for a partition can contain at least
> one (if not all) MessageAndOffset with nextOffset being equal to current
> (committed) offset, offset used in fetch request. Not sure how it's related
> but I could notice this behavior especially often when I was using new
> async producer, and when fetch request was able to fetch several messages
> all the way to the end of the partition.
>
> Is this a feature or a bug?
>
>
> Kind regards,
>
> Stevo Slavic.
>


Re: Blocked consumer - next offset same as current offset

2015-11-11 Thread Stevo Slavić
Thanks Jun for heads up!

Looked it up in wiki page:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example#id-0.8.0SimpleConsumerExample-ReadingtheData

"Also note that we are explicitly checking that the offset being read is
not less than the offset that we requested. This is needed since if Kafka
is compressing the messages, the fetch request will return an entire
compressed block even if the requested offset isn't the beginning of the
compressed block. Thus a message we saw previously may be returned again."
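
In code, the check from the wiki example comes down to something like this
(a sketch, not a drop-in implementation; the helper name is mine):

import kafka.api.FetchResponse
import kafka.message.MessageAndOffset

// Returns the offset to use in the next fetch request, skipping any messages
// that precede the requested offset (they can show up when the fetched batch
// is compressed).
def processFetched(response: FetchResponse, topic: String, partition: Int,
                   requestedOffset: Long)(handle: MessageAndOffset => Unit): Long = {
  var next = requestedOffset
  for (messageAndOffset <- response.messageSet(topic, partition)) {
    if (messageAndOffset.offset >= requestedOffset) {
      handle(messageAndOffset)
      next = messageAndOffset.nextOffset
    }
  }
  next
}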

Thanks once more!

Kind regards,
Stevo Slavic.

On Wed, Nov 11, 2015 at 6:12 PM, Jun Rao  wrote:

> Are you using compressed messages? If so, when using SimpleConsumer, it's
> possible for you to see messages whose offset is smaller than the offset in
> the fetch request, if those messages are in the same compressed batch. It's
> the responsibility of the client to skip over those messages. Note that the
> high level consumer handles that logic already.
>
> Thanks,
>
> Jun
>
> On Wed, Nov 11, 2015 at 12:40 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> >
> > I'm using simple consumer with Kafka 0.8.2.2 and noticed that under some
> > conditions fetch response message set for a partition can contain at
> least
> > one (if not all) MessageAndOffset with nextOffset being equal to current
> > (committed) offset, offset used in fetch request. Not sure how it's
> related
> > but I could notice this behavior especially often when I was using new
> > async producer, and when fetch request was able to fetch several messages
> > all the way to the end of the partition.
> >
> > Is this a feature or a bug?
> >
> >
> > Kind regards,
> >
> > Stevo Slavic.
> >
>


Re: Problems getting offsets

2015-11-24 Thread Stevo Slavić
You can store offsets wherever you prefer, and it's separate from the processes
you mentioned. Unfortunately, custom offset storage support has to live entirely
on the client side; one cannot (easily) extend the Kafka broker with support for
a different offset storage. As a consequence, existing Kafka monitoring tools,
open source or commercial, cannot (easily, out of the box) be used to monitor
consumer offsets if you use custom offset storage.

So I'd recommend using the (standard) Kafka offset storage, until you find a
really good reason not to.

Also consider using the new KafkaConsumer from the just released Kafka
(kafka-clients) 0.9.0.0 instead of the SimpleConsumer API.
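
A minimal sketch of the new consumer with Kafka offset storage (broker address,
group id and topic name are placeholders):

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "my-group")
props.put("enable.auto.commit", "false")
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("my-topic"))
val records = consumer.poll(1000)
records.asScala.foreach(r => println(s"${r.offset}: ${r.value}"))
consumer.commitSync() // offsets end up in the __consumer_offsets topic
consumer.close()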

On Tue, Nov 24, 2015 at 1:59 PM, David Corbin  wrote:

> Thank you for the detailed explanation.
>
> Is it essential that offsets be stored in Kafka, or could they be stored
> outside the kafka/zookeeper system?  Will it affect how logs are managed,
> and when “older” messages are purged? Or are they two independent systems?
>
> On 11/2/15, 03:51, "Stevo Slavić"  wrote:
>
> >Here is a bit longer and more detailed, not necessarily better
> >understandable explanation.
> >
> >When using Kafka for offsets storage, consumer offsets get stored in
> >(compacted) consumer offsets Kafka topic ("__consumer_offsets"). Every
> >partition of consumer offsets topic could store offsets from multiple
> >consumer groups, but offsets of a single consumer group will always be
> >stored in (get mapped to) same partition of consumer offsets topic (e.g.
> >consumer group x and y offsets could be both stored in partition 0, while
> >consumer group z offsets could be stored in partition 49 of consumer
> >offsets topic; even if consumer group x is consuming two different topics,
> >offsets would be stored in same partition of consumer offsets topic). Like
> >any other Kafka topic partition, one can read/write (consumer offsets)
> >topic partitions only from their lead brokers, and every partition has
> >only
> >one lead broker. Lead broker of a partition where offsets are stored for
> >given consumer group is called consumer coordinator broker for that
> >consumer group. To fetch/commit offsets for consumer group, you first need
> >to resolve consumer coordinator broker for that consumer group, and then
> >send fetch/commit offsets to that broker (btw, it's dynamic, can change
> >even after just being resolved which broker is coordinator broker for
> >given
> >consumer group, e.g. when initial lead broker is lost and a replica
> >becomes
> >a new lead). Until there is a leader assigned for a partition and broker
> >who is leader is not yet aware of that assignment, topic partition cannot
> >be read/written, it's offline - read/write requests made to that broker
> >will return error code in response. Consumer offsets topic is not created
> >automatically on broker/cluster startup - instead it gets created on first
> >(direct or indirect) request for consumer offsets topic metadata... That
> >lookup triggers creation of consumer offsets topic, parts of the topic
> >creation process happen async to request for topic creation, and it can
> >take time for topic creation to actually fully finish. Especially if you
> >leave consumer offsets topic default settings (replication factor of 3, 50
> >partitions) consumer offsets topic creation can take time - this is nice
> >default for production cluster, but not for one used in integration tests
> >with single broker.
> >
> >When you try to fetch (or commit) offset for the first time, broker
> >internally retrieves metadata about consumer offsets topic, which
> >initiates
> >consumer offsets topic creation when it doesn't exist, but that first
> >request to fetch offset (usually) fails, since chosen consumer coordinator
> >broker is not yet aware that it is coordinator for that group, and sends
> >back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException.
> >Docs
> >(see
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Pro
> >tocol
> >) state that "The broker returns this error code if it receives an offset
> >fetch or commit request for a consumer group that it is not a coordinator
> >for."
> >
> >You could modify your code to retry fetching offsets, but not infinitely,
> >or you could trigger consumer offsets topic init before fetching offsets.
> >For the init option you have (at least) two alternatives.
> >
> >Modify your test, before reading/committing offsets but not before
> >creating
> >your topic to init (trigger creation of) consumer offsets t

Re: Failed attempt to delete topic

2015-12-03 Thread Stevo Slavić
Delete is only considered to be working since Kafka 0.8.2 (although there are
still not-easily-reproducible edge cases where it doesn't work well, even in
0.8.2 or newer).
In 0.8.1 one could request a topic to be deleted (the request gets stored as an
entry in ZooKeeper); because of the presence of that delete request the topic
would become unusable (cannot publish or read), but the broker would actually
never act on the request and delete the topic.

Maybe it will be enough to delete the topic deletion request entry from
ZooKeeper, under /admin/delete_topics, to make the topic usable again.
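
Something along these lines (a sketch only; the connect string and topic name
are placeholders, and this removes only the pending delete request, not the
topic itself):

import org.I0Itec.zkclient.ZkClient

val zkClient = new ZkClient("localhost:2181", 30000, 30000)
zkClient.delete("/admin/delete_topics/my-topic")
zkClient.close()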

Otherwise, just upgrade the broker side to 0.8.2.x or the latest 0.9.0.0 - a new
broker should work with old clients, so you may not have to upgrade the client
side immediately.

Kind regards,
Stevo Slavic.


On Fri, Dec 4, 2015 at 12:33 AM, Mayuresh Gharat  wrote:

> Can you paste some logs from the controller, when you deleted the topic?
>
> Thanks,
>
> Mayuresh
>
> On Thu, Dec 3, 2015 at 2:30 PM, Rakesh Vidyadharan <
> rvidyadha...@gracenote.com> wrote:
>
> > Hello,
> >
> > We are on an older kafka (0.8.1) version.  While a number of consumers
> > were running, we attempted to delete a few topics using the
> kafka-topics.sh
> > file (basically want to remove all messages in that topic and restart,
> > since our entities went through some incompatible changes).  We noticed
> > logs saying the topic has been queued for deletion.  After stopping all
> > processes accessing kafka, we restarted kafka and then our processes.
> The
> > old topics do not seem to have been deleted (I can still see the log
> > directories corresponding to the topics), and none of the clients are
> able
> > to either publish or read to the topics that we attempted to delete.
> > Attempting to read gives us the following type of error:
> >
> > Attempting to access an invalid KafkaTopic ( are you operating on a
> closed
> > KafkaTopic ?)
> >
> > Attempting to publish gives us a more general type of error:
> >
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> > at
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > at kafka.producer.Producer.send(Producer.scala:76)
> >
> > How can be get around this issue and start using the topics that we tried
> > to clean up?  There may have been better ways to achieve what we wanted,
> if
> > so please suggest recommendations as well.
> >
> > Thanks
> > Rakesh
> >
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


Re: Unable to set log.cleanup.policy on topic

2015-12-07 Thread Stevo Slavić
Hello Rakesh,

log.cleanup.policy is a broker configuration property, while cleanup.policy is
a topic configuration property (see
http://kafka.apache.org/documentation.html#topic-config ). Since you are
configuring a particular topic, you need to use the latter.
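
So the alter command from your message should be (same command, just with the
topic-level property name):

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic
metamorphosis.team --config cleanup.policy=compact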

Kind regards,
Stevo Slavic.

On Mon, Dec 7, 2015 at 8:59 PM, Rakesh Vidyadharan <
rvidyadha...@gracenote.com> wrote:

> Hello,
>
> I upgraded to the latest 0.8 release 0.8.2.2, and tried to set the log
> cleanup policy on some of our topics (I got same error with 0.8.1, but
> thought it may have been some issue with that release).
>
> /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic
> metamorphosis.team --config log.cleanup.policy=compact
>
> It gives me the following error (it looked identical on 0.8.1 as well)
>
> Error while executing topic command requirement failed: Unknown
> configuration "log.cleanup.policy".
> java.lang.IllegalArgumentException: requirement failed: Unknown
> configuration "log.cleanup.policy".
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:183)
> at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:182)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at kafka.log.LogConfig$.validateNames(LogConfig.scala:182)
> at kafka.log.LogConfig$.validate(LogConfig.scala:190)
> at
> kafka.admin.TopicCommand$.parseTopicConfigsToBeAdded(TopicCommand.scala:205)
> at
> kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:103)
> at
> kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:100)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:100)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>
> The documentation indicates this can be modified after topic is created,
> not sure if that means we can modify the setting after messages have been
> posted to the topic, which is what we are trying to do.
>
> Thanks
> Rakesh
>


Re: How to reset a consumer-group's offset in kafka 0.9?

2015-12-29 Thread Stevo Slavić
Have you considered deleting and recreating the topic used in the test?
Once the topic is clean, read/poll once - any committed offset should be
outside of the valid range, and the consumer should reset its offset.

On Tue, Dec 29, 2015 at 4:11 PM, Han JU  wrote:

> Hello,
>
> For local test purpose I need to frequently reset offset for a consumer
> group. In 0.8 I just delete the consumer group's zk node under
> `/consumers`. But with the redesign of the 0.9, how could I achieve the
> same thing?
>
> Thanks!
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 061960
>


Re: How to reset a consumer-group's offset in kafka 0.9?

2015-12-29 Thread Stevo Slavić
Then I guess @Before each test, explicitly commit an offset of 0.

There doesn't seem to be a tool for committing offsets, only for
checking/fetching the current offset (see
http://kafka.apache.org/documentation.html#operations )
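
With the new consumer it could look roughly like this (a sketch; broker address,
group id and topic are placeholders, and it assumes the test's consumers are not
running at that moment):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group-under-test")
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
// commit offset 0 for every partition of the topic under test
val partitions = consumer.partitionsFor("my-topic").asScala
  .map(p => new TopicPartition(p.topic, p.partition))
consumer.assign(partitions.asJava)
val offsets = partitions.map(tp => tp -> new OffsetAndMetadata(0L)).toMap
consumer.commitSync(offsets.asJava)
consumer.close()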

On Tue, Dec 29, 2015 at 4:35 PM, Han JU  wrote:

> Hi Stevo,
>
> But by deleting and recreating the topic, do I remove also the messages
> ingested?
> My use case is that I ingest prepared messages once and run consumer tests
> multiple times, between each test run I reset the consumer group's offset
> so that each run starts from the beginning and consumers all the messages.
>
> 2015-12-29 16:19 GMT+01:00 Stevo Slavić :
>
> > Have you considered deleting and recreating topic used in test?
> > Once topic is clean, read/poll once - any committed offset should be
> > outside of the range, and consumer should reset offset.
> >
> > On Tue, Dec 29, 2015 at 4:11 PM, Han JU  wrote:
> >
> > > Hello,
> > >
> > > For local test purpose I need to frequently reset offset for a consumer
> > > group. In 0.8 I just delete the consumer group's zk node under
> > > `/consumers`. But with the redesign of the 0.9, how could I achieve the
> > > same thing?
> > >
> > > Thanks!
> > >
> > > --
> > > *JU Han*
> > >
> > > Software Engineer @ Teads.tv
> > >
> > > +33 061960
> > >
> >
>
>
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 061960
>

