Consumer seek on 0.9.0 API

2016-02-17 Thread Péricé Robin
Hi,

I'm trying to use the new Consumer API with this example :
https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples

With a Producer I sent 1000 messages to my Kafka broker. I need to know if
it's possible, for example, to read messages from offset 500 to 1000.

What I did :


   - consumer.seek(new TopicPartition("topic1", 0), 500);


   - final ConsumerRecords records =
   consumer.poll(1000);


But this did nothing (when I don't use the seek() method, I consume all the
messages without any problem).

Any help on this will be greatly appreciated !

Regards,

Robin


Re: Questions from new user

2016-02-17 Thread Alexis Midon
0. I don't understand how deleting log files quickly relates to the file/page
cache. As far as I know, consumer read patterns are the main factor here.
The OS will eventually discard unused cached pages. I'm not an expert on
page-cache policies though, and will be happy to learn.

1. have a look at per-topic config
https://kafka.apache.org/documentation.html#topic-config

2.
- please tell us:
  - what version of the Kafka producer/consumer you're using: 0.9 or 0.8?
  - what exact metrics you're referring to?
http://docs.confluent.io/1.0/kafka/monitoring.html

- I'm assuming you're talking about the request-size-avg metric of the
__producer__ 0.9, as described in
http://kafka.apache.org/documentation.html#new_producer_monitoring

If so, the produce request will be capped by the message max size indeed.
Another limiting factor would be the `linger.ms` setting of the producer,
in case of low message rate.
Otherwise, please share the exact metrics you're using for the consumer lag.

- Splitting/assembling messages in the application sounds like quite a pain.

- the lag could also be introduced by the application processing the
messages. Have you checked that side?
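
For reference, a rough sketch of the producer settings in play here (0.9 Java
producer): compression, `linger.ms`, and the request-size cap. The broker
address, topic name and values below are placeholders, not recommendations,
and the topic-level `max.message.bytes` on the broker side would also need to
allow larger messages:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerBatchingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "snappy");   // compress batches on the wire
        props.put("linger.ms", "50");              // wait up to 50 ms to fill a batch
        props.put("batch.size", "65536");          // per-partition batch size in bytes
        props.put("max.request.size", "10485760"); // cap on a single produce request (~10 MB)

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        producer.close();
    }
}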



On Tue, Feb 16, 2016 at 7:30 PM allen chan 
wrote:

> Hi can anyone help with this?
>
> On Fri, Jan 29, 2016 at 11:50 PM, allen chan  >
> wrote:
>
> > Use case: We are using kafka as broker in one of our elasticsearch
> > clusters. Kafka caches the logs if elasticsearch has any performance
> > issues.  I have Kafka set to delete logs pretty quickly to keep things in
> > the file cache to limit IO.
> >
> > Questions:
> > 1. in 0.9 it seems like consumer offsets are stored only in Kafka. Is
> there
> > a way to configure Kafka to delete my production logs pretty quickly but
> > have a different retention behavior for the consumer offsets?
> >
> > 2. Our consumer lag monitoring show us that a lot of times our consumers
> > are behind somewhere between 500 to 1000 messages. Looking at the JMX
> > metrics requestSizeAvg and requestSizeMax, it shows our average request
> > size is 500 bytes and max request size is 800,000 bytes. I assume the lag
> > is because that batch could only hold one message given the max is
> 100
> > bytes. I plan to enable compression and increase the max.bytes to 10mb to
> > fix this short term. In a few blogs, people mentioned the ultimate fix
> > should be splitting the message into smaller chunks in the producer and
> > then having the consumer put it back together. Is that handled in the
> kafka
> > producer/consumer natively or has to be handled outside of it?
> >
> > Thanks for the attention.
> > Allen Chan
> >
> >
> >
>
>
> --
> Allen Michael Chan
>


Re: Kafka as master data store

2016-02-17 Thread Damian Guy
Hi Ted - if the data is keyed you can use a key-compacted topic and
essentially keep the data 'forever', i.e., you'll always have the latest
version of the data for a given key. However, you'd still want to back up
the data someplace else, just in case.
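
As a rough illustration (my own sketch, nothing official): a compacted topic
is just a topic created with the topic config `cleanup.policy=compact`, and
the producer then sends keyed records so compaction can keep the latest value
per key. The topic and key names below are made up:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompactedTopicSketch {
    public static void main(String[] args) {
        // Assumes "user-profiles" was created with cleanup.policy=compact.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Every record carries a key; compaction eventually keeps only the
        // newest value for each key, so "user-42" ends up mapped to "v2".
        producer.send(new ProducerRecord<>("user-profiles", "user-42", "v1"));
        producer.send(new ProducerRecord<>("user-profiles", "user-42", "v2"));
        producer.close();
    }
}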

On 16 February 2016 at 21:25, Ted Swerve  wrote:

> I guess I was just drawn in by the elegance of having everything available
> in one well-defined Kafka topic should I start up some new code.
>
> If instead the Kafka topics were on a retention period of say 7 days, that
> would involve firing up a topic to load the warehoused data from HDFS (or a
> more traditional load), and then switching over to the live topic?
>
> On Tue, Feb 16, 2016 at 8:32 AM, Ben Stopford  wrote:
>
> > Ted - it depends on your domain. More conservative approaches to long
> > lived data protect against data corruption, which generally means
> snapshots
> > and cold storage.
> >
> >
> > > On 15 Feb 2016, at 21:31, Ted Swerve  wrote:
> > >
> > > HI Ben, Sharninder,
> > >
> > > Thanks for your responses, I appreciate it.
> > >
> > > Ben - thanks for the tips on settings. A backup could certainly be a
> > > possibility, although if only with similar durability guarantees, I'm
> not
> > > sure what the purpose would be?
> > >
> > > Sharninder - yes, we would only be using the logs as forward-only
> > streams -
> > > i.e. picking an offset to read from and moving forwards - and would be
> > > setting retention time to essentially infinite.
> > >
> > > Regards,
> > > Ted.
> > >
> > > On Tue, Feb 16, 2016 at 5:05 AM, Sharninder Khera <
> sharnin...@gmail.com>
> > > wrote:
> > >
> > >> This topic comes up often on this list. Kafka can be used as a
> datastore
> > >> if that’s what your application wants with the caveat that Kafka isn’t
> > >> designed to keep data around forever. There is a default retention
> time
> > >> after which older data gets deleted. The high level consumer
> essentially
> > >> reads data as a stream and while you can do sort of random access with
> > the
> > >> low level consumer, its not ideal.
> > >>
> > >>
> > >>
> > >>> On 15-Feb-2016, at 10:26 PM, Ted Swerve 
> wrote:
> > >>>
> > >>> Hello,
> > >>>
> > >>> Is it viable to use infinite-retention Kafka topics as a master data
> > >>> store?  I'm not talking massive volumes of data here, but still
> > >> potentially
> > >>> extending into tens of terabytes.
> > >>>
> > >>> Are there any drawbacks or pitfalls to such an approach?  It seems
> > like a
> > >>> compelling design, but there seem to be mixed messages about its
> > >>> suitability for this kind of role.
> > >>>
> > >>> Regards,
> > >>> Ted
> > >>
> > >>
> >
> >
>


Re: Replication Factor and number of brokers

2016-02-17 Thread Alexis Midon
it will throw an exception:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L76-L78

On Tue, Feb 16, 2016 at 3:55 PM Alex Loddengaard  wrote:

> Hi Sean, you'll want equal or more brokers than your replication factor.
> Meaning, if your replication factor is 3, you'll want 3 or more brokers.
>
> I'm not sure what Kafka will do if you have fewer brokers than your
> replication factor. It will either give you the highest replication factor
> it can (in this case, the number of brokers), or it will put more than one
> replica on some brokers. My guess is the former, but again, I'm not sure.
>
> Hope this helps.
>
> Alex
>
> On Tue, Feb 16, 2016 at 7:47 AM, Damian Guy  wrote:
>
> > Then you'll have under-replicated partitions. However, even if you have 3
> > brokers with a replication factor of 2 and you lose a single broker
> you'll
> > still likely have under-replicated partitions.
> > Partitions are assigned to brokers, 1 broker will be the leader and n
> > brokers will be followers. If any of the brokers with replicas of the
> > partition on it crash then you'll have under-replicated partitions.
> >
> >
> > On 16 February 2016 at 14:45, Sean Morris (semorris)  >
> > wrote:
> >
> > > So if I have a replication factor of 2, but only 2 brokers, then
> > > replication works, but what if I lose one broker?
> > >
> > > Thanks,
> > > Sean
> > >
> > > On 2/16/16, 9:14 AM, "Damian Guy"  wrote:
> > >
> > > >Hi,
> > > >
> > > >You need to have at least replication factor brokers.
> > > >replication factor  = 1 is no replication.
> > > >
> > > >HTH,
> > > >Damian
> > > >
> > > >On 16 February 2016 at 14:08, Sean Morris (semorris) <
> > semor...@cisco.com>
> > > >wrote:
> > > >
> > > >> Should your number of brokers be atleast one more then your
> > replication
> > > >> factor of your topic(s)?
> > > >>
> > > >> So if I have a replication factor of 2, I need atleast 3 brokers?
> > > >>
> > > >> Thanks,
> > > >> Sean
> > > >>
> > > >>
> > >
> > >
> >
>
>
>
> --
> *Alex Loddengaard | **Solutions Architect | Confluent*
> *Download Apache Kafka and Confluent Platform: www.confluent.io/download
> *
>


Re: Kafka as master data store

2016-02-17 Thread Daniel Schierbeck
I'm also very interested in using Kafka as a persistent, distributed commit
log – essentially the write side of a distributed database, with the read
side being an array of various query stores (Elasticsearch, Redis,
whatever) and stream processing systems.

The benefit of retaining data in Kafka indefinitely is the ease with which
it's possible to bootstrap new read-side technologies. I really feel that
there should be a standardized Kafka configuration optimized for this case,
with long-term durability in mind.

On Tue, Feb 16, 2016 at 10:26 PM Ted Swerve  wrote:

> I guess I was just drawn in by the elegance of having everything available
> in one well-defined Kafka topic should I start up some new code.
>
> If instead the Kafka topics were on a retention period of say 7 days, that
> would involve firing up a topic to load the warehoused data from HDFS (or a
> more traditional load), and then switching over to the live topic?
>
> On Tue, Feb 16, 2016 at 8:32 AM, Ben Stopford  wrote:
>
> > Ted - it depends on your domain. More conservative approaches to long
> > lived data protect against data corruption, which generally means
> snapshots
> > and cold storage.
> >
> >
> > > On 15 Feb 2016, at 21:31, Ted Swerve  wrote:
> > >
> > > HI Ben, Sharninder,
> > >
> > > Thanks for your responses, I appreciate it.
> > >
> > > Ben - thanks for the tips on settings. A backup could certainly be a
> > > possibility, although if only with similar durability guarantees, I'm
> not
> > > sure what the purpose would be?
> > >
> > > Sharninder - yes, we would only be using the logs as forward-only
> > streams -
> > > i.e. picking an offset to read from and moving forwards - and would be
> > > setting retention time to essentially infinite.
> > >
> > > Regards,
> > > Ted.
> > >
> > > On Tue, Feb 16, 2016 at 5:05 AM, Sharninder Khera <
> sharnin...@gmail.com>
> > > wrote:
> > >
> > >> This topic comes up often on this list. Kafka can be used as a
> datastore
> > >> if that’s what your application wants with the caveat that Kafka isn’t
> > >> designed to keep data around forever. There is a default retention
> time
> > >> after which older data gets deleted. The high level consumer
> essentially
> > >> reads data as a stream and while you can do sort of random access with
> > the
> > >> low level consumer, its not ideal.
> > >>
> > >>
> > >>
> > >>> On 15-Feb-2016, at 10:26 PM, Ted Swerve 
> wrote:
> > >>>
> > >>> Hello,
> > >>>
> > >>> Is it viable to use infinite-retention Kafka topics as a master data
> > >>> store?  I'm not talking massive volumes of data here, but still
> > >> potentially
> > >>> extending into tens of terabytes.
> > >>>
> > >>> Are there any drawbacks or pitfalls to such an approach?  It seems
> > like a
> > >>> compelling design, but there seem to be mixed messages about its
> > >>> suitability for this kind of role.
> > >>>
> > >>> Regards,
> > >>> Ted
> > >>
> > >>
> >
> >
>


Re: Compression - MessageSet size

2016-02-17 Thread Alexis Midon
- it would be interesting to see the actual ProduceRequests/Responses and
FetchReq/Resp.
- at this point I would dive into the broker source code and follow the
fetch request handling.
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L418



On Tue, Feb 16, 2016 at 10:24 AM Oleksiy Krivoshey 
wrote:

> I'm not using Java client, I'm developing my own:
> https://github.com/oleksiyk/kafka
> And I'm talking about MessageSet on the wire, not the one passed to library
> user.
>
> Example:
> Consumer is not started.
> Producer sends a batch #1 of 5 messages, compressed in a single message.
> delay 50 ms
> Producer sends a batch #2 of 5 messages, compressed in a single message.
>
> Start consumer and it will receive a single MessageSet with two messages in
> it, both messages are corresponding compressed batches. Both have codec=2
> (using Snappy). In order to receive all 10 original messages I need to
> decompress both messages in this single MessageSet. Yes, I will then
> concatenate these messages and send a single MessageSet with 10 messages to
> library user, but I just want to clarify, the Protocol Guide says there
> should be single compressed message but I'm able to receive 2, 3 and more,
> all in single MessageSet.
>
> Kafka 0.9
>
> I can provide the actual buffers received.
>
> Thanks!
>
>
> On Tue, 16 Feb 2016 at 20:01 Alexis Midon  .invalid>
> wrote:
>
> > What makes you think there are 2? would you have data or code to share?
> >
> > When compression is enabled, multiple messages will be packed and
> > compressed in a MessageSet. That MessageSet will then have a single
> > message.
> > The interface however will let you iterate over the unpacked messages.
> See
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L166
> >
> >
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
> >
> >
> >
> > On Tue, Feb 16, 2016 at 3:33 AM Oleksiy Krivoshey 
> > wrote:
> >
> > > Hi!
> > >
> > > The Kafka 0.9 protocol guide
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Compression
> > > explicitly
> > > states that there should be only single compressed message in a
> > MessageSet,
> > > but I'm definitely receiving two compressed messages in a single
> > > MessageSet.
> > >
> > > Can someone please confirm if this should be expected behaviour?
> > >
> > > Thanks!
> > >
> >
>


Re: Replication Factor and number of brokers

2016-02-17 Thread Ben Stopford
If you create a topic with more replicas than brokers it should throw an
error, but if you lose a broker you'd have under-replicated partitions.

B

On Tuesday, 16 February 2016, Alex Loddengaard  wrote:

> Hi Sean, you'll want equal or more brokers than your replication factor.
> Meaning, if your replication factor is 3, you'll want 3 or more brokers.
>
> I'm not sure what Kafka will do if you have fewer brokers than your
> replication factor. It will either give you the highest replication factor
> it can (in this case, the number of brokers), or it will put more than one
> replica on some brokers. My guess is the former, but again, I'm not sure.
>
> Hope this helps.
>
> Alex
>
> On Tue, Feb 16, 2016 at 7:47 AM, Damian Guy  > wrote:
>
> > Then you'll have under-replicated partitions. However, even if you have 3
> > brokers with a replication factor of 2 and you lose a single broker
> you'll
> > still likely have under-replicated partitions.
> > Partitions are assigned to brokers, 1 broker will be the leader and n
> > brokers will be followers. If any of the brokers with replicas of the
> > partition on it crash then you'll have under-replicated partitions.
> >
> >
> > On 16 February 2016 at 14:45, Sean Morris (semorris)  >
> > wrote:
> >
> > > So if I have a replication factor of 2, but only 2 brokers, then
> > > replication works, but what if I lose one broker?
> > >
> > > Thanks,
> > > Sean
> > >
> > > On 2/16/16, 9:14 AM, "Damian Guy" >
> wrote:
> > >
> > > >Hi,
> > > >
> > > >You need to have at least replication factor brokers.
> > > >replication factor  = 1 is no replication.
> > > >
> > > >HTH,
> > > >Damian
> > > >
> > > >On 16 February 2016 at 14:08, Sean Morris (semorris) <
> > semor...@cisco.com >
> > > >wrote:
> > > >
> > > >> Should your number of brokers be atleast one more then your
> > replication
> > > >> factor of your topic(s)?
> > > >>
> > > >> So if I have a replication factor of 2, I need atleast 3 brokers?
> > > >>
> > > >> Thanks,
> > > >> Sean
> > > >>
> > > >>
> > >
> > >
> >
>
>
>
> --
> *Alex Loddengaard | **Solutions Architect | Confluent*
> *Download Apache Kafka and Confluent Platform: www.confluent.io/download
> *
>


Re: 0.9.0.1 RC1

2016-02-17 Thread Christian Posta
BTW, what's the etiquette for votes (non-binding) for this community?
welcomed? noise?
happy to see the non-binding votes, I'd like to contribute, just don't want
to pollute the vote call. thoughts?
thanks!

On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:

> Thanks everyone for voting. The results are:
>
> +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel Koshy and
> Jun
> Rao)
> +1 non-binding = 3 votes
> -1 = 0 votes
> 0 = 0 votes
>
> The vote passes.
>
> I will release artifacts to maven central, update the dist svn and download
> site. Will send out an announce after that.
>
> Thanks,
>
> Jun
>
> On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
>
> > This is the first candidate for release of Apache Kafka 0.9.0.1. This a
> > bug fix release that fixes 70 issues.
> >
> > Release Notes for the 0.9.0.1 release
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > and sha2 (SHA256) checksum.
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> >
> > * Maven artifacts to be voted upon prior to release:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * scala-doc
> > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> >
> > * java-doc
> > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> >
> > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
> >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> >
> > * Documentation
> > http://kafka.apache.org/090/documentation.html
> >
> > Thanks,
> >
> > Jun
> >
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Alexis Midon
By "re-connect", I'm assuming that the ZK session is expired, not
disconnected.
For details see
http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions

In that case, the high level consumer is basically dead, and the
application should create a new instance of it.
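
Roughly, "create a new instance" with the 0.8 high-level Java consumer would
look like the sketch below. The property values and the shape of the restart
logic are illustrative assumptions, not a prescribed pattern:

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerRestartSketch {
    // Build a fresh high-level consumer instance.
    static ConsumerConnector newConnector() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    public static void main(String[] args) {
        ConsumerConnector connector = newConnector();
        // ... consume from the streams created off `connector` ...

        // If the application detects that the underlying ZK session has
        // expired, the old instance is dead: tear it down and start over.
        connector.shutdown();
        connector = newConnector();
        // ... re-create the streams and resume consuming ...
    }
}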


On Mon, Feb 15, 2016 at 12:22 PM Joe San  wrote:

> Any ideas as to which property should I set to enable Zookeeper
> re-connection? I have the following properties defined for my consumer
> (High Level Consumer API). Is this enough for a automatic Zookeeper
> re-connect?
>
> val props = new Properties()
> props.put("zookeeper.connect", zookeeper)
> props.put("group.id", groupId)
> props.put("auto.commit.enabled", "false")
> // this timeout is needed so that we do not block on the stream!
> props.put("consumer.timeout.ms", "1")
> props.put("zookeeper.sync.time.ms", "200")
>


Kafka response ordering guarantees

2016-02-17 Thread Ivan Dyachkov
Hello all.

I'm developing a Kafka client and have a question about Kafka server guarantees.

A statement from 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Network
 makes me a bit confused:

"The server guarantees that on a single TCP connection, requests will be 
processed in the order they are sent and responses will return in that order as 
well. The broker's request processing allows only a single in-flight request 
per connection in order to guarantee this ordering. Note that clients can (and 
ideally should) use non-blocking IO to implement request pipelining and achieve 
higher throughput. i.e., clients can send requests even while awaiting 
responses for preceding requests since the outstanding requests will be 
buffered in the underlying OS socket buffer. All requests are initiated by the 
client, and result in a corresponding response message from the server except 
where noted."

Does this mean that when a client is sending more than one in-flight request 
per connection, the server does not guarantee that responses will be sent in 
the same order as requests?

In other words, if I have a strictly monotonically increasing integer as the
correlation id for all requests, can I rely on the correlation ids in the
responses also having this property?
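
Concretely, the kind of thing I'd like to rely on in my client is sketched
below (names are made up): a simple FIFO of in-flight correlation ids, which
only works if responses really do come back in request order:

import java.util.ArrayDeque;
import java.util.Deque;

public class PipelineSketch {
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    // Called when a request is written to the socket.
    public int onRequestSent() {
        int id = nextCorrelationId++;
        inFlight.addLast(id);
        return id;
    }

    // Called when a response arrives; relies on in-order responses.
    public void onResponseReceived(int correlationIdFromResponse) {
        int expected = inFlight.removeFirst();
        if (expected != correlationIdFromResponse) {
            throw new IllegalStateException("Out-of-order response: expected "
                    + expected + " but got " + correlationIdFromResponse);
        }
        // ... dispatch the response to whoever sent request `expected` ...
    }
}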

Thanks.

/Ivan


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Christian Posta
Yep, assuming you haven't completely partitioned that client from the
cluster, ZK should automatically try to connect/reconnect to other peers in
the server list. Otherwise, it's as Alexis said -- your session would
expire; you'd have to recreate the session once you have connectivity

On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
alexis.mi...@airbnb.com.invalid> wrote:

> By "re-connect", I'm assuming that the ZK session is expired, not
> disconnected.
> For details see
>
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
>
> In that case, the high level consumer is basically dead, and the
> application should create a new instance of it.
>
>
> On Mon, Feb 15, 2016 at 12:22 PM Joe San  wrote:
>
> > Any ideas as to which property should I set to enable Zookeeper
> > re-connection? I have the following properties defined for my consumer
> > (High Level Consumer API). Is this enough for a automatic Zookeeper
> > re-connect?
> >
> > val props = new Properties()
> > props.put("zookeeper.connect", zookeeper)
> > props.put("group.id", groupId)
> > props.put("auto.commit.enabled", "false")
> > // this timeout is needed so that we do not block on the stream!
> > props.put("consumer.timeout.ms", "1")
> > props.put("zookeeper.sync.time.ms", "200")
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Joe San
It is all pretty strange. Here is what I see in my logs as soon as I
voluntarily shut down Zookeeper!

java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
~[na:1.8.0_60]
at
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
~[zookeeper-3.4.6.jar:3.4.6-1569965]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
~[zookeeper-3.4.6.jar:3.4.6-1569965]
20160217-20:12:44.960+0100
[sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
127.0.0.1:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket
connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to
authenticate using SASL (unknown error)
20160217-20:12:44.960+0100
[sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
127.0.0.1:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session
0x152ea19656b005c for server null, unexpected error, closing socket
connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
~[na:1.8.0_60]
at
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
~[zookeeper-3.4.6.jar:3.4.6-1569965]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
~[zookeeper-3.4.6.jar:3.4.6-1569965]

It just keeps trying to reconnect forever! So I just wanted to know which
property from my settings in the email above is responsible for this
auto-reconnect mechanism?

On Wed, Feb 17, 2016 at 8:04 PM, Christian Posta 
wrote:

> Yep, assuming you haven't completely partitioned that client from the
> cluster, ZK should automatically try to connect/reconnect to other peers in
> the server list. Otherwise, it's as Alexis said -- your session would
> expire; you'd have to recreate the session once you have connectivity
>
> On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
> alexis.mi...@airbnb.com.invalid> wrote:
>
> > By "re-connect", I'm assuming that the ZK session is expired, not
> > disconnected.
> > For details see
> >
> >
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
> >
> > In that case, the high level consumer is basically dead, and the
> > application should create a new instance of it.
> >
> >
> > On Mon, Feb 15, 2016 at 12:22 PM Joe San 
> wrote:
> >
> > > Any ideas as to which property should I set to enable Zookeeper
> > > re-connection? I have the following properties defined for my consumer
> > > (High Level Consumer API). Is this enough for a automatic Zookeeper
> > > re-connect?
> > >
> > > val props = new Properties()
> > > props.put("zookeeper.connect", zookeeper)
> > > props.put("group.id", groupId)
> > > props.put("auto.commit.enabled", "false")
> > > // this timeout is needed so that we do not block on the stream!
> > > props.put("consumer.timeout.ms", "1")
> > > props.put("zookeeper.sync.time.ms", "200")
> > >
> >
>
>
>
> --
> *Christian Posta*
> twitter: @christianposta
> http://www.christianposta.com/blog
> http://fabric8.io
>


Resetting Kafka Offsets -- and What are offsets.... exactly?

2016-02-17 Thread John Bickerstaff
*Use Case: Disaster Recovery & Re-indexing SOLR*

I'm using Kafka to hold messages from a service that prepares "documents"
for SOLR.

A second micro service (a consumer) requests these messages, does any final
processing, and fires them into SOLR.

The whole thing is (in part) designed to be used for disaster recovery -
allowing the rebuild of the SOLR index in the shortest possible time.

To do this (and to be able to use it for re-indexing SOLR while testing
relevancy) I need to be able to "play all messages from the beginning" at
will.

I find I can use the zkCli.sh tool to delete the Consumer Group Name like
this:
 rmr /kafka/consumers/myGroupName

After which my microservice will get all the messages again when it runs.

I was trying to find a way to do this programmatically without actually
using the "low level" consumer API, since the high-level one is so simple
and my code already works.  So I started playing with the ZooKeeper API to
duplicate "rmr /kafka/consumers/myGroupName".

*The Question: What does that offset actually represent?*

It was at this point that I discovered the offset must represent something
other than what I thought it would.  Things obviously work, but I'm
wondering what, exactly, the offsets represent.

To clarify - if I run this command on a zookeeper node, after the
microservice has run:
 get /kafka/consumers/myGroupName/offsets/myTopicName/0

I get the following:

30024
cZxid = 0x360355
ctime = Fri Feb 12 07:27:50 MST 2016
mZxid = 0x360357
mtime = Fri Feb 12 07:29:50 MST 2016
pZxid = 0x360355
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

Now - I have exactly 3500 messages in this Kafka topic.  I verify that by
running this command:
 bin/kafka-console-consumer.sh --zookeeper 192.168.56.5:2181/kafka
--topic myTopicName --from-beginning

When I hit Ctrl-C, it tells me it consumed 3500 messages.

So - what does that 30024 actually represent?  If I reset that number to 1
or 0 and re-run my consumer microservice, I get all the messages again,
and the number again goes to 30024.  However, I'm not comfortable trusting
that, because my assumption that the number represents a simple count of
messages sent to this consumer is obviously wrong.

(I reset the number like this -- to 1 -- and assume there's an API command
that will do it too.)
 set /kafka/consumers/myGroupName/offsets/myTopicName/0 1

Can someone help me clarify or point me at a doc that explains what is
getting counted here?  You can shoot me if you like for attempting the
hack-ish solution of re-setting the offset through the Zookeeper API, but I
would still like to understand what, exactly, is represented by that number
30024.

I need to hand off to IT for the Disaster Recovery portion and saying
"trust me, it just works" isn't going to fly very far...

Thanks.


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Christian Posta
I believe reconnect is handled automatically by the client... is that what
you're asking?
Peek here to see how it does that and when:

https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java#L1153

On Wed, Feb 17, 2016 at 12:14 PM, Joe San  wrote:

> It is all pretty strange. Here is what I see in my logs as soon as I
> voluntarily shutdown Zookeeper!
>
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> ~[na:1.8.0_60]
> at
>
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> 20160217-20:12:44.960+0100
> [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> 127.0.0.1:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket
> connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to
> authenticate using SASL (unknown error)
> 20160217-20:12:44.960+0100
> [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> 127.0.0.1:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session
> 0x152ea19656b005c for server null, unexpected error, closing socket
> connection and attempting reconnect
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_60]
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> ~[na:1.8.0_60]
> at
>
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> ~[zookeeper-3.4.6.jar:3.4.6-1569965]
>
> It just keep repeating trying to reconnect for ever! So I just wanted to
> know which property from my setting in my email above is responsible for
> this auto reconnect mechanism?
>
> On Wed, Feb 17, 2016 at 8:04 PM, Christian Posta <
> christian.po...@gmail.com>
> wrote:
>
> > Yep, assuming you haven't completely partitioned that client from the
> > cluster, ZK should automatically try to connect/reconnect to other peers
> in
> > the server list. Otherwise, it's as Alexis said -- your session would
> > expire; you'd have to recreate the session once you have connectivity
> >
> > On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
> > alexis.mi...@airbnb.com.invalid> wrote:
> >
> > > By "re-connect", I'm assuming that the ZK session is expired, not
> > > disconnected.
> > > For details see
> > >
> > >
> >
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
> > >
> > > In that case, the high level consumer is basically dead, and the
> > > application should create a new instance of it.
> > >
> > >
> > > On Mon, Feb 15, 2016 at 12:22 PM Joe San 
> > wrote:
> > >
> > > > Any ideas as to which property should I set to enable Zookeeper
> > > > re-connection? I have the following properties defined for my
> consumer
> > > > (High Level Consumer API). Is this enough for a automatic Zookeeper
> > > > re-connect?
> > > >
> > > > val props = new Properties()
> > > > props.put("zookeeper.connect", zookeeper)
> > > > props.put("group.id", groupId)
> > > > props.put("auto.commit.enabled", "false")
> > > > // this timeout is needed so that we do not block on the stream!
> > > > props.put("consumer.timeout.ms", "1")
> > > > props.put("zookeeper.sync.time.ms", "200")
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Joe San
So if I use the High Level Consumer API, using the ConsumerConnector, I get
this automatic ZooKeeper reconnection for free?

On Wed, Feb 17, 2016 at 8:25 PM, Christian Posta 
wrote:

> i believe reconnect is handled automatically by the client... is that what
> you're asking
> peek here to see how it does that and when:
>
>
> https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java#L1153
>
> On Wed, Feb 17, 2016 at 12:14 PM, Joe San  wrote:
>
> > It is all pretty strange. Here is what I see in my logs as soon as I
> > voluntarily shutdown Zookeeper!
> >
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> ~[na:1.8.0_60]
> > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > ~[na:1.8.0_60]
> > at
> >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > 20160217-20:12:44.960+0100
> > [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> > 127.0.0.1:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket
> > connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to
> > authenticate using SASL (unknown error)
> > 20160217-20:12:44.960+0100
> > [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> > 127.0.0.1:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session
> > 0x152ea19656b005c for server null, unexpected error, closing socket
> > connection and attempting reconnect
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> ~[na:1.8.0_60]
> > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > ~[na:1.8.0_60]
> > at
> >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> >
> > It just keep repeating trying to reconnect for ever! So I just wanted to
> > know which property from my setting in my email above is responsible for
> > this auto reconnect mechanism?
> >
> > On Wed, Feb 17, 2016 at 8:04 PM, Christian Posta <
> > christian.po...@gmail.com>
> > wrote:
> >
> > > Yep, assuming you haven't completely partitioned that client from the
> > > cluster, ZK should automatically try to connect/reconnect to other
> peers
> > in
> > > the server list. Otherwise, it's as Alexis said -- your session would
> > > expire; you'd have to recreate the session once you have connectivity
> > >
> > > On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
> > > alexis.mi...@airbnb.com.invalid> wrote:
> > >
> > > > By "re-connect", I'm assuming that the ZK session is expired, not
> > > > disconnected.
> > > > For details see
> > > >
> > > >
> > >
> >
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
> > > >
> > > > In that case, the high level consumer is basically dead, and the
> > > > application should create a new instance of it.
> > > >
> > > >
> > > > On Mon, Feb 15, 2016 at 12:22 PM Joe San 
> > > wrote:
> > > >
> > > > > Any ideas as to which property should I set to enable Zookeeper
> > > > > re-connection? I have the following properties defined for my
> > consumer
> > > > > (High Level Consumer API). Is this enough for a automatic Zookeeper
> > > > > re-connect?
> > > > >
> > > > > val props = new Properties()
> > > > > props.put("zookeeper.connect", zookeeper)
> > > > > props.put("group.id", groupId)
> > > > > props.put("auto.commit.enabled", "false")
> > > > > // this timeout is needed so that we do not block on the stream!
> > > > > props.put("consumer.timeout.ms", "1")
> > > > > props.put("zookeeper.sync.time.ms", "200")
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > *Christian Posta*
> > > twitter: @christianposta
> > > http://www.christianposta.com/blog
> > > http://fabric8.io
> > >
> >
>
>
>
> --
> *Christian Posta*
> twitter: @christianposta
> http://www.christianposta.com/blog
> http://fabric8.io
>


Re: Resetting Kafka Offsets -- and What are offsets.... exactly?

2016-02-17 Thread Christian Posta
The number is the log-ordered number of bytes. So really, the offset is
kind of like the "number of bytes" to begin reading from: 0 means read the
log from the beginning, and the second message starts at 0 + the size of the
first message. So the message "ids" are really just the cumulative sizes of
the preceding messages.

For example, if I have three messages of 10 bytes each, and set the
consumer offset to 0, i'll read everything. If you set the offset to 10,
I'll read the second and third messages, and so on.

see more here:
http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
and here: http://kafka.apache.org/documentation.html#introduction

HTH!

On Wed, Feb 17, 2016 at 12:16 PM, John Bickerstaff  wrote:

> *Use Case: Disaster Recovery & Re-indexing SOLR*
>
> I'm using Kafka to hold messages from a service that prepares "documents"
> for SOLR.
>
> A second micro service (a consumer) requests these messages, does any final
> processing, and fires them into SOLR.
>
> The whole thing is (in part) designed to be used for disaster recovery -
> allowing the rebuild of the SOLR index in the shortest possible time.
>
> To do this (and to be able to use it for re-indexing SOLR while testing
> relevancy) I need to be able to "play all messages from the beginning" at
> will.
>
> I find I can use the zkCli.sh tool to delete the Consumer Group Name like
> this:
>  rmr /kafka/consumers/myGroupName
>
> After which my microservice will get all the messages again when it runs.
>
> I was trying to find a way to do this programmatically without actually
> using the "low level" consumer api since the high level one is so simple
> and my code already works.  So I started playing with Zookeeper api for
> duplicating "rmr /kafka/consumers/myGroupName"
>
> *The Question: What does that offset actually represent?*
>
> It was at this point that I discovered the offset must represent something
> other than what I thought it would.  Things obviously work, but I'm
> wondering what - exactly do the offsets represent?
>
> To clarify - if I run this command on a zookeeper node, after the
> microservice has run:
>  get /kafka/consumers/myGroupName/offsets/myTopicName/0
>
> I get the following:
>
> 30024
> cZxid = 0x360355
> ctime = Fri Feb 12 07:27:50 MST 2016
> mZxid = 0x360357
> mtime = Fri Feb 12 07:29:50 MST 2016
> pZxid = 0x360355
> cversion = 0
> dataVersion = 2
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 5
> numChildren = 0
>
> Now - I have exactly 3500 messages in this Kafka topic.  I verify that by
> running this command:
>  bin/kafka-console-consumer.sh --zookeeper 192.168.56.5:2181/kafka
> --topic myTopicName --from-beginning
>
> When I hit Ctrl-C, it tells me it consumed 3500 messages.
>
> So - what does that 30024 actually represent?  If I reset that number to 1
> or 0 and re-run my consumer microservice, I get all the messages again -
> and the number again goes to 30024.  However, I'm not comfortable to trust
> that because my assumption that the number represents a simple count of
> messages that have been sent to this consumer is obviously wrong.
>
> (I reset the number like this -- to 1 -- and assume there's an API command
> that will do it too.)
>  set /kafka/consumers/myGroupName/offsets/myTopicName/0 1
>
> Can someone help me clarify or point me at a doc that explains what is
> getting counted here?  You can shoot me if you like for attempting the
> hack-ish solution of re-setting the offset through the Zookeeper API, but I
> would still like to understand what, exactly, is represented by that number
> 30024.
>
> I need to hand off to IT for the Disaster Recovery portion and saying
> "trust me, it just works" isn't going to fly very far...
>
> Thanks.
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Enable Kafka Consumer 0.8.2.1 Reconnect to Zookeeper

2016-02-17 Thread Christian Posta
I believe so. Happy to be corrected.

On Wed, Feb 17, 2016 at 12:31 PM, Joe San  wrote:

> So if I use the High Level Consumer API, using the ConsumerConnector, I get
> this automatic zookeeper connection for free?
>
> On Wed, Feb 17, 2016 at 8:25 PM, Christian Posta <
> christian.po...@gmail.com>
> wrote:
>
> > i believe reconnect is handled automatically by the client... is that
> what
> > you're asking
> > peek here to see how it does that and when:
> >
> >
> >
> https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java#L1153
> >
> > On Wed, Feb 17, 2016 at 12:14 PM, Joe San 
> wrote:
> >
> > > It is all pretty strange. Here is what I see in my logs as soon as I
> > > voluntarily shutdown Zookeeper!
> > >
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > ~[na:1.8.0_60]
> > > at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > > ~[na:1.8.0_60]
> > > at
> > >
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > 20160217-20:12:44.960+0100
> > > [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> > > 127.0.0.1:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening
> socket
> > > connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to
> > > authenticate using SASL (unknown error)
> > > 20160217-20:12:44.960+0100
> > > [sally-kafka-consumer-akka.actor.default-dispatcher-4-SendThread(
> > > 127.0.0.1:2181)] WARN  org.apache.zookeeper.ClientCnxn - Session
> > > 0x152ea19656b005c for server null, unexpected error, closing socket
> > > connection and attempting reconnect
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > ~[na:1.8.0_60]
> > > at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > > ~[na:1.8.0_60]
> > > at
> > >
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > > ~[zookeeper-3.4.6.jar:3.4.6-1569965]
> > >
> > > It just keep repeating trying to reconnect for ever! So I just wanted
> to
> > > know which property from my setting in my email above is responsible
> for
> > > this auto reconnect mechanism?
> > >
> > > On Wed, Feb 17, 2016 at 8:04 PM, Christian Posta <
> > > christian.po...@gmail.com>
> > > wrote:
> > >
> > > > Yep, assuming you haven't completely partitioned that client from the
> > > > cluster, ZK should automatically try to connect/reconnect to other
> > peers
> > > in
> > > > the server list. Otherwise, it's as Alexis said -- your session would
> > > > expire; you'd have to recreate the session once you have connectivity
> > > >
> > > > On Wed, Feb 17, 2016 at 2:30 AM, Alexis Midon <
> > > > alexis.mi...@airbnb.com.invalid> wrote:
> > > >
> > > > > By "re-connect", I'm assuming that the ZK session is expired, not
> > > > > disconnected.
> > > > > For details see
> > > > >
> > > > >
> > > >
> > >
> >
> http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
> > > > >
> > > > > In that case, the high level consumer is basically dead, and the
> > > > > application should create a new instance of it.
> > > > >
> > > > >
> > > > > On Mon, Feb 15, 2016 at 12:22 PM Joe San 
> > > > wrote:
> > > > >
> > > > > > Any ideas as to which property should I set to enable Zookeeper
> > > > > > re-connection? I have the following properties defined for my
> > > consumer
> > > > > > (High Level Consumer API). Is this enough for a automatic
> Zookeeper
> > > > > > re-connect?
> > > > > >
> > > > > > val props = new Properties()
> > > > > > props.put("zookeeper.connect", zookeeper)
> > > > > > props.put("group.id", groupId)
> > > > > > props.put("auto.commit.enabled", "false")
> > > > > > // this timeout is needed so that we do not block on the stream!
> > > > > > props.put("consumer.timeout.ms", "1")
> > > > > > props.put("zookeeper.sync.time.ms", "200")
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *Christian Posta*
> > > > twitter: @christianposta
> > > > http://www.christianposta.com/blog
> > > > http://fabric8.io
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Consumer seek on 0.9.0 API

2016-02-17 Thread Alex Loddengaard
Hi Robin,

I believe seek() needs to be called after the consumer gets its partition
assignments. Try calling poll() before you call seek(), then poll() again
and process the records from the latter poll().
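
Something along these lines (an untested sketch; the broker address, group id
and deserializer classes are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "test-group");              // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic1"));

        consumer.poll(0); // first poll joins the group and gets the partition assignment
        consumer.seek(new TopicPartition("topic1", 0), 500); // now the seek applies

        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
        consumer.close();
    }
}

Alternatively, I believe you can skip group management entirely with
consumer.assign(Collections.singletonList(new TopicPartition("topic1", 0))),
after which seek() can be called right away.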

There may be a better way to do this -- let's see if anyone else has a
suggestion.

Alex

On Wed, Feb 17, 2016 at 9:13 AM, Péricé Robin 
wrote:

> Hi,
>
> I'm trying to use the new Consumer API with this example :
>
> https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples
>
> With a Producer I sent 1000 messages to my Kafka broker. I need to know if
> it's possible, for example, to read message from offset 500 to 1000.
>
> What I did :
>
>
>- consumer.seek(new TopicPartition("topic1", 0), 500);
>
>
>- final ConsumerRecords records =
>consumer.poll(1000);
>
>
> But this didn't nothing (when I don't use seek() method I consume all the
> messages without any problems).
>
> Any help on this will be greatly appreciated !
>
> Regards,
>
> Robin
>



-- 
*Alex Loddengaard | **Solutions Architect | Confluent*
*Download Apache Kafka and Confluent Platform: www.confluent.io/download
*


Re: Replication Factor and number of brokers

2016-02-17 Thread Alex Loddengaard
Thanks, Alexis and Ben!

Alex

On Wed, Feb 17, 2016 at 5:57 AM, Ben Stopford  wrote:

> If you create a topic with more replicas than brokers it should throw an
> error but if you lose a broker you'd have under replicated partitions.
>
> B
>
> On Tuesday, 16 February 2016, Alex Loddengaard  wrote:
>
> > Hi Sean, you'll want equal or more brokers than your replication factor.
> > Meaning, if your replication factor is 3, you'll want 3 or more brokers.
> >
> > I'm not sure what Kafka will do if you have fewer brokers than your
> > replication factor. It will either give you the highest replication
> factor
> > it can (in this case, the number of brokers), or it will put more than
> one
> > replica on some brokers. My guess is the former, but again, I'm not sure.
> >
> > Hope this helps.
> >
> > Alex
> >
> > On Tue, Feb 16, 2016 at 7:47 AM, Damian Guy  > > wrote:
> >
> > > Then you'll have under-replicated partitions. However, even if you
> have 3
> > > brokers with a replication factor of 2 and you lose a single broker
> > you'll
> > > still likely have under-replicated partitions.
> > > Partitions are assigned to brokers, 1 broker will be the leader and n
> > > brokers will be followers. If any of the brokers with replicas of the
> > > partition on it crash then you'll have under-replicated partitions.
> > >
> > >
> > > On 16 February 2016 at 14:45, Sean Morris (semorris) <
> semor...@cisco.com
> > >
> > > wrote:
> > >
> > > > So if I have a replication factor of 2, but only 2 brokers, then
> > > > replication works, but what if I lose one broker?
> > > >
> > > > Thanks,
> > > > Sean
> > > >
> > > > On 2/16/16, 9:14 AM, "Damian Guy"  >
> > wrote:
> > > >
> > > > >Hi,
> > > > >
> > > > >You need to have at least replication factor brokers.
> > > > >replication factor  = 1 is no replication.
> > > > >
> > > > >HTH,
> > > > >Damian
> > > > >
> > > > >On 16 February 2016 at 14:08, Sean Morris (semorris) <
> > > semor...@cisco.com >
> > > > >wrote:
> > > > >
> > > > >> Should your number of brokers be atleast one more then your
> > > replication
> > > > >> factor of your topic(s)?
> > > > >>
> > > > >> So if I have a replication factor of 2, I need atleast 3 brokers?
> > > > >>
> > > > >> Thanks,
> > > > >> Sean
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > *Alex Loddengaard | **Solutions Architect | Confluent*
> > *Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > *
> >
>



-- 
*Alex Loddengaard | **Solutions Architect | Confluent*
*Download Apache Kafka and Confluent Platform: www.confluent.io/download
*


Re: 0.9.0.1 RC1

2016-02-17 Thread Jun Rao
Christian,

Similar to other Apache projects, a vote from a committer is considered
binding. During the voting process, we encourage non-committers to vote as
well. We will cancel the release even if a critical issue is reported from
a non-committer.

Thanks,

Jun

On Tue, Feb 16, 2016 at 11:05 PM, Christian Posta  wrote:

> BTW, what's the etiquette for votes (non-binding) for this community?
> welcomed? noise?
> happy to see the non-binding votes, I'd like to contribute, just don't want
> to pollute the vote call. thoughts?
> thanks!
>
> On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:
>
> > Thanks everyone for voting. The results are:
> >
> > +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel Koshy
> and
> > Jun
> > Rao)
> > +1 non-binding = 3 votes
> > -1 = 0 votes
> > 0 = 0 votes
> >
> > The vote passes.
> >
> > I will release artifacts to maven central, update the dist svn and
> download
> > site. Will send out an announce after that.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
> >
> > > This is the first candidate for release of Apache Kafka 0.9.0.1. This a
> > > bug fix release that fixes 70 issues.
> > >
> > > Release Notes for the 0.9.0.1 release
> > >
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > and sha2 (SHA256) checksum.
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> > >
> > > * Maven artifacts to be voted upon prior to release:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > * scala-doc
> > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> > >
> > > * java-doc
> > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> > >
> > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
> > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> > >
> > > * Documentation
> > > http://kafka.apache.org/090/documentation.html
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> >
>
>
>
> --
> *Christian Posta*
> twitter: @christianposta
> http://www.christianposta.com/blog
> http://fabric8.io
>


Re: 0.9.0.1 RC1

2016-02-17 Thread Christian Posta
Awesome, glad to hear. Thanks Jun!

On Wed, Feb 17, 2016 at 12:57 PM, Jun Rao  wrote:

> Christian,
>
> Similar to other Apache projects, a vote from a committer is considered
> binding. During the voting process, we encourage non-committers to vote as
> well. We will cancel the release even if a critical issue is reported from
> a non-committer.
>
> Thanks,
>
> Jun
>
> On Tue, Feb 16, 2016 at 11:05 PM, Christian Posta <
> christian.po...@gmail.com
> > wrote:
>
> > BTW, what's the etiquette for votes (non-binding) for this community?
> > welcomed? noise?
> > happy to see the non-binding votes, I'd like to contribute, just don't
> want
> > to pollute the vote call. thoughts?
> > thanks!
> >
> > On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:
> >
> > > Thanks everyone for voting. The results are:
> > >
> > > +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel Koshy
> > and
> > > Jun
> > > Rao)
> > > +1 non-binding = 3 votes
> > > -1 = 0 votes
> > > 0 = 0 votes
> > >
> > > The vote passes.
> > >
> > > I will release artifacts to maven central, update the dist svn and
> > download
> > > site. Will send out an announce after that.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
> > >
> > > > This is the first candidate for release of Apache Kafka 0.9.0.1.
> This a
> > > > bug fix release that fixes 70 issues.
> > > >
> > > > Release Notes for the 0.9.0.1 release
> > > >
> > >
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> > > >
> > > > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> > > >
> > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > > and sha2 (SHA256) checksum.
> > > >
> > > > * Release artifacts to be voted upon (source and binary):
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> > > >
> > > > * Maven artifacts to be voted upon prior to release:
> > > >
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > >
> > > > * scala-doc
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> > > >
> > > > * java-doc
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> > > >
> > > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> > > >
> > > > * Documentation
> > > > http://kafka.apache.org/090/documentation.html
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: 0.9.0.1 RC1

2016-02-17 Thread Gwen Shapira
Actually, for releases, committers are non-binding. PMC votes are the only
binding ones for releases.

On Wed, Feb 17, 2016 at 11:57 AM, Jun Rao  wrote:

> Christian,
>
> Similar to other Apache projects, a vote from a committer is considered
> binding. During the voting process, we encourage non-committers to vote as
> well. We will cancel the release even if a critical issue is reported from
> a non-committer.
>
> Thanks,
>
> Jun
>
> On Tue, Feb 16, 2016 at 11:05 PM, Christian Posta <
> christian.po...@gmail.com
> > wrote:
>
> > BTW, what's the etiquette for votes (non-binding) for this community?
> > welcomed? noise?
> > happy to see the non-binding votes, I'd like to contribute, just don't
> want
> > to pollute the vote call. thoughts?
> > thanks!
> >
> > On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:
> >
> > > Thanks everyone for voting. The results are:
> > >
> > > +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel Koshy
> > and
> > > Jun
> > > Rao)
> > > +1 non-binding = 3 votes
> > > -1 = 0 votes
> > > 0 = 0 votes
> > >
> > > The vote passes.
> > >
> > > I will release artifacts to maven central, update the dist svn and
> > download
> > > site. Will send out an announce after that.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
> > >
> > > > This is the first candidate for release of Apache Kafka 0.9.0.1.
> This a
> > > > bug fix release that fixes 70 issues.
> > > >
> > > > Release Notes for the 0.9.0.1 release
> > > >
> > >
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> > > >
> > > > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> > > >
> > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > > and sha2 (SHA256) checksum.
> > > >
> > > > * Release artifacts to be voted upon (source and binary):
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> > > >
> > > > * Maven artifacts to be voted upon prior to release:
> > > >
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > >
> > > > * scala-doc
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> > > >
> > > > * java-doc
> > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> > > >
> > > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> > > >
> > > > * Documentation
> > > > http://kafka.apache.org/090/documentation.html
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
> >
>


Re: 0.9.0.1 RC1

2016-02-17 Thread Christian Posta
yep! http://www.apache.org/dev/release-publishing.html#voted

On Wed, Feb 17, 2016 at 1:05 PM, Gwen Shapira  wrote:

> Actually, for releases, committers are non-binding. PMC votes are the only
> binding ones for releases.
>
> On Wed, Feb 17, 2016 at 11:57 AM, Jun Rao  wrote:
>
> > Christian,
> >
> > Similar to other Apache projects, a vote from a committer is considered
> > binding. During the voting process, we encourage non-committers to vote
> as
> > well. We will cancel the release even if a critical issue is reported
> from
> > a non-committer.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Feb 16, 2016 at 11:05 PM, Christian Posta <
> > christian.po...@gmail.com
> > > wrote:
> >
> > > BTW, what's the etiquette for votes (non-binding) for this community?
> > > welcomed? noise?
> > > happy to see the non-binding votes, I'd like to contribute, just don't
> > want
> > > to pollute the vote call. thoughts?
> > > thanks!
> > >
> > > On Tue, Feb 16, 2016 at 10:56 PM, Jun Rao  wrote:
> > >
> > > > Thanks everyone for voting. The results are:
> > > >
> > > > +1 binding = 4 votes (Ewen Cheslack-Postava, Neha Narkhede, Joel
> Koshy
> > > and
> > > > Jun
> > > > Rao)
> > > > +1 non-binding = 3 votes
> > > > -1 = 0 votes
> > > > 0 = 0 votes
> > > >
> > > > The vote passes.
> > > >
> > > > I will release artifacts to maven central, update the dist svn and
> > > download
> > > > site. Will send out an announce after that.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:
> > > >
> > > > > This is the first candidate for release of Apache Kafka 0.9.0.1.
> > This is a
> > > > > bug fix release that fixes 70 issues.
> > > > >
> > > > > Release Notes for the 0.9.0.1 release
> > > > >
> > > >
> > >
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> > > > >
> > > > > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> > > > >
> > > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > > > and sha2 (SHA256) checksum.
> > > > >
> > > > > * Release artifacts to be voted upon (source and binary):
> > > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> > > > >
> > > > > * Maven artifacts to be voted upon prior to release:
> > > > >
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > > >
> > > > > * scala-doc
> > > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> > > > >
> > > > > * java-doc
> > > > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> > > > >
> > > > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1
> tag
> > > > >
> > > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> > > > >
> > > > > * Documentation
> > > > > http://kafka.apache.org/090/documentation.html
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > *Christian Posta*
> > > twitter: @christianposta
> > > http://www.christianposta.com/blog
> > > http://fabric8.io
> > >
> >
>



-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Kafka connect HDFS connector

2016-02-17 Thread Venkatesh Rudraraju
Hi,

I tried using the HDFS connector sink with kafka-connect and it works as
described here:
http://docs.confluent.io/2.0.0/connect/connect-hdfs/docs/index.html

My scenario:

I have plain JSON data in a Kafka topic. Can I still use the HDFS connector
sink to read data from the topic and write it to HDFS in Avro format?

From the documentation, the HDFS connector expects the data in Kafka to
already be in Avro format. Is there a workaround where I can consume plain
JSON and write to HDFS in Avro? Say I have a schema for the plain JSON data.
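
(For illustration only: one possible direction is to convert the JSON to Avro
before producing, so the topic already holds Avro and the HDFS sink can be
used as-is. A minimal Scala sketch, assuming the Confluent Avro serializer is
on the classpath; the schema, topic name, broker and schema-registry addresses
below are placeholders, not part of the original question.)

import java.util.Properties
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Placeholder Avro schema describing the fields carried by the plain JSON.
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"LogEvent","fields":[{"name":"message","type":"string"}]}""")

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")                              // placeholder broker
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://localhost:8081")                     // placeholder registry

val producer = new KafkaProducer[AnyRef, AnyRef](props)

// Parse the plain JSON and copy its fields into an Avro record per the schema.
val json = new ObjectMapper().readTree("""{"message":"hello"}""")
val avroRecord = new GenericData.Record(schema)
avroRecord.put("message", json.get("message").asText())

producer.send(new ProducerRecord[AnyRef, AnyRef]("topicA-avro", avroRecord))  // placeholder topic
producer.close()

The HDFS sink could then be pointed at the Avro topic instead of the plain
JSON one.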

Thanks,
Venkatesh


Re: Resetting Kafka Offsets -- and What are offsets.... exactly?

2016-02-17 Thread John Bickerstaff
Thank you Christian -- I appreciate your taking the time to help me out on
this.

Here's what I found while continuing to dig into this.

If I take 30024 and subtract the number of messages I know I have in Kafka
(3500) I get 26524.

If I reset thus:  set /kafka/consumers/myGroupName/offsets/myTopicName/0
26524

... and then re-run my consumer - I get all 3500 messages again.

If I do this: set /kafka/consumers/myGroupName/offsets/myTopicName/0 26624

In other words, I increase the offset number by 100 -- then I get exactly
3400 messages on my consumer --  exactly 100 less than before which I think
makes sense, since I started the offset 100 higher...

This seems to suggest that each number between 26624 and 30024 in the log
represents one of my 3500 messages on this topic, but what you say suggests
that they represent byte count of the actual messages and not "one number
per message"...

I also find that if I issue this command:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic=myTopicName
--broker-list=192.168.56.3:9092  --time=-2

I get back that same number -- 26524...
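
(For comparison, a minimal Scala sketch of reading the same earliest/latest
offsets through the 0.9 consumer API; the group id below is a throwaway
placeholder and nothing is committed.)

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "192.168.56.3:9092")
props.put("group.id", "offset-inspector")        // placeholder group; offsets are not committed
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("myTopicName", 0)
consumer.assign(List(tp).asJava)

consumer.seekToBeginning(tp)                     // varargs in the 0.9 API
val earliest = consumer.position(tp)             // 26524 in the example above
consumer.seekToEnd(tp)
val latest = consumer.position(tp)               // 30024 in the example above
println(s"messages currently on partition 0: ${latest - earliest}")   // 3500
consumer.close()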

Hmmm...  A little confused still...  These messages are literally stored
in the Kafka logs, yes?  I think I'll go digging in there and see...

Thanks again!





On Wed, Feb 17, 2016 at 12:38 PM, Christian Posta  wrote:

> The number is the log-ordered number of bytes. So really, the offset is
> kinda like the "number of bytes" to begin reading from. 0 means read the
> log from the beginning. The second message is 0 + size of message. So the
> message "ids" are really just the offset of the previous message sizes.
>
> For example, if I have three messages of 10 bytes each, and set the
> consumer offset to 0, i'll read everything. If you set the offset to 10,
> I'll read the second and third messages, and so on.
>
> see more here:
>
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> and here: http://kafka.apache.org/documentation.html#introduction
>
> HTH!
>
> On Wed, Feb 17, 2016 at 12:16 PM, John Bickerstaff <
> j...@johnbickerstaff.com
> > wrote:
>
> > *Use Case: Disaster Recovery & Re-indexing SOLR*
> >
> > I'm using Kafka to hold messages from a service that prepares "documents"
> > for SOLR.
> >
> > A second micro service (a consumer) requests these messages, does any
> final
> > processing, and fires them into SOLR.
> >
> > The whole thing is (in part) designed to be used for disaster recovery -
> > allowing the rebuild of the SOLR index in the shortest possible time.
> >
> > To do this (and to be able to use it for re-indexing SOLR while testing
> > relevancy) I need to be able to "play all messages from the beginning" at
> > will.
> >
> > I find I can use the zkCli.sh tool to delete the Consumer Group Name like
> > this:
> >  rmr /kafka/consumers/myGroupName
> >
> > After which my microservice will get all the messages again when it runs.
> >
> > I was trying to find a way to do this programmatically without actually
> > using the "low level" consumer api since the high level one is so simple
> > and my code already works.  So I started playing with Zookeeper api for
> > duplicating "rmr /kafka/consumers/myGroupName"
> >
> > *The Question: What does that offset actually represent?*
> >
> > It was at this point that I discovered the offset must represent
> something
> > other than what I thought it would.  Things obviously work, but I'm
> > wondering what - exactly do the offsets represent?
> >
> > To clarify - if I run this command on a zookeeper node, after the
> > microservice has run:
> >  get /kafka/consumers/myGroupName/offsets/myTopicName/0
> >
> > I get the following:
> >
> > 30024
> > cZxid = 0x360355
> > ctime = Fri Feb 12 07:27:50 MST 2016
> > mZxid = 0x360357
> > mtime = Fri Feb 12 07:29:50 MST 2016
> > pZxid = 0x360355
> > cversion = 0
> > dataVersion = 2
> > aclVersion = 0
> > ephemeralOwner = 0x0
> > dataLength = 5
> > numChildren = 0
> >
> > Now - I have exactly 3500 messages in this Kafka topic.  I verify that by
> > running this command:
> >  bin/kafka-console-consumer.sh --zookeeper 192.168.56.5:2181/kafka
> > --topic myTopicName --from-beginning
> >
> > When I hit Ctrl-C, it tells me it consumed 3500 messages.
> >
> > So - what does that 30024 actually represent?  If I reset that number to
> 1
> > or 0 and re-run my consumer microservice, I get all the messages again -
> > and the number again goes to 30024.  However, I'm not comfortable to
> trust
> > that because my assumption that the number represents a simple count of
> > messages that have been sent to this consumer is obviously wrong.
> >
> > (I reset the number like this -- to 1 -- and assume there's an API
> command
> > that will do it too.)
> >  set /kafka/consumers/myGroupName/offsets/myTopicName/0 1
> >
> > Can someone help me clarify or point me at a doc that explains what is
> > getting counted here?  You can shoot me if you like for attempting the
> > hack-i

Re: Resetting Kafka Offsets -- and What are offsets.... exactly?

2016-02-17 Thread John Bickerstaff
Hmmm...  more info.

So, inside /var/log/kafka-logs/myTopicName-0 I find two files

00026524.index  00026524.log

Interestingly, they both bear the number of the "lowest" offset returned by
the command I mention above.

If I "cat" the 000.26524.log file, I get all my messages on the
commandline as if I'd issued the --from-beginning command

I'm not sure what the index has, it's unreadable by the simple tools I've
tried

I'm still scratching my head a bit - the Kafka introduction link you sent
says this:

The messages in the partitions are each assigned a sequential id number
called the *offset* that uniquely identifies each message within the
partition.

I see how that could be exactly what you said (the previous messages' byte
count) -- but the picture implies that it's a linear progression - 1, 2, 3,
etc...  (and that could be an oversimplification for purposes of the
introduction - I get that...)

Feel free to comment or not - I'm going to keep digging into it as best I
can - any clarifications will be gratefully accepted...
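
(As a related note, the 0.9 consumer API can also replay a partition from the
beginning - or from any chosen offset - without deleting anything in
ZooKeeper. A minimal sketch; the group id is a placeholder and auto-commit is
off so the replay does not move any stored offset.)

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "192.168.56.3:9092")
props.put("group.id", "solr-reindex")            // placeholder group id
props.put("enable.auto.commit", "false")         // replaying only; don't commit anything
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("myTopicName", 0)
consumer.assign(List(tp).asJava)

consumer.seekToBeginning(tp)                     // or consumer.seek(tp, 26624L) to start mid-log
var records = consumer.poll(1000)
while (records.count() > 0) {
  records.asScala.foreach(r => println(s"offset=${r.offset} value=${r.value}"))
  records = consumer.poll(1000)                  // stop once a poll comes back empty
}
consumer.close()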



On Wed, Feb 17, 2016 at 1:50 PM, John Bickerstaff 
wrote:

> Thank you Christian -- I appreciate your taking the time to help me out on
> this.
>
> Here's what I found while continuing to dig into this.
>
> If I take 30024 and subtract the number of messages I know I have in Kafka
> (3500) I get 26524.
>
> If I reset thus:  set /kafka/consumers/myGroupName/offsets/myTopicName/0
> 26524
>
> ... and then re-run my consumer - I get all 3500 messages again.
>
> If I do this: set /kafka/consumers/myGroupName/offsets/myTopicName/0 26624
>
> In other words, I increase the offset number by 100 -- then I get exactly
> 3400 messages on my consumer --  exactly 100 less than before which I think
> makes sense, since I started the offset 100 higher...
>
> This seems to suggest that each number between 26624 and 30024 in the log
> represents one of my 3500 messages on this topic, but what you say suggests
> that they represent byte count of the actual messages and not "one number
> per message"...
>
> I also find that if I issue this command:
>
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic=myTopicName
> --broker-list=192.168.56.3:9092  --time=-2
>
> I get back that same number -- 26524...
>
> Hmmm...  A little confused still...  These messages are literally stored
> in the Kafka logs, yes?  I think I'll go digging in there and see...
>
> Thanks again!
>
>
>
>
>
> On Wed, Feb 17, 2016 at 12:38 PM, Christian Posta <
> christian.po...@gmail.com> wrote:
>
>> The number is the log-ordered number of bytes. So really, the offset is
>> kinda like the "number of bytes" to begin reading from. 0 means read the
>> log from the beginning. The second message is 0 + size of message. So the
>> message "ids" are really just the offset of the previous message sizes.
>>
>> For example, if I have three messages of 10 bytes each, and set the
>> consumer offset to 0, i'll read everything. If you set the offset to 10,
>> I'll read the second and third messages, and so on.
>>
>> see more here:
>>
>> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
>> and here: http://kafka.apache.org/documentation.html#introduction
>>
>> HTH!
>>
>> On Wed, Feb 17, 2016 at 12:16 PM, John Bickerstaff <
>> j...@johnbickerstaff.com
>> > wrote:
>>
>> > *Use Case: Disaster Recovery & Re-indexing SOLR*
>> >
>> > I'm using Kafka to hold messages from a service that prepares
>> "documents"
>> > for SOLR.
>> >
>> > A second micro service (a consumer) requests these messages, does any
>> final
>> > processing, and fires them into SOLR.
>> >
>> > The whole thing is (in part) designed to be used for disaster recovery -
>> > allowing the rebuild of the SOLR index in the shortest possible time.
>> >
>> > To do this (and to be able to use it for re-indexing SOLR while testing
>> > relevancy) I need to be able to "play all messages from the beginning"
>> at
>> > will.
>> >
>> > I find I can use the zkCli.sh tool to delete the Consumer Group Name
>> like
>> > this:
>> >  rmr /kafka/consumers/myGroupName
>> >
>> > After which my microservice will get all the messages again when it
>> runs.
>> >
>> > I was trying to find a way to do this programmatically without actually
>> > using the "low level" consumer api since the high level one is so simple
>> > and my code already works.  So I started playing with Zookeeper api for
>> > duplicating "rmr /kafka/consumers/myGroupName"
>> >
>> > *The Question: What does that offset actually represent?*
>> >
>> > It was at this point that I discovered the offset must represent
>> something
>> > other than what I thought it would.  Things obviously work, but I'm
>> > wondering what - exactly do the offsets represent?
>> >
>> > To clarify - if I run this command on a zookeeper node, after the
>> > microservice has run:
>> >  get /kafka/consumers/myGroupName/offsets/myTopicName/0
>> >
>> > I get the following:
>>

Re: Kafka response ordering guarantees

2016-02-17 Thread Ben Stopford
So long as you set max.in.flight.requests.per.connection = 1, Kafka should
provide strong ordering within a partition (so use the same key for messages
that should retain their order). There is a bug currently raised against this
feature, though, where an edge case can cause ordering issues:

https://issues.apache.org/jira/browse/KAFKA-3197  
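
For illustration, a minimal producer sketch in Scala with that setting; the
broker address, topic and key are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("retries", "3")                                 // retries can reorder sends...
props.put("max.in.flight.requests.per.connection", "1")   // ...unless only one request is in flight

val producer = new KafkaProducer[String, String](props)
// Same key -> same partition, so these two stay in order relative to each other.
producer.send(new ProducerRecord[String, String]("topic1", "key-A", "first"))
producer.send(new ProducerRecord[String, String]("topic1", "key-A", "second"))
producer.close()
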
> On 17 Feb 2016, at 07:17, Ivan Dyachkov  wrote:
> 
> Hello all.
> 
> I'm developing a kafka client and have a question about kafka server 
> guarantees.
> 
> A statement from 
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Network
>  makes me a bit confused:
> 
> "The server guarantees that on a single TCP connection, requests will be 
> processed in the order they are sent and responses will return in that order 
> as well. The broker's request processing allows only a single in-flight 
> request per connection in order to guarantee this ordering. Note that clients 
> can (and ideally should) use non-blocking IO to implement request pipelining 
> and achieve higher throughput. i.e., clients can send requests even while 
> awaiting responses for preceding requests since the outstanding requests will 
> be buffered in the underlying OS socket buffer. All requests are initiated by 
> the client, and result in a corresponding response message from the server 
> except where noted."
> 
> Does this mean that when a client is sending more than one in-flight request 
> per connection, the server does not guarantee that responses will be sent in 
> the same order as requests?
> 
> In other words, if I have a strictly monotonically increasing integer as a 
> correlation id for all requests, can I rely on Kafka that correlation id in 
> responses will also have this property?
> 
> Thanks.
> 
> /Ivan



Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

2016-02-17 Thread yuanjia8...@163.com
Hi Jerry,
1. Make sure that the 1000 messages have actually been sent to Kafka before consuming.
2. If you don't care about the ordering between messages, you can use multiple
partitions and more consumers.



LiYuanJia
 
From: Jerry Wong
Date: 2016-02-17 05:33
To: users
Subject: Optimize the performance of inserting data to Cassandra with Kafka and 
Spark Streaming
Hello Everybody,
 
I have questions about using Spark Streaming to consume data from Kafka and
insert it into a Cassandra database, but I'm not sure whether I should post
to the Kafka users mailing list or not. I would appreciate any suggestions.
 
5 AWS instances (each one does have 8 cores, 30GB memory) for Spark,
Hadoop, Cassandra
Scala: 2.10.5
Spark: 1.2.2
Hadoop: 1.2.1
Cassandra 2.0.18
 
3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB memory)
Kafka: 0.8.2.1
Zookeeper: 3.4.6
 
Other configurations:
batchInterval = 6 Seconds
blockInterval = 1500 millis
spark.locality.wait = 500 millis
#Consumers = 10
 
There are two columns in the cassandra table
keySpaceOfTopicA.tableOfTopicA, "createdtime" and "log".
 
Here is a piece of codes,
 
@transient val kstreams = (1 to numConsumers.toInt).map { _ =>
KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),
StorageLevel.MEMORY_AND_DISK_SER)
.map(_._2.toString).map(Tuple1(_))
.map{case(log) => (System.currentTimeMillis(), log)}
}
@transient val unifiedMessage = ssc.union(kstreams)
 
unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
SomeColumns("createdtime", "log"))
 
I created a producer and send messages to Brokers (1000 messages/per time)
 
But only about 100 messages get inserted into Cassandra in each round of
the test.
Can anybody give me advice on why the other messages (about 900) can't be
consumed?
How do I configure and tune the parameters in order to improve the
throughput of consumers?
 
Thank you very much for your reading and suggestions in advances.
 
Jerry Wong


Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

2016-02-17 Thread Jerry Wong
Hi LiYuan,

Thank you very much for your response.
This problem is solved. A lot of messages were created at almost the same
time (even at millisecond resolution), so rows sharing the same
"createdtime" key overwrote each other. I changed the key to use
"UUID.randomUUID()", with which all messages can be inserted into the
Cassandra table without any being lost.
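
For reference, a minimal sketch of what that change looks like end to end;
the host names, the consumer group and the "id" column are
placeholders/assumptions, and the rest mirrors the original job:

import java.util.UUID
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

val sparkConf = new SparkConf()
  .setAppName("kafka-to-cassandra")
  .set("spark.cassandra.connection.host", "cassandra-host")   // placeholder
val ssc = new StreamingContext(sparkConf, Seconds(6))

val zkeeper = "zk1:2181,zk2:2181,zk3:2181"   // placeholder ZooKeeper connect string
val groupId = "topicA-consumers"             // placeholder consumer group
val numConsumers = 10

val kstreams = (1 to numConsumers).map { _ =>
  KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2.toString)
    .map(log => (UUID.randomUUID().toString, log))   // unique key per message, instead of currentTimeMillis
}
val unifiedMessage = ssc.union(kstreams)

// Assumes the table's key column can hold the UUID string (e.g. an "id" text
// column, or "createdtime" changed to a uuid/text type).
unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
  SomeColumns("id", "log"))

ssc.start()
ssc.awaitTermination()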

Regards,
Jerry Wong


On Wed, Feb 17, 2016 at 9:43 PM, yuanjia8...@163.com 
wrote:

> Hi Jerry,
> 1. Make sure that the 1000 messages have actually been sent to Kafka
> before consuming.
> 2. If you don't care about the ordering between messages, you can use
> multiple partitions and more consumers.
>
>
>
> LiYuanJia
>
> From: Jerry Wong
> Date: 2016-02-17 05:33
> To: users
> Subject: Optimize the performance of inserting data to Cassandra with
> Kafka and Spark Streaming
> Hello Everybody,
>
> I have questions about using Spark Streaming to consume data from Kafka and
> insert it into a Cassandra database, but I'm not sure whether I should post
> to the Kafka users mailing list or not. I would appreciate any suggestions.
>
> 5 AWS instances (each one does have 8 cores, 30GB memory) for Spark,
> Hadoop, Cassandra
> Scala: 2.10.5
> Spark: 1.2.2
> Hadoop: 1.2.1
> Cassandra 2.0.18
>
> 3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB memory)
> Kafka: 0.8.2.1
> Zookeeper: 3.4.6
>
> Other configurations:
> batchInterval = 6 Seconds
> blockInterval = 1500 millis
> spark.locality.wait = 500 millis
> #Consumers = 10
>
> There are two columns in the cassandra table
> keySpaceOfTopicA.tableOfTopicA, "createdtime" and "log".
>
> Here is a piece of codes,
>
> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
> KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),
> StorageLevel.MEMORY_AND_DISK_SER)
> .map(_._2.toString).map(Tuple1(_))
> .map{case(log) => (System.currentTimeMillis(), log)}
> }
> @transient val unifiedMessage = ssc.union(kstreams)
>
> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
> SomeColumns("createdtime", "log"))
>
> I created a producer and send messages to Brokers (1000 messages/per time)
>
> But only about 100 messages get inserted into Cassandra in each round of
> the test.
> Can anybody give me advice on why the other messages (about 900) can't be
> consumed?
> How do I configure and tune the parameters in order to improve the
> throughput of consumers?
>
> Thank you very much for your reading and suggestions in advances.
>
> Jerry Wong
>