Re: Kafka Monitoring

2017-06-20 Thread David Garcia
If you’re using confluent, you can use the control center.  It’s not free  
however.

From: Muhammad Arshad 
Reply-To: "users@kafka.apache.org" 
Date: Monday, June 19, 2017 at 5:52 PM
To: "users@kafka.apache.org" 
Subject: Kafka Monitoring

Hi,
wanted to see if there is Kafka monitoring which is available. I am looking to 
the following:

how much data came in at a certain time.

Thanks,
Muhammad Faisal Arshad
Manager, Enterprise Data Quality
Data Services & Architecture
[ttp://www.multichannel.com/sites/default/files/public/styles/blog_content/public/Altice-NewLogo2017_RESI]




The information transmitted in this email and any of its attachments is 
intended only for the person or entity to which it is addressed and may contain 
information concerning Altice USA and/or its affiliates and subsidiaries that 
is proprietary, privileged, confidential and/or subject to copyright. Any 
review, retransmission, dissemination or other use of, or taking of any action 
in reliance upon, this information by persons or entities other than the 
intended recipient(s) is prohibited and may be unlawful. If you received this 
in error, please contact the sender immediately and delete and destroy the 
communication and all of the attachments you have received and all copies 
thereof.



Re: Hello, Help!

2017-07-07 Thread David Garcia
“…events so timely that the bearing upon of which is not immediately apparent 
and are hidden from cognitive regard; the same so tardy, they herald apropos”

On 7/7/17, 12:06 PM, "Marcelo Vinicius"  wrote:

Hello, my name is Marcelo, and I am from Brazil. I'm doing a search on
Kafka. I would like to know if this phrase: "Somehow I struggled against
sensations that contained pure abstraction and no gesture directed at the
present world", is it really kafka? If so, where do I find his phrase? In
what text from kafka?
Thank you!

-- 
*Marcelo Vinicius*
Universidade Estadual de Feira de Santana - UEFS
Facebook: www.facebook.com/marcelovinicius02

"Não há poema em si, mas em mim ou em ti"




Re: Kafka Connect Embedded API

2017-07-12 Thread David Garcia
I would just look at an example:
 https://github.com/confluentinc/kafka-connect-jdbc
https://github.com/confluentinc/kafka-connect-hdfs





On 7/12/17, 8:27 AM, "Debasish Ghosh"  wrote:

Hi -

I would like to use the embedded API of Kafka Connect as per
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767.
But cannot find enough details regarding the APIs and implementation
models. Is there any sample example that gives enough details about
embedded Kafka Connect APIs ?

My use case is as follows :-

I have a Kafka Streams app from which I plan to use a HDFS sink connector
to write to HDFS. And plan to use embedded Kafka Connect API.

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




Re: DAG processing in Kafka Streams

2017-07-17 Thread David Garcia
I think he means something like Akka Streams: 
http://doc.akka.io/docs/akka/2.5.2/java/stream/stream-graphs.html

Directed Acyclic Graphs are trivial to construct in Akka Streams and use 
back-pressure to preclude memory issues.

-David

On 7/17/17, 12:20 PM, "Guozhang Wang"  wrote:

Sameer,

Could you elaborate a bit more what do you mean by "DAG processing"?


Guozhang


On Sun, Jul 16, 2017 at 11:58 PM, Sameer Kumar 
wrote:

> Currently, we don't have DAG processing in Kafka Streams. Having a DAG has
> its own share of advantages in that, it can optimize code on its own and
> come up with a optimized execution plan.
>
> Are we exploring in this direction, do we have this in our current 
roadmap.
>
>  -Sameer.
>



-- 
-- Guozhang




Re: DAG processing in Kafka Streams

2017-07-17 Thread David Garcia
On that note, akka streams has Kafka integration.  We use it heavily and it is 
quite a bit more flexible than K-Streams (which we also use…but for simpler 
applications)  Akka-streams-Kafka is particularly good for asynchronous 
processing: http://doc.akka.io/docs/akka-stream-kafka/current/home.html

-David

On 7/17/17, 12:35 PM, "David Garcia"  wrote:

I think he means something like Akka Streams: 
http://doc.akka.io/docs/akka/2.5.2/java/stream/stream-graphs.html

Directed Acyclic Graphs are trivial to construct in Akka Streams and use 
back-pressure to preclude memory issues.

-David

On 7/17/17, 12:20 PM, "Guozhang Wang"  wrote:

Sameer,

Could you elaborate a bit more what do you mean by "DAG processing"?


Guozhang


On Sun, Jul 16, 2017 at 11:58 PM, Sameer Kumar 
wrote:

> Currently, we don't have DAG processing in Kafka Streams. Having a 
DAG has
> its own share of advantages in that, it can optimize code on its own 
and
> come up with a optimized execution plan.
>
> Are we exploring in this direction, do we have this in our current 
roadmap.
>
>  -Sameer.
>



-- 
-- Guozhang






Re: Shooting for microsecond latency between a Kafka producer and a Kafka consumer

2017-08-07 Thread David Garcia
You are not going to get that kind of latency (i.e. less than 100 
microseconds).  In my experience, consumer->producer latency averages around: 
20 milliseconds (cluster is in AWS with enhanced networking).

On 8/3/17, 2:32 PM, "Chao Wang"  wrote:

Hi,

I observed that it took 2-6 milliseconds for a topic to be received by a 
Kafka consumer from a Kafka producer, and I wonder what I might be 
missing or I was wrong in configuring Kafka for low latency (targeting 
at < 100 microseconds). I did the following:

1. On the broker, I tried to prevent frequent flush of data to disk 
(log.flush.interval.messages=10)

2. On the producer, I tried to reduce the delay by setting batch.size=0, 
linger.ms=0, acks =0, and I invoked flush() right after send()

3. On the consumer, I set poll(0) (i.e., fetch every data once its 
available?)

I got similar observation (millisecond latency) in varying value size 
from 1 to 512B, and also similar results when either colocating 
producer/consumer or putting them on separate PCs (connecting by a 
switch). As a verification, I implemented simple C/C++ sockets for 
transmission and observed latencies no more than 100 microseconds.

Thanks,

Chao





Re: Time based data retrieval from Kafka Topic

2017-09-05 Thread David Garcia
Make a topic and then set the retention to 1 hour.  Every 15 minutes, start a 
consumer that always reads from the beginning.

-David

On 9/5/17, 9:28 AM, "Tauzell, Dave"  wrote:

What are you going to do with the messages every 15 minutes?

One way I can think of is to have two consumers in your application.  One 
of them reads messages and just keeps track of the offsets for each hour.  The 
other consumer then uses this info to pull the data.  You could publish these 
offsets to a topic keyed on something like -mm-dd-hh24 or keep them in 
memory ( if running in the same application ).   You would need some one-time 
process to create the offsets for the first time.

-Dave

-Original Message-
From: Kaustuv Bhattacharya [mailto:kaustuvl...@gmail.com]
Sent: Sunday, September 3, 2017 1:08 PM
To: users@kafka.apache.org
Subject: Time based data retrieval from Kafka Topic

*Hi All,*

*I need a solution to the below Problem statement -*

*How to retrieve only last 1 hour data from an existing Kafka Topic, on 1st 
& every consecutive (at every 15 mins interval) of the client application?*
*Note:- The existing Topic is accumulating data since last 6 months.*

*Regards,*
*Kaustuv Bhattacharya*
This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.




Re: Identifying Number of Kafka Consumers

2017-09-12 Thread David Garcia
Consumers can be split up based on partitions.  So, you can tell a consumer 
group to listen to several topics and it will divvy up the work.  Your use case 
sounds very canonical.  I would take a look at Kafka connect (if you’re using 
the confluent stack).

-Daivd

http://docs.confluent.io/current/connect/connect-hdfs/docs/index.html

On 9/11/17, 4:48 PM, "Nishanth S"  wrote:

All,
I am very new to kafka . We have a  case where we need to ingest multiple
avro record types . These avro record types vary vastly in volume  and size
and I am thinking of sending  each of these message types to a different
topic and creating partitions based on volume and through put needed.  What
the  kafka consumer has to do is read the record of from  partitions and
 write to  different hdfs locations based on record type  . I am guessing
we should at least start with one consumer per  topic  . Is this
understanding correct or is there a better way to look at it?




Re: To Know about Kafka and it's usage

2017-09-28 Thread David Garcia
If you’re on the AWS bandwagon, you can use Kinesis-Analytics 
(https://aws.amazon.com/kinesis/analytics/).  It’s very easy to use.  Kafka is 
a bit more flexible, but you have to instrument maintain it.  You may also want 
to look at Druid: http://druid.io/  There are some dashboards that you can 
attach to it.  I personally preferred this one: 
https://docs.imply.io/pivot/index  

On 9/27/17, 7:30 PM, "Parth Patel"  wrote:

Hi,
This is to know more about Kafka and how i can use it in my project. I am
trying to learn about Big Data Engineering and came across Kafka. I am
trying to develop an application which could take some real time data,
filter it and show some visual outputs and would like to know where Kafka
could fit and what other technologies should i use.

I was thinking about using Spark, Python, Google Cloud. Any help regarding
this would be great.

Thanks,
Parth Patel




Re: Using Kafka on DC/OS + Marathon

2017-10-02 Thread David Garcia
I’m not sure how your requirements of Kafka are related to your requirements 
for marathon.  Kafka is a streaming-log system and marathon is a scheduler.  
Mesos, as your resource manager, simply “manages” resources.  Are you asking 
about multitenancy?  If so, I highly recommend that you separate your Kafka 
cluster (and zookeeper) from your other services.  Kafka leverages the OS page 
cache to optimize read performance and it seems likely this would interfere 
with Mesos resource management policy.

-David 

On 10/2/17, 6:39 AM, "Valentin Forst"  wrote:

Hi there,

Working in a huge compony we are about to install Kafka on DC/OS (Mesos) 
and intend to use Marathon as a Scheduler. Since I am new to DC/OS and 
Marathon, I was wondering if this is a recommended way of using Kafka in the 
production environment.

My doubts are:
- Kafka manages Broker rebalancing (e.g. Failover, etc.) using its own 
semantic. Can I trust Marathon that it will match the requirements here?
- Since our Container Platform - DC/OS is going to be used by other „micro 
services“ - soon or later this is going to raise a performance issue. Should we 
better use a dedicated DC/OS instance for our Kafka-Cluster? Or Kafka-Cluster 
on its own?
- Is there something else we should consider important if using Kafka on 
DC/OS + Marathon?


Thanks in advance for your time.
Valentin





Re: Kafka Monitoring..

2017-11-09 Thread David Garcia
JMX was pretty easy to setup for us.  Look up the various jmx beans locally for 
your particular version of kafka (i.e. with jconsole..etc)

https://github.com/jmxtrans/jmxtrans

On 11/8/17, 7:10 PM, "chidigam ."  wrote:

Hi All,
What is the simplest way of monitoring the metrics in kaka brokers?
Is there any opensource available?
Any help in this regards is appreciated.

Regards
Bhanu




Re: Kafka Monitoring..

2017-11-09 Thread David Garcia
Yeah…jmx is really the cheap/easy way to monitor kafka.  You should also 
monitor OS metrics like pageScanD and pageScanK.  Those were very helpful for 
us.

-David

On 11/9/17, 11:40 AM, "Andrew Otto"  wrote:

We’ve recently started using Prometheus, and use Prometheus JMX Exporter
<https://github.com/prometheus/jmx_exporter> to get Kafka metrics into
prometheus.  Here’s our JMX Exporter config:

https://github.com/wikimedia/puppet/blob/production/modules/profile/files/kafka/broker_prometheus_jmx_exporter.yaml



On Thu, Nov 9, 2017 at 11:42 AM, David Garcia  wrote:

> JMX was pretty easy to setup for us.  Look up the various jmx beans
> locally for your particular version of kafka (i.e. with jconsole..etc)
>
> https://github.com/jmxtrans/jmxtrans
>
> On 11/8/17, 7:10 PM, "chidigam ."  wrote:
>
> Hi All,
> What is the simplest way of monitoring the metrics in kaka brokers?
> Is there any opensource available?
> Any help in this regards is appreciated.
>
> Regards
> Bhanu
>
>
>




Null Output topic for Kafka Streams

2016-07-18 Thread David Garcia
I would like to process messages from an input topic, but I don’t want to send 
messages downstream…with KStreams.  (i.e. I would like to read from a topic, do 
some processing including occasional database updates, and that’s it…no output 
to a topic).  I could fake this by filtering out all my messages, and specify a 
fake topic…but I was wondering if there is a less hacky way to do this.  Thx!

-David J. Garcia


Kafka Streams Dynamic Topic consumer

2016-07-18 Thread David Garcia
Is there any way to specify a dynamic topic list (e.g. like a regex whitelist 
filter…like in the consumer API) with kafka streams?  We would like to get the 
benefit of automatic checkpointing and load balancing if possible.

-David


Re: Kafka Consumer stops consuming from a topic

2016-07-19 Thread David Garcia
Is it possible that your app is thrashing (i.e. FullGC’ing too much and not 
processing messages)?

-David

On 7/19/16, 9:16 AM, "Abhinav Solan"  wrote:

Hi Everyone, can anyone help me on this

Thanks,
Abhinav

On Mon, Jul 18, 2016, 6:19 PM Abhinav Solan  wrote:

> Hi Everyone,
>
> Here are my settings
> Using Kafka 0.9.0.1, 1 instance (as we are testing things on a staging
> environment)
> Subscribing to 4 topics from a single Consumer application with 4 threads
>
> Now the server keeps on working fine for a while, then after about 3-4 hrs
> or so, it stops consuming at all.
> I started my own Consumer Instance and one Kafka Console Consumer, I can
> see messages coming in the console consumer but not in my Consumer 
instance.
>
> There are some messages which are coming through but not all messages,
> then after a while I restarted the Consumer instance, then again nothing
> coming through .. then I restarted the Kafka Server and then I could see
> all the messages coming through.
>
> Has anyone seen this kind of problem ?
> Is it because I am running only single broker ?
>
> Here are the properties I am setting for the Consumer -
> fetch.min.bytes=1
> max.partition.fetch.bytes=8192
> heartbeat.interval.ms=1
>
> I have written up my Consumer using
> http://docs.confluent.io/2.0.1/clients/consumer.html
>
> Thanks,
> Abhinav
>
>




Re: Kafka Consumer stops consuming from a topic

2016-07-19 Thread David Garcia
Ah ok.  Another dumb question: what about acks?  Are you using auto-ack? 

On 7/19/16, 10:00 AM, "Abhinav Solan"  wrote:

If I add 2 more nodes and make it a cluster .. would that help ? Have
searched forums and all this kind of thing is not there ... If we have a
cluster then might be Kafka Server has a backup option and it self heals
from this behavior ... Just a theory

On Tue, Jul 19, 2016, 7:57 AM Abhinav Solan  wrote:

> No, was monitoring the app at that time .. it was just sitting idle
>
> On Tue, Jul 19, 2016, 7:32 AM David Garcia  wrote:
>
>> Is it possible that your app is thrashing (i.e. FullGC’ing too much and
>> not processing messages)?
>>
>> -David
>>
>> On 7/19/16, 9:16 AM, "Abhinav Solan"  wrote:
>>
>> Hi Everyone, can anyone help me on this
>>
>> Thanks,
>> Abhinav
>>
>> On Mon, Jul 18, 2016, 6:19 PM Abhinav Solan 
>> wrote:
>>
>> > Hi Everyone,
>> >
>> > Here are my settings
>> > Using Kafka 0.9.0.1, 1 instance (as we are testing things on a
>> staging
>> > environment)
>> > Subscribing to 4 topics from a single Consumer application with 4
>> threads
>> >
>> > Now the server keeps on working fine for a while, then after about
>> 3-4 hrs
>> > or so, it stops consuming at all.
>> > I started my own Consumer Instance and one Kafka Console Consumer,
>> I can
>> > see messages coming in the console consumer but not in my Consumer
>> instance.
>> >
>> > There are some messages which are coming through but not all
>> messages,
>> > then after a while I restarted the Consumer instance, then again
>> nothing
>> > coming through .. then I restarted the Kafka Server and then I
>> could see
>> > all the messages coming through.
>> >
>> > Has anyone seen this kind of problem ?
>> > Is it because I am running only single broker ?
>> >
>> > Here are the properties I am setting for the Consumer -
>> > fetch.min.bytes=1
>> > max.partition.fetch.bytes=8192
>> > heartbeat.interval.ms=1
>> >
>> > I have written up my Consumer using
>> > http://docs.confluent.io/2.0.1/clients/consumer.html
>> >
>> > Thanks,
>> > Abhinav
>> >
>> >
>>
>>
>>




release of 0.10.1

2016-07-20 Thread David Garcia
Does anyone know when this release will be cut?

-David


Re: Kafka Streams Latency

2016-07-22 Thread David Garcia
You should probably just put reporting in your app.  Dropwizard, logs…etc.  You 
can also look at Kafka JMX consumer metrics (assuming you don’t have too many 
consumers).

-David

On 7/22/16, 9:13 AM, "Adrienne Kole"  wrote:

Hi,

 How can I measure the latency and throughput in Kafka Streams?

Cheers
Adrienne




Re: release of 0.10.1

2016-07-24 Thread David Garcia
We basically need the regex(java-util regex) support for specifying source 
topics.


On 7/23/16, 7:41 PM, "Ewen Cheslack-Postava"  wrote:

0.10.1.0 is considered a major release. The release 0.10.0.0 might have a
follow up 0.10.0.1 for critical bug fixes, but 0.10.1.0 is a "minor"
release. Kafka is a bit odd in that its "major" releases are labeled as a
normal "minor" release number because Kafka hasn't decided to make an
official 1.0 release yet.

What features/fixes are you looking for?

-Ewen
    
On Wed, Jul 20, 2016 at 7:23 AM, David Garcia  wrote:

> Does anyone know when this release will be cut?
>
> -David
>



-- 
Thanks,
Ewen




Streaming Application Design with Database Lookups

2016-07-26 Thread David Garcia
Hello, we are working on designs for several streaming applications and a 
common consideration is the need for occasional external database 
updates/lookups.  For example…we would be processing a stream of events with 
some kind of local-id, and we occasionally need to resolve the local-id to a 
global-id using an external service (e.g. such as a database).

A simple approach is the following:


1.) Quantify the expected throughput of a topic/s

2.) Partition the topic/s so that each task isn’t “overwhelmed”

3.) Combine blocking database-calls and an in-memory cache for external 
storage lookup/updates

4.) If the system isn’t performing fast enough, simply add more partitions 
and tasks to the application.

Obviously we are assuming that the external database can handle the rate of 
transactions.

Another approach is to process the messages asynchronously.  That is, database 
callbacks are attached to something like Futures and the streaming threads 
aren’t interrupted.  Assuming there isn’t shared data between threads, this 
seems exactly like the first approach.  If we have a thread pool with ‘N’ 
number of threads, our application will never go faster than c/N where ‘c’ is 
the average latency for a (Database+cache) lookup.  This is equivalent to 
making N partitions and starting N tasks.  However, this approach does have the 
advantage of “decoupling” the threads from the partitions (i.e. nothing 
precludes us from having more database threads than partitions).  The 
application, however, becomes much more complicated.


Any other approaches?

I would appreciate any design suggestions anyone has.  Thx!
-David


Re: Kafka Streams multi-node

2016-07-26 Thread David Garcia
http://docs.confluent.io/3.0.0/streams/architecture.html#parallelism-model

you shouldn’t have to do anything.  Simply starting a new thread will 
“rebalance” your streaming job.  The job coordinates with tasks through kafka 
itself.



On 7/26/16, 12:42 PM, "Davood Rafiei"  wrote:

Hi,

I am newbie in Kafka and Kafka-Streams. I read documentation to get
information how it works in multi-node environment. As a result I want to
run streams library on cluster that consists of more than one node.
From what I understood, I try to resolve the following conflicts:
- Streams is a standalone java application.So it runs in a single node, of
n-node cluster of kafka.
- However, streams runs on top of kafka, and if we set a multi-broker kafka
cluster, and then run streams library from master node, then streams
library will run in entire cluster.

So, streams library is standalone java application but to force it to run
in multiple nodes, do we need to do something extra (in configuration for
example) if we have already kafka running in multi-broker mode?


Thanks
Davood




Re: Problems with replication and performance

2016-07-27 Thread David Garcia
Sounds like you might want to go the partition route: 
http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

If you lose a broker (and you went the topic route), the probability that an 
arbitrary topic was on the broker is higher than if you had gone the partition 
route.  In either case the number of partitions on each broker should be about 
the same…so you will have the same draw backs described in this article 
regardless of what you do.

-David

On 7/27/16, 4:51 PM, "Krzysztof Nawara"  wrote:

Hi!

I've been testing Kafka. I've hit some problems, but I can't really 
understand what's going on, so I'd like to ask for your help.
Situation - we want to decide whether to go for many topics/a couple of 
partitions or the other way around, so I'be trying to benchmark both cases. 
During tests when I overload the cluster, number of under-replicated partitions 
spikes up. I'd expect it to go back down to 0 after the load lessens, but 
that's not always the case - either it never catches up, or it takes 
significantly longer than it takes other brokers. Currently, I run benchmarks 
against 3-node cluster and sometimes one of the brokers can't seem to be able 
to catch up with replication. There are 3 cases here that I experienced:

1. Seeing this in logs. It doesn't seem to be correlated with any problems 
with network infrastructure and once it appears.
[2016-07-27 20:34:09,237] WARN [ReplicaFetcherThread-0-1511], Error in 
fetch kafka.server.ReplicaFetcherThread$FetchRequest@25e2a1ac 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1511 was disconnected before the 
response was read
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
at scala.Option.foreach(Option.scala:257)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at 
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

2. During other test, instead of the above message, I sometimes see this:
[2016-07-26 15:26:30,334] INFO Partition [1806,0] on broker 1511: Expanding 
ISR for partition [1806,0] from 1511 to 1511,1509 (kafka.cluster.Partition)
[2016-07-26 15:26:30,344] INFO Partition [1806,0] on broker 1511: Cached 
zkVersion [1] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition)
At the same time broker can't catch up with replication.

I'm using version 0.10.0.0 on SCL6, running on 3 32core/64GB/8x7200RPM 
spindle blades. I don't know, if it's relevant, but I basically test two 
scenarios: 1 topic, 4k partitions and 4k topics, 1 partition each (in this 
scenario I just set auto.create.topics.enable=true and create topics during 
warm up by simply sending messages to them). For some reason the second 
scenario seems to be orders of magnitude slower - after I started looking at 
JMX metrics of the producer, it revealed huge difference in average number of 
messages per request. With 1 topic it oscilated around 100 records/request (5KB 
records), in 4k topics scenario it was just 1 record/request. Can you think of 
any explanation for that?

Code I use for testing:
https://github.com/BlueEyedHush/kafka_perf/tree/itrac

Thank you,
Krzysztof Nawara



Reactive Kafka performance

2016-07-28 Thread David Garcia
Our team is evaluating KStreams and Reactive Kafka (version 0.11-M4)  on a 
confluent 3.0 cluster.  Our testing is very simple (pulling from one topic, 
doing a simple transform) and then writing out to another topic.

The performance for the two systems is night and day. Both applications were 
running on a laptop and connecting to kafka over a wifi network.  Here are the 
numbers:

KStreams: ~14K messages per second
Reactive Kafka: ~110 messages per second

Both the input, and output topic had 54 partitions.  I’m fairly certain I’m not 
using Reactive kafka with good configuration.  Here is some stubbed out code: 
https://gist.github.com/anduill/2e17cd7a40d4a86fefe19870d1270f5b

One note, I am using the confluent stack (hence the CachedSchemaRegistryClient)

I like the flexibility of Reactive Kafka, so we’d really like to use it…but if 
performance is going to be like this, I can’t really justify it.  I’m a 
scala/akka/streaming-akka newbie, so I’m sure there are better ways to use the 
API.  Any help is appreciated.

-David


Re: Cluster config

2016-07-28 Thread David Garcia
What is your replication for these topics?

On 7/28/16, 3:03 PM, "Kessiler Rodrigues"  wrote:

Hey guys, 

I have > 5k topics with 5 partitions each in my cluster today. 

My actual cluster configuration is:

6 brokers - 16 vCPUs, 14.4 GB

Nowadays, I’m having some lags when the consumer tries to do rebalance and 
sometimes even kafka doesn’t get up.

What would be a best cluster configuration for that?






Re: Kafka 0.9.0.1 failing on new leader election

2016-07-29 Thread David Garcia
Well, just a dumb question, but did you include all the brokers in your client 
connection properties?

On 7/29/16, 10:48 AM, "Sean Morris (semorris)"  wrote:

Anyone have any ideas?

From: semorris mailto:semor...@cisco.com>>
Date: Tuesday, July 26, 2016 at 9:40 AM
To: "users@kafka.apache.org" 
mailto:users@kafka.apache.org>>
Subject: Kafka 0.9.0.1 failing on new leader election

I have a setup with 2 brokers and it is going through leader re-election 
but seems to fail to complete. The behavior I start to see is that some 
published succeed but others will fail with NotLeader exceptions like this


java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)

at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)

at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)


My Kafka and zookeeper log file has errors like this


[2016-07-26 02:01:12,842] ERROR 
[kafka.controller.ControllerBrokerRequestBatch] Haven't been able to send 
metadata update requests, current state of the map is Map(2 -> Map(eox-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:46,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1),
 notify-eportal-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:51,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1),
 psirts-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:51,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1),
 notify-pushNotif-low-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:51,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1)),
 1 -> Map(eox-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:46,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1),
 notify-eportal-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:51,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1),
 psirts-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:51,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1),
 notify-pushNotif-low-1 -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:51,ControllerEpoch:34),ReplicationFactor:2),AllReplicas:2,1)))

[2016-07-26 02:01:12,845] ERROR [kafka.controller.KafkaController] 
[Controller 1]: Forcing the controller to resign


Which is then followed by a null pointer exception


[2016-07-26 02:01:13,021] ERROR [org.I0Itec.zkclient.ZkEventThread] Error 
handling event ZkEvent[Children of /isr_change_notification changed sent to 
kafka.controller.IsrChangeNotificationListener@55ca3750]

java.lang.IllegalStateException: java.lang.NullPointerException

at 
kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:434)

at 
kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:1029)

at 
kafka.controller.IsrChangeNotificationListener.kafka$controller$IsrChangeNotificationListener$$processUpdateNotifications(KafkaController.scala:1372)

at 
kafka.controller.IsrChangeNotificationListener$$anonfun$handleChildChange$1.apply$mcV$sp(KafkaController.scala:1359)

at 
kafka.controller.IsrChangeNotificationListener$$anonfun$handleChildChange$1.apply(KafkaController.scala:1352)

at 
kafka.controller.IsrChangeNotificationListener$$anonfun$handleChildChange$1.apply(KafkaController.scala:1352)

at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)

at 
kafka.controller.IsrChangeNotificationListener.handleChildChange(KafkaController.scala:1352)

at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)

at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

Caused by: java.lang.NullPointerException

at 
kafka.controller.KafkaController.sendRequest(KafkaController.scala:699)

at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:403)

at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:369)

at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)

at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)

at scala.collection.mutable.HashMap.fo

Re: Reactive Kafka performance

2016-07-30 Thread David Garcia
Hey Guozhang, our basic road block is asynchronous processing (this is actually 
related to my previous post about asynchronous processing).  Here is the 
simplest use-case:

The streaming job receives messages.  Each message is a user-event and needs to 
randomly look up that user’s history (for 100’s-of-thousands of users, and each 
user can have 10’s-of-thousands of events in their history).  Once the history 
is retrieved, processing can continue.  We need processing to be as fast as 
possible and we need the ability to easily accommodate increases in incoming 
message traffic.  Here are the two designs (with KStreams, and then with 
Reactive Kafka)

KStreams Approach:

KStreams depth-first approach requires finishing processing of one message 
before the next one becomes available.  So, we would have to first estimate the 
average input message rate (and measure the performance of our app) and then 
partition our topic/s appropriately.  Each task would effectively block on 
DB-io for every history-set retrieval; obviously we would use a TTL cache 
(KTable could be useful here, but it wouldn’t be able to hold “all” of the 
history for every user).  If we need to “scale” our application, we would add 
more partitions and application processing instances.  Please suggest any other 
design choice we could go with.  I’m might be missing something.

Reactive Kafka Approach:

Reactive Kafka allows out-of-order processing.  So, while we are fetching 
history for event-1, we can start processing event-2.  In a nutshell 
Reactive-Kafka parallelism is not tightly-coupled to the number of partitions 
in the topic (obviously this doesn’t apply to the input…we can only receive 
events as fast as current partition configuration allows…but we don’t’ have to 
block on io before we receive the next message)


I’m new to both technologies, so any and all suggestions are welcome.

-David

On 7/30/16, 9:24 AM, "Guozhang Wang"  wrote:

Hello David,

I'd love to learn details about the "flexibility" of Reactive Kafka
compared with KStreams in order to see if KStreams can improve on that end.
Would you like to elaborate a bit more on that end?

Guozhang


On Thu, Jul 28, 2016 at 12:16 PM, David Garcia 
wrote:

> Our team is evaluating KStreams and Reactive Kafka (version 0.11-M4)  on a
> confluent 3.0 cluster.  Our testing is very simple (pulling from one 
topic,
> doing a simple transform) and then writing out to another topic.
>
> The performance for the two systems is night and day. Both applications
> were running on a laptop and connecting to kafka over a wifi network.  
Here
> are the numbers:
>
> KStreams: ~14K messages per second
> Reactive Kafka: ~110 messages per second
>
> Both the input, and output topic had 54 partitions.  I’m fairly certain
> I’m not using Reactive kafka with good configuration.  Here is some 
stubbed
> out code: https://gist.github.com/anduill/2e17cd7a40d4a86fefe19870d1270f5b
>
> One note, I am using the confluent stack (hence the
> CachedSchemaRegistryClient)
>
> I like the flexibility of Reactive Kafka, so we’d really like to use
> it…but if performance is going to be like this, I can’t really justify it.
> I’m a scala/akka/streaming-akka newbie, so I’m sure there are better ways
> to use the API.  Any help is appreciated.
>
> -David
>



-- 
-- Guozhang




KTable and Rebalance Operations

2016-08-02 Thread David Garcia
Hello, I’ve googled around for this, but haven’t had any luck.  Based upon 
this: http://docs.confluent.io/3.0.0/streams/architecture.html#state  KTables 
are local to instances.  An instance will process one or more partitions from 
one or more topics.  How does Kstreams/Ktables handle the following situation?

A single application instance is processing 4 partitions from a topic.  The 
application is using a Ktable.  Each event triggers lookups in the KTable.  
Now, a new application instance is started.  This triggers a rebalancing of the 
partitions.  2 partitions originally processed by the first instance migrate to 
the new instance.  What happens with the KTable?  Is the entire table 
“migrated” also?  This would be nice because lookups (in the first instance) 
triggered by particular events should be identical to lookups (in the second 
instance) triggered by those same events.

-David


Re: Opening up Kafka JMX port for Kafka Consumer in Kafka Streams app

2016-08-02 Thread David Garcia
Have you looked at kafka manager: https://github.com/yahoo/kafka-manager
It provides consumer level metrics.

-David

On 8/2/16, 12:36 PM, "Phillip Mann"  wrote:

Hello all,

This is a bit of a convoluted title but we are trying to set up monitoring 
on our Kafka Cluster and Kafka Streams app.  I currently have JMX port open on 
our Kafka cluster across our brokers.  I am able to use a Java JMX client to 
get certain metrics that are helpful to us.  However, the data we want to 
monitor the most is consumer level JMX metrics.  This is made complicated 
because there is no documentation for what we are trying to do.  Essentially we 
need to expose the JMX port for the Kafka Streams Consumer.  Each Kafka Streams 
app contains a producer and consumer (group?).  We want to get the offset lag 
metrics for the Kafka Streams consumer.  Is this possible?  If so, how do we do 
it?  Thanks for the help!!

Phillip




Re: Multiple processors belongs to same GroupId needs to read same message from same topic

2016-08-16 Thread David Garcia
You could create another partition in topic T, and publish the same message to 
both partitions.  You would have to configure P2 to read from the other 
partition.  Or you could have P1 write the message to another topic and 
configure P2 to listen to that topic.

-David

On 8/16/16, 11:54 PM, "Deepak Pengoria"  wrote:

For your information, I am using Confluent-3.0.0 (having Streaming api-0.10)

On Wed, Aug 17, 2016 at 10:23 AM, Deepak Pengoria  wrote:

> Hi, I have a problem for which I am not able to find the solution. Below
> is the problem statement :
>
> I have two Kafka-Steaming api processors say P1 and P2, both want to read
> same message(s) from same topic say T. Topic T is having only one 
partition
> and contains some configuration information and this topic doesn't update
> frequently (update hardly once in a month). Currently if P1 read the
> message from topic T then that P2 will not be able to read this message.
>
> How can I achieve that both the processors could read same message? It is
> something on which I am stuck and need help.
>
> Regards,
> Deepak
>




Kafka Connect JDBC on Production DB

2016-08-19 Thread David Garcia
My team is considering using either Kafka-connect JDBC or Bottled water to 
stream DB-changes from several production postgres DB’s.  WRT bottled water, 
this is a little scary: 
https://github.com/confluentinc/bottledwater-pg/issues/96

But, the Kafka-connect option also seems like it could affect availability of 
the production db.  Does anyone have any experience with either of these 
solutions on a production DB?  If so, what are your thoughts?

-David


Re: Questions about Apache Kafka

2016-08-24 Thread David Garcia
Regarding 3 and 4: https://calcite.apache.org/docs/stream.html  (i.e. streaming 
SQL queries)


On 8/24/16, 6:29 AM, "Herwerth Karin (CI/OSW3)"  
wrote:

Dear Sir or Madam,

I'm a beginner in Apache Kafka and have questions which I don't get it from 
the documentation.


1.If you set a Time To Live to a topic and to a message, which Time To 
Live is prioritized?

2.Supports Apache Kafka a mechanism to stop the publisher if the 
subscriber is too slow?

3.Is there a possibility of a script engine? Does Kafka supports the 
implementation and execution of self-made scripts?

4.Is there a possibility to join topics or partitions?

5.Support Kafka such a following mechanism: A mechanism to control 
whether each write to the store will also call sync on the file system to 
ensure all data is written to the disk.



I hope I will get support.



Thanks in advance.


Mit freundlichen Grüßen / Best regards

Karin Herwerth

EAI Development (CI/OSW3)
Robert Bosch GmbH | Postfach 30 02 20 | 70442 Stuttgart | GERMANY | 
www.bosch.com


Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
Denner,
Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
Markus Heyn, Dr. Dirk Hoheisel,
Christoph Kübel, Uwe Raschke, Dr. Werner Struth, Peter Tyroller







KStreams regex-topic null pointer Exception with multiple Consumer processes

2016-09-03 Thread David Garcia
This bug may be fixed after commit: 6fb33afff976e467bfa8e0b29eb82770a2a3aaec

When you start two consumer processes with a regex topic (with 2 or more 
partitions for the matching topics), the second (i.e. nonleader) consumer will 
fail with a null pointer exception.

Exception in thread "StreamThread-4" java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:78)
at 
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at 
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:139)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:208)

The issue may be in the TopologyBuilder line 832:
String[] topics = (sourceNodeFactory.pattern != null) ? 
sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : 
sourceNodeFactory.getTopics();

Because the 2nd consumer joins as a follower, “getUpdates” returns an empty 
collection and the regular expression doesn’t get applied to any topics.  
Perhaps this is fixed after?: 6fb33afff976e467bfa8e0b29eb82770a2a3aaec

-David





Re: KStreams regex-topic null pointer Exception with multiple Consumer processes

2016-09-06 Thread David Garcia
Done: https://issues.apache.org/jira/browse/KAFKA-4131

Thanks Guozhang for also looking at this.  BTW, do you have any idea when 
0.10.1 will be released?

-David


On 9/5/16, 12:14 AM, "Guozhang Wang"  wrote:

Hello David,

Thanks for reporting this issue, and after some look through the code I
think it is indeed a bug, and commit 
6fb33afff976e467bfa8e0b29eb82770a2a3aaec
will not fix it IMHO.

Would you want to create a JIRA for keeping track of this issue?

Guozhang


On Sat, Sep 3, 2016 at 10:16 AM, David Garcia  wrote:

> This bug may be fixed after commit: 6fb33afff976e467bfa8e0b29eb827
> 70a2a3aaec
>
> When you start two consumer processes with a regex topic (with 2 or more
> partitions for the matching topics), the second (i.e. nonleader) consumer
> will fail with a null pointer exception.
>
> Exception in thread "StreamThread-4" java.lang.NullPointerException
> at org.apache.kafka.streams.processor.internals.
> RecordQueue.addRawRecords(RecordQueue.java:78)
> at org.apache.kafka.streams.processor.internals.
> PartitionGroup.addRawRecords(PartitionGroup.java:117)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.addRecords(StreamTask.java:139)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.runLoop(StreamThread.java:299)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:208)
>
> The issue may be in the TopologyBuilder line 832:
> String[] topics = (sourceNodeFactory.pattern != null) ?
> sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
> sourceNodeFactory.getTopics();
>
> Because the 2nd consumer joins as a follower, “getUpdates” returns an
> empty collection and the regular expression doesn’t get applied to any
> topics.  Perhaps this is fixed after?: 6fb33afff976e467bfa8e0b29eb827
> 70a2a3aaec
>
> -David
>
>
>
>


-- 
-- Guozhang





Re: monitor page cache read ratio

2016-09-07 Thread David Garcia
Cachestat
https://www.datadoghq.com/blog/collecting-kafka-performance-metrics/


On 9/7/16, 8:31 AM, "Peter Sinoros Szabo"  
wrote:

Hi,

As I read more and more about kafka monitoring it seems that monitoring 
the linux page cache hit ration is important, but I do not really find a 
good solution to get that value.
Do you have a good practice how to get that value?

Regards,
Peter






Re: Partitioning at the edges

2016-09-07 Thread David Garcia
The “simplest” way to solve this is to “repartition” your data (i.e. the 
streams you wish to join) with the partition key you wish to join on.  This 
obviously introduces redundancy, but it will solve your problem.  For example.. 
suppose you want to join topic T1 and topic T2…but they aren’t partitioned on 
the key you need.  You could write two “simple” repartition jobs for each topic 
(you can actually do this with one job):

T1 -> Job_T1 -> T1’
T2 -> Job_T2 -> T2’

T1’ and T2’ would be partitioned on your join key and would have the same 
number of partitions so that you have the guarantees you need to do the join.  
(i.e. join T1’ and T2’).

-David


On 9/2/16, 8:43 PM, "Andy Chambers"  wrote:

Hey Folks,

We are having quite a bit trouble modelling the flow of data through a very
kafka centric system

As I understand it, every stream you might want to join with another must
be partitioned the same way. But often streams at the edges of a system
*cannot* be partitioned the same way because they don't have the partition
key yet (often the work for this process is to find the key in some lookup
table based on some other key we don't control).

We have come up with a few solutions but everything seems to add complexity
and backs our designs into a corner.

What is frustrating is that most of the data is not really that big but we
have a handful of topics we expect to require a lot of throughput.

Is this just unavoidable complexity asociated with scale or am I thinking
about this in the wrong way. We're going all in on the "turning the
database inside out" architecture but we end up spending more time thinking
about how stuff gets broken up into tasks and distributed than we are about
our business.

Do these problems seem familiar to anyone else?  Did you find any patterns
that helped keep the complexity down.

Cheers





Re: Partitioning at the edges

2016-09-07 Thread David Garcia
Obviously for the keys you don’t have, you would have to look them up…sorry, I 
kinda missed that part.  That is indeed a pain.  The job that looks those keys 
up would probably have to batch queries to the external system.  Maybe you 
could use kafka-connect-jdbc to stream in updates to that system?

-David


On 9/7/16, 3:41 PM, "David Garcia"  wrote:

The “simplest” way to solve this is to “repartition” your data (i.e. the 
streams you wish to join) with the partition key you wish to join on.  This 
obviously introduces redundancy, but it will solve your problem.  For example.. 
suppose you want to join topic T1 and topic T2…but they aren’t partitioned on 
the key you need.  You could write two “simple” repartition jobs for each topic 
(you can actually do this with one job):

T1 -> Job_T1 -> T1’
T2 -> Job_T2 -> T2’

T1’ and T2’ would be partitioned on your join key and would have the same 
number of partitions so that you have the guarantees you need to do the join.  
(i.e. join T1’ and T2’).

-David


On 9/2/16, 8:43 PM, "Andy Chambers"  wrote:

Hey Folks,

We are having quite a bit trouble modelling the flow of data through a 
very
kafka centric system

As I understand it, every stream you might want to join with another 
must
be partitioned the same way. But often streams at the edges of a system
*cannot* be partitioned the same way because they don't have the 
partition
key yet (often the work for this process is to find the key in some 
lookup
table based on some other key we don't control).

We have come up with a few solutions but everything seems to add 
complexity
and backs our designs into a corner.

What is frustrating is that most of the data is not really that big but 
we
have a handful of topics we expect to require a lot of throughput.

Is this just unavoidable complexity asociated with scale or am I 
thinking
about this in the wrong way. We're going all in on the "turning the
database inside out" architecture but we end up spending more time 
thinking
about how stuff gets broken up into tasks and distributed than we are 
about
our business.

Do these problems seem familiar to anyone else?  Did you find any 
patterns
that helped keep the complexity down.

Cheers








Re: What's the relationship between Kafka and Zookeeper ?

2016-09-10 Thread David Garcia
Data is always provided by the leader of a topic-partition (i.e. a broker).  
Here is a summary of how zookeeper is used: 
https://www.quora.com/What-is-the-actual-role-of-ZooKeeper-in-Kafka

-David


On 9/10/16, 3:47 PM, "Eric Ho"  wrote:

I notice that some Spark programs would contact something like 'zoo1:2181'
when trying to suck data out of Kafka.

Does the kafka data actually get routed out of zookeeper before delivering
the payload onto Spark ?

-- 

-eric ho





Re: Slow machine disrupting the cluster

2016-09-16 Thread David Garcia
To remediate, you could start another broker, rebalance, and then shut down the 
busted broker.  But, you really should put some monitoring on your system (to 
help diagnose the actual problem).  Datadog has a pretty good set of articles 
for using jmx to do this: 
https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/

There are lots of jmx metrics gathering tools too…such as jmxtrans: 
https://github.com/jmxtrans/jmxtrans


confluent also offers tooling (such as command center) to help with monitoring.


As far as mirror maker goes, you can play with the consumer/producer timeout 
settings to make sure the process waits long enough for a slow machine.

-David

On 9/16/16, 7:11 AM, "Gerard Klijs"  wrote:

We just had an interesting issue, luckily this was only on our test cluster.
Because of some reason one of the machines in a cluster became really slow.
Because it was still alive, it stil was the leader for some
topic-partitions. Our mirror maker reads and writes to multiple
topic-partitions on each thread. When committing the offsets this will fail
for the topic-partitions located on the slow machine, because the consumers
have timed out. The data for these topic-partitions will be send over and
over, causing a flood of duplicate messages.
What would be the best way to prevent this in the future. Is there some way
the broker could notice it's performing poorly and shut's off for example?




Re: Mirroring from 0.8.2.1 to 0.10.0.0

2016-09-26 Thread David Garcia
Try running mirror maker from the other direction (i.e. from 0.8.2.1 ).  I had 
a similar issue, and that seemed to work.

-David

On 9/26/16, 5:19 PM, "Xavier Lange"  wrote:

I'm using bin/kafka-mirror-maker.sh for the first time and I need to take
my "aws-cloudtrail" topic from a 0.8.2.1 single broker and mirror it to a
0.10.0.0 cluster. I am running mirror maker from a host in the 0.10.0.0
cluster.

My consumer.properties file:

$ cat consumer.properties
zookeeper.connect=10.60.68.98:2181/
zookeeper.connectiontimeout.ms=100
bootstrap.servers=10.60.68.98:9092
consumer.timeout.ms=-1
group.id=mirror-cloudtrail
shallow.iterator.enable=true
mirror.topics.whitelist=aws-cloudtrail
inter.broker.protocol.version=0.8.2

My producer.properties file:
$ cat producer.properties
#zookeeper.connect=10.60.68.116:2181/
bootstrap.servers=10.60.68.116:9092
#producer.type=async
#compression.codec=1
#serializer.class=kafka.serializer.DefaultEncoder
#max.message.size=1000
#queue.time=1000
#queue.enqueueTimeout.ms=-1

Then when I kick off the mirror maker script I see a stream of errors,
indicating some kind of protocol mis-match:

$ KAFKA_JMX_OPTS="" JMX_PORT=7204 ./bin/kafka-mirror-maker.sh
--consumer.config consumer.properties --producer.config producer.properties
--whitelist aws-cloudtrail
[2016-09-26 22:16:29,312] WARN Property bootstrap.servers is not valid
(kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,313] WARN Property inter.broker.protocol.version is
not valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,313] WARN Property mirror.topics.whitelist is not
valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,313] WARN Property shallow.iterator.enable is not
valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:29,314] WARN Property zookeeper.connectiontimeout.ms is
not valid (kafka.utils.VerifiableProperties)
[2016-09-26 22:16:30,770] WARN

[ConsumerFetcherThread-mirror-cloudtrail_46fc50035fd0-1474928189396-63f8456c-0-0],
Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@7add6a8
(kafka.consumer.ConsumerFetcherThread)
java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at
kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
at

scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at

scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
at

scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at

scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
at
kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:108)
at
kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
at

kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

in a tight loop, I have to background the script with ctrl-z and then run
"kill -9 %1" to kill off the job.

Do I need to set the "inter.broker.protocol.version" some other way?
Perhaps in the config of the 0.10.0.0 cluster nodes and then perform a
restart there?

Thanks,
Xavier




Re: Kafka 10 Consumer Reading from Kafka 8 Cluster?

2016-10-06 Thread David Garcia
Any reason you can’t use mirror maker?
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

-David

On 10/6/16, 1:32 PM, "Craig Swift"  wrote:

Hello,

We're in the process of upgrading several of our clusters to Kafka 10. I
was wondering if it's possible to use the Kafka 10 client code (old or new)
to read from a source Kafka 8 cluster and then use the new 10 producer to
write to a destination Kafka 10 cluster? I know there's a recommended
upgrade path per the documentation but we're unfortunately not able to
update the source cluster quite yet. Thoughts?

Craig




puncutuate() never called

2016-10-07 Thread David Garcia
Hello, I’m sure this question has been asked many times.
We have a test-cluster (confluent 3.0.0 release) of 3 aws m4.xlarges.  We have 
an application that needs to use the punctuate() function to do some work on a 
regular interval.  We are using the WallClock extractor.  Unfortunately, the 
method is never called.  I have checked the filedescriptor setting for both the 
user as well as the process, and everything seems to be fine.  Is this a known 
bug, or is there something obvious I’m missing?

One note, the application used to work on this cluster, but now it’s not 
working.  Not really sure what is going on?

-David


Re: Same leader for all partitions for topics

2016-10-07 Thread David Garcia
Maybe someone already answered this…but you can use the repartitioner to fix 
that (it’s included with Kafka)

As far as root cause, you probably had a few leader elections due to excessive 
latency.  There is a cascading scenario that I noticed Kafka is vulnerable to.  
The events transpire as follows:

1.) network latency occurs due to Random acts of God.
2.) This triggers leader election for some partitions.
3.) Because the new leaders now have more work to do, this may introduce more 
latency which will trigger more leader elections…wash and repeat.
4.) Now we have one ring to rule them…one ring to find them, one ring to bring 
them all and in the darkness bind them.

I’m not sure how much new leader election would trigger disk reads (possibly 
because the leader may not have the newly served data in the OS disk cache)…but 
that may be a source for more latency (perhaps a more learned voice can chime 
in here)

In any case, you should probably have some kind of alerting on leadership 
distribution.  And hedge against this scenario with beefy machines. 

-David

On 10/7/16, 2:49 AM, "Misra, Rahul"  wrote:

Hi,

I have been using a 3 node kafka cluster for development for some time now. 
I have created some topics on this cluster. Yesterday I observed the following 
when I used 'describe' for the topics:
The Kafka version I'm using is: 9.0.1 (kafka_2.11-0.9.0.1).

Topic:topicIc  PartitionCount:3ReplicationFactor:3 Configs:
Topic: topicIc Partition: 0Leader: 0   Replicas: 1,2,0 Isr: 
0,2,1
Topic: topicIc Partition: 1Leader: 0   Replicas: 2,0,1 Isr: 
0,2,1
Topic: topicIc Partition: 2Leader: 0   Replicas: 0,1,2 Isr: 
2,0,1
Topic:topicR   PartitionCount:3ReplicationFactor:3 Configs:
Topic: topicR  Partition: 0Leader: 0   Replicas: 0,1,2 Isr: 
2,0,1
Topic: topicR  Partition: 1Leader: 0   Replicas: 1,2,0 Isr: 
0,2,1
Topic: topicR  Partition: 2Leader: 0   Replicas: 2,0,1 Isr: 
0,2,1
Topic:topicSubEPartitionCount:1ReplicationFactor:3 
Configs:
Topic: topicSubE   Partition: 0Leader: 0   Replicas: 
0,2,1 Isr: 0,2,1
Topic:topicSubIc   PartitionCount:1ReplicationFactor:3 
Configs:
Topic: topicSubIc  Partition: 0Leader: 0   Replicas: 
2,0,1 Isr: 0,2,1
Topic:topicSubLr   PartitionCount:1ReplicationFactor:3 
Configs:
Topic: topicSubLr  Partition: 0Leader: 0   Replicas: 
0,1,2 Isr: 2,0,1
Topic:topicSubRPartitionCount:1ReplicationFactor:3 
Configs:
Topic: topicSubR   Partition: 0Leader: 0   Replicas: 
0,1,2 Isr: 2,0,1


As you would observe, the leader for all the partitions for all the topics 
is just one node: Broker node 0. This is true even for the '__consumer_offsets' 
topic.
The brokers are up and running on all the 3 nodes and the Zookeeper nodes 
are also running fine.

Does anybody have any ideas as to why this may have happened?
Is there a way to manually trigger rebalance of partitions to the nodes?

Regards,
Rahul Misra
This email message and any attachments are intended solely for the use of 
the addressee. If you are not the intended recipient, you are prohibited from 
reading, disclosing, reproducing, distributing, disseminating or otherwise 
using this transmission. If you have received this message in error, please 
promptly notify the sender by reply email and immediately delete this message 
from your system. This message and any attachments may contain information that 
is confidential, privileged or exempt from disclosure. Delivery of this message 
to any person other than the intended recipient is not intended to waive any 
right or privilege. Message transmission is not guaranteed to be secure or free 
of software viruses. 

***




Re: puncutuate() never called

2016-10-07 Thread David Garcia
Yeah, this is possible.  We have run the application (and have confirmed data 
is being received) for over 30 mins…with a 60-second timer.  So, do we need to 
just rebuild our cluster with bigger machines?

-David

On 10/7/16, 11:18 AM, "Michael Noll"  wrote:

David,

punctuate() is still data-driven at this point, even when you're using the
WallClock timestamp extractor.

To use an example: Imagine you have configured punctuate() to be run every
5 seconds.  If there's no data being received for a minute, then punctuate
won't be called -- even though you probably would have expected this to
happen 12 times during this 1 minute.

(FWIW, there's an ongoing discussion to improve punctuate(), part of which
is motivated by the current behavior that arguably is not very intuitive to
many users.)

Could this be the problem you're seeing?  See also the related discussion
at

http://stackoverflow.com/questions/39535201/kafka-problems-with-timestampextractor
.






    On Fri, Oct 7, 2016 at 6:07 PM, David Garcia  wrote:

> Hello, I’m sure this question has been asked many times.
> We have a test-cluster (confluent 3.0.0 release) of 3 aws m4.xlarges.  We
> have an application that needs to use the punctuate() function to do some
> work on a regular interval.  We are using the WallClock extractor.
> Unfortunately, the method is never called.  I have checked the
> filedescriptor setting for both the user as well as the process, and
> everything seems to be fine.  Is this a known bug, or is there something
> obvious I’m missing?
>
> One note, the application used to work on this cluster, but now it’s not
> working.  Not really sure what is going on?
>
> -David
>




puncutuate() bug

2016-10-07 Thread David Garcia
Ok I found the bug.  Basically, if there is an empty topic (in the list of 
topics being consumed), any partition-group with partitions from the topic will 
always return -1 as the smallest timestamp (see PartitionGroup.java).

To reproduce, simply start a kstreams consumer with one or more empty topics.  
Punctuate will never be called.

-David

On 10/7/16, 1:11 PM, "David Garcia"  wrote:

Yeah, this is possible.  We have run the application (and have confirmed 
data is being received) for over 30 mins…with a 60-second timer.  So, do we 
need to just rebuild our cluster with bigger machines?

-David

On 10/7/16, 11:18 AM, "Michael Noll"  wrote:

David,

punctuate() is still data-driven at this point, even when you're using 
the
WallClock timestamp extractor.

To use an example: Imagine you have configured punctuate() to be run 
every
5 seconds.  If there's no data being received for a minute, then 
punctuate
won't be called -- even though you probably would have expected this to
happen 12 times during this 1 minute.

(FWIW, there's an ongoing discussion to improve punctuate(), part of 
which
is motivated by the current behavior that arguably is not very 
intuitive to
many users.)

Could this be the problem you're seeing?  See also the related 
discussion
at

http://stackoverflow.com/questions/39535201/kafka-problems-with-timestampextractor
.






    On Fri, Oct 7, 2016 at 6:07 PM, David Garcia  
wrote:

> Hello, I’m sure this question has been asked many times.
> We have a test-cluster (confluent 3.0.0 release) of 3 aws m4.xlarges. 
 We
> have an application that needs to use the punctuate() function to do 
some
> work on a regular interval.  We are using the WallClock extractor.
> Unfortunately, the method is never called.  I have checked the
> filedescriptor setting for both the user as well as the process, and
> everything seems to be fine.  Is this a known bug, or is there 
something
> obvious I’m missing?
>
> One note, the application used to work on this cluster, but now it’s 
not
> working.  Not really sure what is going on?
>
> -David
>






Re: puncutuate() bug

2016-10-08 Thread David Garcia
Actually, I think the bug is more subtle.  What happens when a consumed topic 
stops receiving messages?  The smallest timestamp will always be the static 
timestamp of this topic.

-David

On 10/7/16, 5:03 PM, "David Garcia"  wrote:

Ok I found the bug.  Basically, if there is an empty topic (in the list of 
topics being consumed), any partition-group with partitions from the topic will 
always return -1 as the smallest timestamp (see PartitionGroup.java).

To reproduce, simply start a kstreams consumer with one or more empty 
topics.  Punctuate will never be called.

-David

On 10/7/16, 1:11 PM, "David Garcia"  wrote:

Yeah, this is possible.  We have run the application (and have 
confirmed data is being received) for over 30 mins…with a 60-second timer.  So, 
do we need to just rebuild our cluster with bigger machines?

-David

On 10/7/16, 11:18 AM, "Michael Noll"  wrote:

David,

punctuate() is still data-driven at this point, even when you're 
using the
WallClock timestamp extractor.

To use an example: Imagine you have configured punctuate() to be 
run every
5 seconds.  If there's no data being received for a minute, then 
punctuate
won't be called -- even though you probably would have expected 
this to
happen 12 times during this 1 minute.

(FWIW, there's an ongoing discussion to improve punctuate(), part 
of which
is motivated by the current behavior that arguably is not very 
intuitive to
many users.)

Could this be the problem you're seeing?  See also the related 
discussion
at

http://stackoverflow.com/questions/39535201/kafka-problems-with-timestampextractor
.





    
    On Fri, Oct 7, 2016 at 6:07 PM, David Garcia 
 wrote:

> Hello, I’m sure this question has been asked many times.
> We have a test-cluster (confluent 3.0.0 release) of 3 aws 
m4.xlarges.  We
> have an application that needs to use the punctuate() function to 
do some
> work on a regular interval.  We are using the WallClock extractor.
> Unfortunately, the method is never called.  I have checked the
> filedescriptor setting for both the user as well as the process, 
and
> everything seems to be fine.  Is this a known bug, or is there 
something
> obvious I’m missing?
>
> One note, the application used to work on this cluster, but now 
it’s not
> working.  Not really sure what is going on?
>
> -David
>








Re: puncutuate() never called

2016-10-10 Thread David Garcia
Thx for the responses.  I was able to identify a bug in how the times are 
obtained (offsets resolved as unknown cause the issue):

“Actually, I think the bug is more subtle.  What happens when a consumed topic 
stops receiving messages?  The smallest timestamp will always be the static 
timestamp of this topic.

-David

On 10/7/16, 5:03 PM, "David Garcia"  wrote:

Ok I found the bug.  Basically, if there is an empty topic (in the list of 
topics being consumed), any partition-group with partitions from the topic will 
always return -1 as the smallest timestamp (see PartitionGroup.java).

To reproduce, simply start a kstreams consumer with one or more empty 
topics.  Punctuate will never be called.

-David ”

On 10/10/16, 1:55 AM, "Michael Noll"  wrote:

> We have run the application (and have confirmed data is being received)
for over 30 mins…with a 60-second timer.

Ok, so your app does receive data but punctuate() still isn't being called.
:-(


> So, do we need to just rebuild our cluster with bigger machines?

That's worth trying out.  See

http://www.confluent.io/blog/design-and-deployment-considerations-for-deploying-apache-kafka-on-aws/
for some EC2 instance types recommendations.

But I'd also suggest to look into the logs of (1) your application, (2) the
log files of the Kafka broker(s), and (3) the log files of ZooKeeper to see
whether you see anything suspicious?

Sorry for not being able to provide more actionable feedback at this
point.  Typically we have seen such issues only (but not exclusively) in
cases where there have been problems in the environment in which your
application is running and/or the environment of the Kafka clusters.
Unfortunately these environment problems are a bit tricky to debug remotely
via the mailing list.

-Michael





    On Fri, Oct 7, 2016 at 8:11 PM, David Garcia  wrote:

> Yeah, this is possible.  We have run the application (and have confirmed
> data is being received) for over 30 mins…with a 60-second timer.  So, do 
we
> need to just rebuild our cluster with bigger machines?
>
> -David
>
> On 10/7/16, 11:18 AM, "Michael Noll"  wrote:
>
> David,
>
> punctuate() is still data-driven at this point, even when you're using
> the
> WallClock timestamp extractor.
>
> To use an example: Imagine you have configured punctuate() to be run
> every
> 5 seconds.  If there's no data being received for a minute, then
> punctuate
> won't be called -- even though you probably would have expected this 
to
> happen 12 times during this 1 minute.
>
> (FWIW, there's an ongoing discussion to improve punctuate(), part of
> which
> is motivated by the current behavior that arguably is not very
> intuitive to
> many users.)
>
> Could this be the problem you're seeing?  See also the related
> discussion
> at
> http://stackoverflow.com/questions/39535201/kafka-problems-with-
> timestampextractor
> .
>
>
>
>
>
>
> On Fri, Oct 7, 2016 at 6:07 PM, David Garcia 
> wrote:
>
> > Hello, I’m sure this question has been asked many times.
> > We have a test-cluster (confluent 3.0.0 release) of 3 aws
> m4.xlarges.  We
> > have an application that needs to use the punctuate() function to do
> some
> > work on a regular interval.  We are using the WallClock extractor.
> > Unfortunately, the method is never called.  I have checked the
> > filedescriptor setting for both the user as well as the process, and
> > everything seems to be fine.  Is this a known bug, or is there
> something
> > obvious I’m missing?
> >
> > One note, the application used to work on this cluster, but now it’s
> not
> > working.  Not really sure what is going on?
> >
> > -David
> >
>
>
>




broker upgrade

2016-10-11 Thread David Garcia
Hello, we are going to be upgrading the instance types of our brokers.  We will 
shut them down, upgrade, and the restart them.  All told, they will be down for 
about 15 minutes.  Upon restart, is there anything we need to do other than run 
preferred leader election?  The brokers will start to catch up on their own 
right?  How long should it take for them to catch up (in terms of how long they 
are down).  I would assume it would be less than 15 minutes.

Any tips are appreciated.  I’ve been looking at this: 
http://kafka.apache.org/documentation.html#basic_ops_restarting

-David


Re: Doubt on max.request.size

2016-10-14 Thread David Garcia
Hello Daniccan.  I apologize for the dumb question, but did you also check 
“message.max.bytes” on the broker?  Default is about 1meg (112 bytes) for 
kafka 0.10.0.  if you need to publish larger messages, you will need to adjust 
that on the brokers and then restart them.

-David

On 10/14/16, 5:49 AM, "Daniccan VP"  wrote:

Hi,

Kindly request to help with a doubt regarding the "max.request.size" 
configuration that we use in the Kafka Producer. I get the following exceptions 
sometimes in my project.

org.apache.kafka.common.errors.RecordTooLargeException: The request 
included a message larger than the max message size the server will accept.

org.apache.kafka.common.errors.RecordTooLargeException: The message is 
5855801 bytes when serialized which is larger than the maximum request size you 
have configured with the max.request.size configuration.

After facing the errors a couple of times, I have set the 
"max.request.size" to 5 MB now. But I still get the above errors. Does 
increasing the "max.request.size" affect the performance of the Kafka Producer 
and Consumer ? Is there any workaround for this ?

Thanks and Regards,
Daniccan VP | Junior Software Engineer
Email : danic...@iqsystech.com

***
 This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they have been 
addressed. If you are not the intended recipient, you are notified that 
disclosing, copying, distributing or taking any action in reliance on the 
contents of this information is strictly prohibited. Please notify the sender 
immediately by e-mail if you have received this e-mail by mistake and delete 
this e-mail from your system.




Re: Doubt on max.request.size

2016-10-14 Thread David Garcia
WRT performance, yes, changing message size will affect the performance of 
producers and consumers.  Please study the following to understand the 
relationship between message size and performance (graphs at the bottom 
visualize the relationship nicely):
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

-David

On 10/14/16, 5:49 AM, "Daniccan VP"  wrote:

Hi,

Kindly request to help with a doubt regarding the "max.request.size" 
configuration that we use in the Kafka Producer. I get the following exceptions 
sometimes in my project.

org.apache.kafka.common.errors.RecordTooLargeException: The request 
included a message larger than the max message size the server will accept.

org.apache.kafka.common.errors.RecordTooLargeException: The message is 
5855801 bytes when serialized which is larger than the maximum request size you 
have configured with the max.request.size configuration.

After facing the errors a couple of times, I have set the 
"max.request.size" to 5 MB now. But I still get the above errors. Does 
increasing the "max.request.size" affect the performance of the Kafka Producer 
and Consumer ? Is there any workaround for this ?

Thanks and Regards,
Daniccan VP | Junior Software Engineer
Email : danic...@iqsystech.com

***
 This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they have been 
addressed. If you are not the intended recipient, you are notified that 
disclosing, copying, distributing or taking any action in reliance on the 
contents of this information is strictly prohibited. Please notify the sender 
immediately by e-mail if you have received this e-mail by mistake and delete 
this e-mail from your system.




Re: Query on kafka queue message purge.

2016-10-14 Thread David Garcia
Using the kafka-topics.sh script, simply set the retention in a way to remove 
the message:

Kafka-topics.sh –zookeeper  --alter –config retention.ms= 
--topic 

This is actually deprecated, but still works in newer kafka 0.10.0.  Note: 
cleanup=delete is required for this.  This policy will only execute when the 
cleanup thread runs which, by default, is 5 mins.

-David

On 10/14/16, 3:09 AM, "Rudra Moharana"  wrote:

Hi Team,

I need a help for my query
Is there any way to remove the message from kafka queue with out stopping
zookeeper or topic server or cluster.

Thanks,
Rudra




Please help with AWS configuration

2016-10-19 Thread David Garcia
Hello everyone.  I’m having a hell of a time figuring out a Kafka performance 
issue in AWS. Any help is greatly appreciated!

Here is our AWS configuration:


-  Zookeeper Cluster (3.4.6): 3-nodes on m4.xlarges (default 
configuration) EBS volumes (sd1)

-  Kafka Cluster (0.10.0): 3 nodes on m4.2xlarges (config: 
https://gist.github.com/anduill/710bb0619a80019016ac85bb5c060440) EBS volumes 
(sd1)

Usage:

Our usage of the cluster is fairly modest (at least I think so). At peak hours, 
each broker will receive about 1.4 MB/sec. Our primary input topic has about 54 
partitions with replication set to 3 (ack=all).  Another consumer consumes this 
topic and spreads the messages across 8 other topics each with 8 
partitions…each of which has replication set to 2 (ack=all).  Downstream, 4 
other consumers consume these topics(one consumer consumes the 8 previous 
topics, transforms the messages, and sends the new messages to 8 other 
topics(ack=1) .  In all we end up generating about 206 partitions with an 
average replication of 2.26.

Our Problem:

Our cluster will hum-along just fine when suddenly, 1 or more brokers will 
start experiencing severe ISR-shrinking/expanding.  This causes underreplicated 
partitions and the producer purgatory size starts to rapidly expand(on the 
affected brokers)…this causes downstream producers to get behind in some cases.

In the Kafka configuration above, we have a couple of non-default settings, but 
nothing seems to stand out.  Is there anything obvious I’m missing (or need to 
add/adjust)?  Or is there a bug I should be aware of that would cause these 
issues.

-David


Re: Please help with AWS configuration

2016-10-19 Thread David Garcia
Sorry, had a typo in my gist.  Here is the correct location:

https://gist.github.com/anduill/710bb0619a80019016ac85bb5c060440

On 10/19/16, 4:33 PM, "David Garcia"  wrote:

Hello everyone.  I’m having a hell of a time figuring out a Kafka 
performance issue in AWS. Any help is greatly appreciated!

Here is our AWS configuration:


-  Zookeeper Cluster (3.4.6): 3-nodes on m4.xlarges (default 
configuration) EBS volumes (sd1)

-  Kafka Cluster (0.10.0): 3 nodes on m4.2xlarges (config: 
https://gist.github.com/anduill/710bb0619a80019016ac85bb5c060440 EBS volumes 
(sd1)

Usage:

Our usage of the cluster is fairly modest (at least I think so). At peak 
hours, each broker will receive about 1.4 MB/sec. Our primary input topic has 
about 54 partitions with replication set to 3 (ack=all).  Another consumer 
consumes this topic and spreads the messages across 8 other topics each with 8 
partitions…each of which has replication set to 2 (ack=all).  Downstream, 4 
other consumers consume these topics(one consumer consumes the 8 previous 
topics, transforms the messages, and sends the new messages to 8 other 
topics(ack=1) .  In all we end up generating about 206 partitions with an 
average replication of 2.26.

Our Problem:

Our cluster will hum-along just fine when suddenly, 1 or more brokers will 
start experiencing severe ISR-shrinking/expanding.  This causes underreplicated 
partitions and the producer purgatory size starts to rapidly expand(on the 
affected brokers)…this causes downstream producers to get behind in some cases.

In the Kafka configuration above, we have a couple of non-default settings, 
but nothing seems to stand out.  Is there anything obvious I’m missing (or need 
to add/adjust)?  Or is there a bug I should be aware of that would cause these 
issues.

-David




Invalid IP addresses sent from Consumers

2019-12-05 Thread David Garcia

 Hello, my consumers are reporting invalid IP address.  When running 
kafka-consumer-groups –describe… I see the following:


TOPICPARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG 
CONSUMER-ID HOST   CLIENT-ID

Topic1   12 1319627 1319627 0   
consumer1   /xx.yyy.zzz.xxx(non-existent-address)  
client-1-StreamThread-4-consumer

Topic2   8  44604   44604   0   
consumer2.  /xx.yyy.zzz.xxy(non-existent-address)  
client-1-StreamThread-3-consumer

I understand that this hostInfo (i.e. the ip-addresses) are reported by the 
consumers.  How can I configure my consumers to report the Host information I 
want?  It’s worth noting that my apps are running in AWS EC2.  Thx for the help!

-David J. Garcia


Static Membership AND Invalid IP addresses forConsumers

2019-12-06 Thread David Garcia
I forgot to note that we are using spot instances AND static membership.  I was 
able to confirm that setting application.server with the correct ip-address 
seems to result in the correct Host being set...however, upon spot-reap/replace 
the new ip-address does not seem to overwrite the previous ip-address in the 
host-field.  Is there a way to make it replace the host-info AND use static 
membership?


David Garcia 
Staff Big Data Engineer 
david.gar...@bazaarvoice.com 
O:  
M: 512.576.5864 
Site <https://www.bazaarvoice.com/>  |  Blog <https://www.bazaarvoice.com/blog> 
 |  Twitter <https://twitter.com/Bazaarvoice>
 
 

On 12/5/19, 1:07 PM, "David Garcia"  wrote:

EXTERNAL: This email originated from outside of Bazaarvoice. Do not click 
any links or open any attachments unless you trust the sender and know the 
content is safe.


 Hello, my consumers are reporting invalid IP address.  When running 
kafka-consumer-groups –describe… I see the following:


TOPICPARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG 
CONSUMER-ID HOST 
CLIENT-ID

Topic1   121319627   1319627
  0   consumer1   /xx.yyy.zzz.xxx(non-existent-address)  
client-1-StreamThread-4-consumer

Topic2   8   44604   44604  
0   consumer2  
/xx.yyy.zzz.xxy(non-existent-address)   client-1-StreamThread-3-consumer

I understand that this hostInfo (i.e. the ip-addresses) are reported by the 
consumers.  How can I configure my consumers to report the Host information I 
want?  It’s worth noting that my apps are running in AWS EC2.  Thx for the help!

-David J. Garcia




Corrupt Kafka file

2019-12-19 Thread David Garcia
Hello, we are getting the following error:

server.log:[2019-12-17 15:05:28,757] ERROR [ReplicaManager broker=5] Error 
processing fetch with max size 1048576 from consumer on partition my-topic-2: 
(fetchOffset=312239, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
server.log:org.apache.kafka.common.errors.CorruptRecordException: Found record 
size -2010690462 smaller than minimum record overhead (14) in file 
/var/lib/kafka/my-topic-2/00307631.log.

This error started occurring after our brokers got overloaded with fetch 
requests from an errant spark job.  At the moment, our consumers aren’t able to 
progress past the respective offset.  I found a few tickets that seemed 
relevant, (e.g. https://issues.apache.org/jira/browse/KAFKA-6679 )…but they 
aren’t quite the same…we were able to dump the records from the relevant files.

-David


Re: KafkaStream: puncutuate() never called even when data is received by process()

2016-11-23 Thread David Garcia
If you are consuming from more than one topic/partition, punctuate is triggered 
when the “smallest” time-value changes.  So, if there is a partition that 
doesn’t have any more messages on it, it will always have the smallest 
time-value and that time value won’t change…hence punctuate never gets called.

-David

On 11/23/16, 1:01 PM, "Matthias J. Sax"  wrote:

Your understanding is correct:

Punctuate is not triggered base on wall-clock time, but based in
internally tracked "stream time" that is derived from TimestampExtractor.
Even if you use WallclockTimestampExtractor, "stream time" is only
advance if there are input records.

Not sure why punctuate() is not triggered as you say that you do have
arriving data.

Can you share your code?



-Matthias


On 11/23/16 4:48 AM, shahab wrote:
> Hello,
> 
> I am using low level processor and I set the context.schedule(1),
> assuming that punctuate() method is invoked every 10 sec .
>  I have set
> configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class.getCanonicalName()) )
> 
> Although data is keep coming to the topology (as I have logged the 
incoming
> tuples to process() ),  punctuate() is never executed.
> 
> What I am missing?
> 
> best,
> Shahab
> 





Re: KafkaStream: puncutuate() never called even when data is received by process()

2016-11-26 Thread David Garcia
I know that the Kafka team is working on a new way to reason about time.  My 
team's solution was to not use punctuate...but this only works if you have 
guarantees that all of the tasks will receive messages..if not all the 
partitions.  Another solution is to periodically send canaries to all 
partitions your app is listening to.  In either case it's a bandaid.  I know 
the team is aware of this bug and they are working on it.  Hopefully it will be 
addressed in 0.10.1.1

Sent from my iPhone

> On Nov 24, 2016, at 1:55 AM, shahab  wrote:
> 
> Thanks for the comments.
> @David: yes, I have a source which is reading data from two topics and one
> of them were empty while the second one was loaded with plenty of data.
> So what do you suggest to solve this ?
> Here is snippet of my code:
> 
> StreamsConfig config = new StreamsConfig(configProperties);
> TopologyBuilder builder = new TopologyBuilder();
> AppSettingsFetcher appSettingsFetcher = initAppSettingsFetcher();
> 
> StateStoreSupplier company_bucket= Stores.create("CBS")
>.withKeys(Serdes.String())
>.withValues(Serdes.String())
>.persistent()
>.build();
> 
> StateStoreSupplier profiles= Stores.create("PR")
>.withKeys(Serdes.String())
>.withValues(Serdes.String())
>.persistent()
>.build();
> 
> 
> builder
>.addSource("deltaSource", topicName, LoaderListener.LoadedDeltaTopic)
> 
>.addProcessor("deltaProcess1", () -> new DeltaProcessor(
> 
>), "deltaSource")
>.addProcessor("deltaProcess2", () -> new LoadProcessor(
> 
>), "deltaProcess1")
>.addStateStore(profiles, "deltaProcess2", "deltaProcess1")
>.addStateStore(company_bucket, "deltaProcess2", "deltaProcess1");
> 
> KafkaStreams streams = new KafkaStreams(builder, config);
> 
> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>@Override
>public void uncaughtException(Thread t, Throwable e) {
>e.printStackTrace();
>}
> });
> 
> streams.start();
> 
> 
>> On Wed, Nov 23, 2016 at 8:30 PM, David Garcia  wrote:
>> 
>> If you are consuming from more than one topic/partition, punctuate is
>> triggered when the “smallest” time-value changes.  So, if there is a
>> partition that doesn’t have any more messages on it, it will always have
>> the smallest time-value and that time value won’t change…hence punctuate
>> never gets called.
>> 
>> -David
>> 
>> On 11/23/16, 1:01 PM, "Matthias J. Sax"  wrote:
>> 
>>Your understanding is correct:
>> 
>>Punctuate is not triggered base on wall-clock time, but based in
>>internally tracked "stream time" that is derived from
>> TimestampExtractor.
>>Even if you use WallclockTimestampExtractor, "stream time" is only
>>advance if there are input records.
>> 
>>Not sure why punctuate() is not triggered as you say that you do have
>>arriving data.
>> 
>>Can you share your code?
>> 
>> 
>> 
>>-Matthias
>> 
>> 
>>>On 11/23/16 4:48 AM, shahab wrote:
>>> Hello,
>>> 
>>> I am using low level processor and I set the context.schedule(1),
>>> assuming that punctuate() method is invoked every 10 sec .
>>> I have set
>>> configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>> WallclockTimestampExtractor.class.getCanonicalName()) )
>>> 
>>> Although data is keep coming to the topology (as I have logged the
>> incoming
>>> tuples to process() ),  punctuate() is never executed.
>>> 
>>> What I am missing?
>>> 
>>> best,
>>> Shahab
>> 
>> 
>> 
>> 


Re: Interactive Queries

2016-11-26 Thread David Garcia
I would start here: http://docs.confluent.io/3.1.0/streams/index.html


On 11/26/16, 8:27 PM, "Alan Kash"  wrote:

Hi,

New to Kafka land.

I am looking into Interactive queries feature, which transforms Topics into
Tables with history, neat !

1. What kind of queries we can run on the store ?  Point or Range ?
2. Is Indexing supported ? primary or seconday ?
3. Query language - SQL ? Custom Java Native Query ?

I see rocksdb is the persistent layer.

Did the team look at JCache API (JSR 107) -
https://jcp.org/en/jsr/detail?id=107 ?

Thanks,
Alan




Re: HOW TO GET KAFKA CURRENT TOPIC GROUP OFFSET

2016-12-02 Thread David Garcia
https://kafka.apache.org/documentation#operations

It’s in there somewhere… ;-)

On 11/28/16, 7:34 PM, "西风瘦"  wrote:

HI! WAIT YOUR ANSWER



Re: min hardware requirement

2017-01-24 Thread David Garcia
This should give you an idea: 
https://www.confluent.io/blog/design-and-deployment-considerations-for-deploying-apache-kafka-on-aws/

On 1/23/17, 10:25 PM, "Ewen Cheslack-Postava"  wrote:

Smaller servers/instances work fine for tests, as long as the workload is
scaled down as well. Most memory on a Kafka broker will end up dedicated to
page cache. For, e.g., 1GB of RAM just consider that you probably won't be
leaving much room to cache the data so your performance may suffer a bit.

-Ewen

On Fri, Jan 20, 2017 at 7:28 PM, Laxmi Narayan NIT DGP  wrote:

> Hi ,
> what is min hardware requirement for kafka ?
>
> I see min ram size for production is recommended is 32GB.
>
> what can be issue with 8 GB ram and for test purpose i was planning to use
>
> some 1gb or 4gb aws machine, is it safe to run in 1gb machine for few 
days?
>
> For log write i have big disk.
>
>
> *Regards,*
> *Laxmi Narayan Patel*
> *MCA NIT Durgapur (2011-2014)*
> *Mob:-9741292048,8345847473*
>




Re: min hardware requirement

2017-01-24 Thread David Garcia
Sorry, wrong link: http://docs.confluent.io/2.0.1/kafka/deployment.html

On 1/24/17, 2:13 PM, "David Garcia"  wrote:

This should give you an idea: 
https://www.confluent.io/blog/design-and-deployment-considerations-for-deploying-apache-kafka-on-aws/

On 1/23/17, 10:25 PM, "Ewen Cheslack-Postava"  wrote:

Smaller servers/instances work fine for tests, as long as the workload 
is
scaled down as well. Most memory on a Kafka broker will end up 
dedicated to
page cache. For, e.g., 1GB of RAM just consider that you probably won't 
be
leaving much room to cache the data so your performance may suffer a 
bit.

-Ewen

On Fri, Jan 20, 2017 at 7:28 PM, Laxmi Narayan NIT DGP 
 wrote:

> Hi ,
> what is min hardware requirement for kafka ?
>
> I see min ram size for production is recommended is 32GB.
>
> what can be issue with 8 GB ram and for test purpose i was planning 
to use
>
> some 1gb or 4gb aws machine, is it safe to run in 1gb machine for few 
days?
>
> For log write i have big disk.
>
>
> *Regards,*
> *Laxmi Narayan Patel*
> *MCA NIT Durgapur (2011-2014)*
> *Mob:-9741292048,8345847473*
>






Re: How to measure the load capacity of kafka cluster

2017-02-09 Thread David Garcia
From my experience, the bottle neck of a kafka cluster is the writing.  (i.e. 
the producers)  The most direct way to measure how “stressed” the writing 
threads are is to directly observe the producer purgatory buffer of your 
brokers.  The larger it gets the more likely a leader will report an out of 
sync replica.

-David

On 2/7/17, 7:45 PM, "Jiecxy" <253441...@qq.com> wrote:

How to measure the load capacity of one broker or whole cluster?  Or 
maximum throughput?
For the quantity of broker, is there any multiple relationship? Such as the 
capacity of two brokers is twice as much as that of one broker? 



TimeBasePartitioner for confluent

2017-03-03 Thread David Garcia
Trying to user s3-loader and am getting this error:

org.apache.kafka.common.config.ConfigException: Invalid generator class: class 
io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
at 
io.confluent.connect.storage.partitioner.TimeBasedPartitioner.newSchemaGenerator(TimeBasedPartitioner.java:107)
at 
io.confluent.connect.storage.partitioner.TimeBasedPartitioner.init(TimeBasedPartitioner.java:44)
at 
io.confluent.connect.storage.partitioner.TimeBasedPartitioner.configure(TimeBasedPartitioner.java:87)
at 
io.confluent.connect.s3.S3SinkTask.newPartitioner(S3SinkTask.java:146)
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:100)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)


perhaps this is a configuration issue?

-David


Re: TimeBasePartitioner for confluent

2017-03-03 Thread David Garcia
Gah…nm…looked at source code…use this:

schema.generator.class=io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator

On 3/3/17, 5:36 PM, "David Garcia"  wrote:

Trying to user s3-loader and am getting this error:

org.apache.kafka.common.config.ConfigException: Invalid generator class: 
class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
at 
io.confluent.connect.storage.partitioner.TimeBasedPartitioner.newSchemaGenerator(TimeBasedPartitioner.java:107)
at 
io.confluent.connect.storage.partitioner.TimeBasedPartitioner.init(TimeBasedPartitioner.java:44)
at 
io.confluent.connect.storage.partitioner.TimeBasedPartitioner.configure(TimeBasedPartitioner.java:87)
at 
io.confluent.connect.s3.S3SinkTask.newPartitioner(S3SinkTask.java:146)
at 
io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:100)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)


perhaps this is a configuration issue?

-David




Re: ISR churn

2017-03-22 Thread David Garcia
Look at producer purgatory size.  Anything greater than 10 is bad (from my 
experience).  Keeping it under 4 seemed to help us.  (i.e. if a broker is 
getting slammed with write, use rebalance tools or add a new broker).  Also 
check network latency and/or adjust timeout for ISR checking.  If on AWS, make 
sure to enable “enhanced networking” (aka: networking that doesn’t suck)

On 3/22/17, 3:39 PM, "Jun MA"  wrote:

Let me know if this fix your issue! I’d really interesting to know based on 
what information should we decide to expand the cluster- bytes per seconds or 
number of partitions on each broker? And what is the limitation.


> On Mar 22, 2017, at 11:46 AM, Marcos Juarez  wrote:
> 
> We're seeing the same exact pattern of ISR shrinking/resizing, mostly on 
partitions with the largest volume, with thousands of messages per second.  It 
happens at least a hundred times a day in our production cluster. We do have 
hundreds of topics in our cluster, most of them with 20 or more partitions, but 
most of them see only a few hundred messages per minute.  
> 
> We're running Kafka 0.10.0.1, and we thought upgrading to the 0.10.1.1 
version would fix the issue, but we've already deployed that version to our 
staging cluster, and we're seeing the same problem.  We still haven't tried out 
the latest 0.10.2.0 version, but I don't see any evidence pointing to a fix for 
that.
> 
> This ticket seems to have some similar details, but it doesn't seem like 
there has been follow-up, and there's no target release for fixing:
> 
> https://issues.apache.org/jira/browse/KAFKA-4674 

> 
> Jun Ma, what exactly did you do to failover the controller to a new 
broker? If that works for you, I'd like to try it in our staging clusters.
> 
> Thanks,
> 
> Marcos Juarez
> 
> 
> 
> 
> 
> On Wed, Mar 22, 2017 at 11:55 AM, Jun MA mailto:mj.saber1...@gmail.com>> wrote:
> I have similar issue with our cluster. We don’t know the root cause but 
we have some interesting observation.
> 
> 1. We do see correlation between ISR churn and fetcher connection 
close/create.
> 
> 
> 2. We’ve tried to add a broker which doesn’t have any partitions on it 
dedicate to the controller (rolling restart existing brokers and failover the 
controller to the newly added broker), and that indeed eliminate the random ISR 
churn. We have a cluster of 6 brokers (besides the dedicated controller) and 
each one has about 300 partitions on it. I suspect that kafka broker cannot 
handle running controller + 300 partitions.
> 
> Anyway that’s so far what I got, I’d also like to know how to debug this.
> We’re running kafka 0.9.0.1 with heap size 8G.
> 
> Thanks,
> Jun
> 
>> On Mar 22, 2017, at 7:06 AM, Manikumar mailto:manikumar.re...@gmail.com>> wrote:
>> 
>> Any erros related to zookeeper seesion timeout? We can also check for
>> excesssive GC.
>> Some times this may due to forming multiple controllers due to soft
>> failures.
>> You can check ActiveControllerCount on brokers, only one broker in the
>> cluster should have 1.
>> Also check for network issues/partitions
>> 
>> On Wed, Mar 22, 2017 at 7:21 PM, Radu Radutiu mailto:rradu...@gmail.com>> wrote:
>> 
>>> Hello,
>>> 
>>> Does anyone know how I can debug high ISR churn on the kafka leader on a
>>> cluster without traffic? I have 2 topics on a 4 node cluster  (replica 4
>>> and replica 3) and both show constant changes of the number of insync
>>> replicas:
>>> 
>>> [2017-03-22 15:30:10,945] INFO Partition [__consumer_offsets,0] on 
broker
>>> 2: Expanding ISR for partition __consumer_offsets-0 from 2,4 to 2,4,5
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:31:41,193] INFO Partition [__consumer_offsets,0] on 
broker
>>> 2: Shrinking ISR for partition [__consumer_offsets,0] from 2,4,5 to 2,5
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:31:41,195] INFO Partition [__consumer_offsets,0] on 
broker
>>> 2: Expanding ISR for partition __consumer_offsets-0 from 2,5 to 2,5,4
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:35:03,443] INFO Partition [__consumer_offsets,0] on 
broker
>>> 2: Shrinking ISR for partition [__consumer_offsets,0] from 2,5,4 to 2,5
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:35:03,445] INFO Partition [__consumer_offsets,0] on 
broker
>>> 2: Expanding ISR for partition __consumer_offsets-0 from 2,5 to 2,5,4
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:37:01,443] INFO Partition [__consumer_offsets,0] on 
broker
>>> 2: Shrinking ISR for partition [__consumer_offsets,0] from 2,5,4 to 2,4
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:37:01,445] INFO Partition [__consumer_offsets,0] on 
broker
>>> 2: Expanding ISR for partition

Re: ISR churn

2017-03-22 Thread David Garcia
Sure…there are two types of purgatories: Consumer and Producer
Consumer purgatory (for a partition leader) is a queue for pending requests for 
data (i.e. polling by some client for the respective partition).  It’s 
basically a waiting area for poll requests.  Generally speaking, the more 
consumers there are, the larger this queue will grow…and the larger this queue, 
the longer consumers will wait for data. 

The Producer queue is for write requests (for a partition leader).  In my 
experience, the producer queue is more of a bottle neck (because it has to 
write to disk…which is a slower operation).  Although Kafka delegates to the OS 
for disk IO, I have found this purgatory-size to be highly predictive of 
cluster stress.  WRT ISR, if you relax the replica setting for your topics, 
this will obviously mitigate this issue (i.e. fewer synchronizations needed).  
Network latency was also one of our biggest issues.  Have you checked that?

-David

On 3/22/17, 8:28 PM, "Jun MA"  wrote:

Hi David,

I checked our cluster, the producer purgatory size is under 3 mostly. But 
I’m not quite understand this metrics, could you please explain it a little bit?

Thanks,
Jun
> On Mar 22, 2017, at 3:07 PM, David Garcia  wrote:
> 
>  producer purgatory size





Re: Benchmarking streaming frameworks

2017-03-23 Thread David Garcia
I don’t think “benchmarking” frameworks WRT Kafka is a particularly 
informative.  The various frameworks available are better compared WRT their 
features and processing limitations.  For example, Akka-streams for kafka 
effects a more intuitive way to express asynchronous operations.  If you were 
to benchmark each framework with a simple poll-transformation-publish workload, 
I think you would find very little difference between them (assuming that they 
were all configured appropriately…minimum consumer bytes setting for instance). 
 I think each framework would be better evaluated according to it’s 
features….just my thoughts.

-David

On 3/23/17, 9:38 AM, "Eno Thereska"  wrote:

Hi Giselle,

Great idea! In Kafka Streams we have a few micro-benchmarks we run nightly. 
They are at: 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
 

 

It's mostly simple stuff (aggregations, joins) and we are continuously 
updating them and adding more. 
The nightly results are kept publicly at 
http://testing.confluent.io/confluent-kafka-system-test-results/ 
, e.g., see 
report on 2017-03-21: 
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-03-21--001.1490119830--apache--trunk--05690f0/report.html
 

 (search for "simple_benchmark_test").

Your feedback and input is always appreciated.

Thanks,
Eno

> On 23 Mar 2017, at 10:09, Giselle van Dongen  
wrote:
> 
> Dear users of Streaming Technologies,
> 
> As a PhD student in big data analytics, I am currently in the process of
> compiling a list of benchmarks (to test multiple streaming frameworks) in
> order to create an expanded benchmarking suite. The benchmark suite is 
being
> developed as a part of my current work at Ghent University.
> 
> The included frameworks at this time are, in no particular order, Spark,
> Flink, Kafka (Streams), Storm (Trident) and Drizzle. Any pointers to
> previous work or relevant benchmarks would be appreciated.
> 
> Best regards,
> Giselle van Dongen





Re: Kafka on windows

2017-04-11 Thread David Garcia
One issue is that Kafka leverage some very specific features of the linux 
kernel that are probably far different from Windows, so I imagine the 
performance profile is likewise much different.

On 4/11/17, 8:52 AM, "Tomasz Rojek"  wrote:

Hi All,

We want to choose provider of messaging system in our company, one of
possible choices is Apache Kafka. One of operating system that will host
brokers is windows, according to documentation:

https://kafka.apache.org/documentation.html#os
"*We have seen a few issues running on Windows and Windows is not currently
a well supported platform though we would be happy to change that.*"

Can you please elaborate more on this. What exactly potential issues are we
talking about? What functionalities of kafka are influenced by this? Maybe
it occurs only on specific version of windows?

Thank you in advance for any information.

With Regards
Tomasz Rojek
Java Engineer




Re: Kafka on windows

2017-04-14 Thread David Garcia
If you want reduce maintenance headaches, I highly recommend not running 
brokers on windows (or in vm’s for that matter).  It really isn’t supported.  
Kafka needs highly performant OS primitives.  For example, XFS, and ext4 are 
recommended filesystems for reasons specific to Kafka.  If this is just for 
experimentation/curiosity, then go all out…but production systems should stick 
with the documentation’s recommendations.

On 4/12/17, 12:13 AM, "David Luu"  wrote:

I'm curious as well. That doc blurb doesn't give specifics. How is kafka
run (or tested) on Windows? Natively via the command line shell, etc. or
via cygwin, within a *nix VM on Windows, or via Windows 10's Ubuntu Linux
Bash shell? Would be interesting to see how each method I listed performs,
maybe the Windows 10 bash shell method might be most optimal among the list?

On Tue, Apr 11, 2017 at 9:41 PM, David Garcia  wrote:

> One issue is that Kafka leverage some very specific features of the linux
> kernel that are probably far different from Windows, so I imagine the
> performance profile is likewise much different.
>
> On 4/11/17, 8:52 AM, "Tomasz Rojek"  wrote:
>
> Hi All,
>
> We want to choose provider of messaging system in our company, one of
> possible choices is Apache Kafka. One of operating system that will
> host
> brokers is windows, according to documentation:
>
> https://kafka.apache.org/documentation.html#os
> "*We have seen a few issues running on Windows and Windows is not
> currently
> a well supported platform though we would be happy to change that.*"
>
> Can you please elaborate more on this. What exactly potential issues
> are we
> talking about? What functionalities of kafka are influenced by this?
> Maybe
> it occurs only on specific version of windows?
>
> Thank you in advance for any information.
>
> With Regards
> Tomasz Rojek
> Java Engineer
>
>
>


-- 
David Luu
Member of Technical Staff
Mist Systems, Inc.
1601 S. De Anza Blvd. #248
Cupertino, CA 95014




Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread David Garcia
Kafka is very reliable when the broker actually gets the message and replies 
back to the producer that it got the message (i.e. it won’t “lie”).  Basically, 
your producer tried to put too many bananas into the Bananer’s basket.  And 
yes, Windows is not supported.  You will get much better performance with a 
linux deployment.  But as with everything, monitor Kafka (OS metrics such as 
disk scans..etc) so that you have a sense of how much “capacity” it has.  The 
confluent control center has some of this stuff out of the box, but you really 
should monitor OS metrics as well.  Netflix has a good article for this too: 
http://techblog.netflix.com/2016/04/kafka-inside-keystone-pipeline.html

My team built a simple latency canary app and reports the numbers to new 
relic…it’s very indicative of cluster health.

-David

On 4/18/17, 4:32 PM, "jan"  wrote:

Hi Serega,

> data didn't reach producer. So why should data appear in consumer?

err, isn't it supposed to? Isn't the loss of data a very serious error?

> loss rate is more or less similar [...] Not so bad.

That made me laugh at least.  Is kafka intended to be a reliable
message delivery system, or is a 2% data loss officially acceptable?

I've been reading the other threads and one says windows is really not
supported, and certainly not for production. Perhaps that's the root
of it. Well I'm hoping to try it on linux shortly so I'll see if I can
replicate the issue but I would like to know whether it *should* work
in windows.

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
> Hi,
>
> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> record(s) expired due to timeout while requesting metadata from
> brokers for big_ptns1_repl1_nozip-0
>
> data didn't reach producer. So why should data appear in consumer?
> loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb /
> 5000gb) Not so bad.
>
>
> 2017-04-18 21:46 GMT+02:00 jan :
>
>> Hi all, I'm something of a kafka n00b.
>> I posted the following in the  google newsgroup, haven't had a reply
>> or even a single read so I'll try here. My original msg, slightly
>> edited, was:
>>
>> 
>>
>> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> server, latest version of java)
>>
>> I've spent several days trying to sort out unexpected behaviour
>> involving kafka and the kafka console producer and consumer.
>>
>>  If I set  the console produced and console consumer to look at the
>> same topic then I can type lines into the producer window and see them
>> appear in the consumer window, so it works.
>>
>> If I try to pipe in large amounts of data to the producer, some gets
>> lost and the producer reports errors eg.
>>
>> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> (org.apache.kafka.clients.
>> producer.internals.ErrorLoggingCallback)
>> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> record(s) expired due to timeout while requesting metadata from
>> brokers for big_ptns1_repl1_nozip-0
>>
>> I'm using as input a file either shakespeare's full works (about 5.4
>> meg ascii), or a much larger file of shakespear's full works
>> replicated 900 times to make it about 5GB. Lines are ascii and short,
>> and each line should be a single record when read in by the console
>> producer. I need to do some benchmarking on time and space and this
>> was my first try.
>>
>> As mentioned, data gets lost. I presume it is expected that any data
>> we pipe into the producer should arrive in the consumer, so if I do
>> this in one windows console:
>>
>> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
>> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
>> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
>>
>> and this in another:
>>
>> kafka-console-producer.bat --broker-list localhost:9092  --topic
>> big_ptns1_repl1_nozip <
>> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
>>
>> then the output file "single_all_shakespear_OUT.txt" should be
>> identical to the input file "complete_works_no_bare_lines.txt" except
>> it's not. For the complete works (sabout 5.4 meg uncompressed) I lost
>> about 130K in the output.
>> For the replicated shakespeare, which is about 5GB, I lost about 150 meg.
>>
>> This can't be right surely and it

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread David Garcia
The console producer in the 0.10.0.0 release uses the old producer which 
doesn’t have “backoff”…it’s really just for testing simple producing:

object ConsoleProducer {

  def main(args: Array[String]) {

try {
val config = new ProducerConfig(args)
val reader = 
Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))

val producer =
  if(config.useOldProducer) {
new OldProducer(getOldProducerProps(config))
  } else {
new NewShinyProducer(getNewProducerProps(config))
  }



On 4/18/17, 5:31 PM, "Robert Quinlivan"  wrote:

I am curious how your producer is configured. The producer maintains an
internal buffer of messages to be sent over to the broker. Is it possible
you are terminating the producer code in your test before the buffer is
exhausted?

On Tue, Apr 18, 2017 at 5:29 PM, jan  wrote:

> Thanks to both of you. Some quick points:
>
> I'd expect there to be backpressure from the producer if the broker is
> busy ie. the broker would not respond to the console producer if the
> broker was too busy accept more messages, and the producer would hang
> on the socket. Alternatively I'd hope the console producer would have
> the sense to back off and retry but clearly(?) not.
> This behaviour is actually relevant to my old job so I need to know more.
>
> Perhaps the timeout mentioned in the error msg can just be upped?
>
> *Is* the claimed timeout relevant?
> > Batch containing 8 record(s) expired due to timeout while requesting
> metadata from brokers for big_ptns1_repl1_nozip-0
>
> Why is the producer expiring records?
>
> But I'm surprised this happened because my setup is one machine with
> everything running on it. No network. Also Kafka writes to the disk
> without an fsync (or its equivalent on windows) which means it just
> gets cached in ram before being lazily written to disk, and I've got
> plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
> so it grows to ~8GB but still, it need not hit disk at all (and the
> file goes into the windows memory, not java's).
> Maybe it is GC holding things up but I dunno, GC even for a second or
> two should not cause a socket failure, just delay the read, though I'm
> not an expert on this *at all*.
>
> I'll go over the answers tomorrow more carefully but thanks anyway!
>
> cheers
>
> jan
>
> On 18/04/2017, Serega Sheypak  wrote:
> >> err, isn't it supposed to? Isn't the loss of data a very serious error?
> > Kafka can't fix networking issues like latencies, blinking,
> unavailability
> > or any other weird stuff. Kafka promises you to persist data if data
> > reaches Kafka. Data delivery responsibility to kafka is on your side. 
You
> > fail to do it according to logs.
> >
> > 0.02% not 2%
> > You should check broker logs to figure out what went wrong. All things
> > happen on one machine as far as I understand. Maybe your brokers don't
> have
> > enough mem and they stuck because of GC and don't respond to producer.
> > Async producer fails to send data. That is why you observe data loss on
> > consumer side.
> >
> >
> > 2017-04-18 23:32 GMT+02:00 jan :
> >
> >> Hi Serega,
> >>
> >> > data didn't reach producer. So why should data appear in consumer?
> >>
> >> err, isn't it supposed to? Isn't the loss of data a very serious error?
> >>
> >> > loss rate is more or less similar [...] Not so bad.
> >>
> >> That made me laugh at least.  Is kafka intended to be a reliable
> >> message delivery system, or is a 2% data loss officially acceptable?
> >>
> >> I've been reading the other threads and one says windows is really not
> >> supported, and certainly not for production. Perhaps that's the root
> >> of it. Well I'm hoping to try it on linux shortly so I'll see if I can
> >> replicate the issue but I would like to know whether it *should* work
> >> in windows.
> >>
> >> cheers
> >>
> >> jan
> >>
> >> On 18/04/2017, Serega Sheypak  wrote:
> >> > Hi,
> >> >
> >> > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> >> > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> >> > (org.apache.kafka.clients.
> >> > producer.internals.ErrorLoggingCallback)
> >> > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> >> > record(s) expired due to timeout while requesting metadata from
> >> > brokers for big_ptns1_repl1_nozip-0
> >> >
> >> > data didn't reach producer. So why should data appear in consumer?
> >> > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03%
> (150mb

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread David Garcia
The “NewShinyProducer” is also deprecated.

On 4/18/17, 5:41 PM, "David Garcia"  wrote:

The console producer in the 0.10.0.0 release uses the old producer which 
doesn’t have “backoff”…it’s really just for testing simple producing:

object ConsoleProducer {

  def main(args: Array[String]) {

try {
val config = new ProducerConfig(args)
val reader = 
Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))

val producer =
  if(config.useOldProducer) {
new OldProducer(getOldProducerProps(config))
  } else {
new NewShinyProducer(getNewProducerProps(config))
  }



On 4/18/17, 5:31 PM, "Robert Quinlivan"  wrote:

I am curious how your producer is configured. The producer maintains an
internal buffer of messages to be sent over to the broker. Is it 
possible
you are terminating the producer code in your test before the buffer is
exhausted?

On Tue, Apr 18, 2017 at 5:29 PM, jan  wrote:

> Thanks to both of you. Some quick points:
>
> I'd expect there to be backpressure from the producer if the broker is
> busy ie. the broker would not respond to the console producer if the
> broker was too busy accept more messages, and the producer would hang
> on the socket. Alternatively I'd hope the console producer would have
> the sense to back off and retry but clearly(?) not.
> This behaviour is actually relevant to my old job so I need to know 
more.
>
> Perhaps the timeout mentioned in the error msg can just be upped?
>
> *Is* the claimed timeout relevant?
> > Batch containing 8 record(s) expired due to timeout while requesting
> metadata from brokers for big_ptns1_repl1_nozip-0
>
> Why is the producer expiring records?
>
> But I'm surprised this happened because my setup is one machine with
> everything running on it. No network. Also Kafka writes to the disk
> without an fsync (or its equivalent on windows) which means it just
> gets cached in ram before being lazily written to disk, and I've got
> plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
> so it grows to ~8GB but still, it need not hit disk at all (and the
> file goes into the windows memory, not java's).
> Maybe it is GC holding things up but I dunno, GC even for a second or
> two should not cause a socket failure, just delay the read, though I'm
> not an expert on this *at all*.
>
> I'll go over the answers tomorrow more carefully but thanks anyway!
>
> cheers
>
> jan
>
> On 18/04/2017, Serega Sheypak  wrote:
> >> err, isn't it supposed to? Isn't the loss of data a very serious 
error?
> > Kafka can't fix networking issues like latencies, blinking,
> unavailability
> > or any other weird stuff. Kafka promises you to persist data if data
> > reaches Kafka. Data delivery responsibility to kafka is on your 
side. You
> > fail to do it according to logs.
> >
> > 0.02% not 2%
> > You should check broker logs to figure out what went wrong. All 
things
> > happen on one machine as far as I understand. Maybe your brokers 
don't
> have
> > enough mem and they stuck because of GC and don't respond to 
producer.
> > Async producer fails to send data. That is why you observe data 
loss on
> > consumer side.
> >
> >
> > 2017-04-18 23:32 GMT+02:00 jan :
> >
> >> Hi Serega,
> >>
> >> > data didn't reach producer. So why should data appear in 
consumer?
> >>
> >> err, isn't it supposed to? Isn't the loss of data a very serious 
error?
> >>
> >> > loss rate is more or less similar [...] Not so bad.
> >>
> >> That made me laugh at least.  Is kafka intended to be a reliable
> >> message delivery system, or is a 2% data loss officially 
acceptable?
> >>
> >> I've been reading the other threads and one says windows is really 
not
> >> supported, and certainly not for production. Perhaps that's the 
ro

Re: Kafka Producer - Multiple broker - Data sent to buffer but not in Queue

2017-04-18 Thread David Garcia
What do broker logs say around the time you send your messages?

On 4/18/17, 3:21 AM, "Ranjith Anbazhakan"  
wrote:

Hi,

I have been testing behavior of multiple broker instances of kafka in same 
machine and facing inconsistent behavior of producer sent records to buffer not 
being available in queue always.

Tried kafka versions:
0.10.2.0
0.10.1.0

Scenario:

1.   Ran two broker instances in same machine. Say broker 1 as initial 
leader, broker 2 as initial follower.

2.   Stopped broker 1. Now broker 2 became leader.

3.   Now producer sends records for a given topic TEST through send() 
method, followed by flush(). Records have to go to Broker 2 logically. No 
error/exception is thrown by code. (So it is assumed data has been sent 
successfully to buffer)

4.   When using command to check the records count for TEST topic in 
Broker 2, the sent records are not added to existing records count for that 
topic in queue.

a.   Used command - kafka-run-class.bat kafka.tools.GetOffsetShell 
--broker-list localhost:9094 --topic TEST --time -1 (where TEST is the used 
topic)

NOTE: **Step 4 is not happening always and is inconsistent**. In the 
scenario when it does not work, if Broker 1 is made UP and then made DOWN, 
records are always been available in queue in Broker 2 post doing Step 3.

Configurations:
Overall Producer configurations: (most are default values)
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers =  , 
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 54
interceptor.classes = null
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 6
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 6
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 3
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 3
value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer

Broker 1: (server.properties)
broker.id=1
port=9091
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Broker 2: (server1.properties)
broker.id=2
port=9094
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Do let know if anyone have faced similar scenarios and why such an issue 
occurs? Let know if any more details are needed.

Thanks,
Ranjith
[Aspire Systems]

This e-mail message and any attachments are for the sole use of the 
intended recipient(s) and may contain proprietary, confidential, trade secret 
or privileged information. Any unauthorized review, use, disclosure or 

Re: Streaming to cloud platform

2017-04-20 Thread David Garcia
You can use kafka connect and something like bottled water if you want to be 
fancy.

On 4/20/17, 8:46 AM, "arkaprova.s...@cognizant.com" 
 wrote:

Hi,

I would like to ingest data from RDBMS to CLOUD platform like Azure 
HDInsight BLOB using Kafka . What will be the best practice in terms of 
architectural perspective . Please suggest.

Thanks,
Arkaprova

This e-mail and any files transmitted with it are for the sole use of the 
intended recipient(s) and may contain confidential and privileged information. 
If you are not the intended recipient(s), please reply to the sender and 
destroy all copies of the original message. Any unauthorized review, use, 
disclosure, dissemination, forwarding, printing or copying of this email, 
and/or any action taken in reliance on the contents of this e-mail is strictly 
prohibited and may be unlawful. Where permitted by applicable law, this e-mail 
and other e-mail communications sent to and from Cognizant e-mail addresses may 
be monitored.




Re: the problems of parititons assigned to consumers

2017-04-25 Thread David Garcia
For Problem 1, you will probably have to either use the low-level API, and/or 
do manual partition assignment.

For problem 2, you can simply re-publish the messages to a new topic with more 
partitions…or, as in the first problem, just use the low level API.  You can 
also create more consumer groups (i.e. if you need to add a new consumer of 
your topics, but all consumer groups are “full”, just make a new one)

-David

On 4/24/17, 2:01 AM, "揣立武"  wrote:

Hi, all. There are two problems when we use kafka about partitions assigned
to consumers.

Problem 1: Partitions will be reassigned to consumers when consumer online
or offline, then the messages latency become higher.

Problem 2: If we have two partitions, only two consumers can consume
messages。How let more consumers to consume, but expand partitions.

Thanks!




Re: Kafka Stream vs Spark

2017-04-27 Thread David Garcia
Unlike spark, you don’t need an entire framework to deploy your job.  With 
Kstreams, you just start up an application and go.  You don’t need docker 
either…although containerizing your stuff is probably a good strategy for the 
purposes of deployment management (something you get with Yarn or a spark 
Cluster)…but you’re not tied to any one framework (e.g. you can use kubernetes, 
mesos, Yarn, or anything else) 

On 4/27/17, 10:52 AM, "Mina Aslani"  wrote:

Hi,

I created a kafka stream app and as I was informed I created a docker image
with the app and launched it as a container. However, I have couple of
questions:

- Would every Kafka streaming job require a new docker image and deployment
of the container/service?
- How should I structure things differently if I had more than one Kafka
streaming app/job?
- What are the advantages of using Kafka streaming over Spark streaming?
I'm asking b/c with Spark streaming I don't need to create and deploy a new
docker image every time I added or changed an/a app/job.

Best regards,
Mina




Re: Loss of Messages

2017-05-24 Thread David Garcia
What is your in-sync timeout set to?

-David

On 5/24/17, 5:57 AM, "Vinayak Sharma"  wrote:

Hi,

I am running Kafka as a 2 node cluster.
When I am scaling up and down 1 kafka broker I am experiencing loss of
messages at consumer end during reassignment of partitions.
Do you know what might be the cause or how we can fix this issue at our end?
I tried to search about this issue but couldn't find anything.

Regards,
Vinayak.

--