Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-29 Thread Magnus Edenhill
+1 on librdkafka interop

Minor nitpick:
KAFKA-1781 (state required Gradle version in README) is included in the
release notes but is not actually fixed.


2015-01-29 6:22 GMT+01:00 Jun Rao :

> This is the third candidate for release of Apache Kafka 0.8.2.0.
>
> Release Notes for the 0.8.2.0 release
>
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>
> Kafka's KEYS file, containing the PGP keys we use to sign the release, is at
> http://kafka.apache.org/KEYS. Please verify the artifacts with these keys in
> addition to the md5, sha1 and sha2 (SHA256) checksums.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>
> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>
>
> Thanks,
>
> Jun
>


Re: Routing modifications at runtime

2015-01-29 Thread David Morales
Hi Toni,

1. Kafka can create topics on the fly, in case you need it.

https://kafka.apache.org/08/configuration.html

auto.create.topics.enable (default: true): Enable auto creation of topics on
the server. If this is set to true, then attempts to produce, consume, or fetch
metadata for a non-existent topic will automatically create it with the default
replication factor and number of partitions.
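For example, a minimal producer sketch (a rough, untested illustration; the
broker address and topic name are placeholders): with auto.create.topics.enable=true
on the broker, the first send to a topic that does not exist yet triggers a
metadata request and the topic is created with the default partition count and
replication factor.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AutoCreateExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");               // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Sending to a topic that does not exist yet auto-creates it on the broker.
        producer.send(new KeyedMessage<String, String>("brand-new-topic", "hello"));
        producer.close();
    }
}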



2. About topic selection based on rules/dictionary, this must be solved on
your side.

You can use custom code in your app or an event transport solution, like
Flume.
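For the custom-code route, topic selection can be as small as a dictionary
lookup before the send; a rough sketch reusing the producer from the example
above (the map contents and the fallback topic are just illustrations):

import java.util.HashMap;
import java.util.Map;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class MacRouter {
    // Dictionary defined elsewhere: MAC address -> topic (contents are illustrative).
    private final Map<String, String> macToTopic = new HashMap<String, String>();
    private final Producer<String, String> producer;

    public MacRouter(Producer<String, String> producer) {
        this.producer = producer;
        macToTopic.put("00:1a:2b:3c:4d:5e", "topic-site-a");
    }

    public void route(String mac, String payload) {
        // Unknown MACs fall back to a catch-all topic; with auto-creation enabled,
        // a newly mapped topic is created the first time something is sent to it.
        String topic = macToTopic.containsKey(mac) ? macToTopic.get(mac) : "unrouted-macs";
        producer.send(new KeyedMessage<String, String>(topic, mac, payload));
    }
}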

Flume 1.6 now includes a sink for Kafka, and it already supports dynamic
topics (by using a preprocessor)

https://github.com/thilinamb/flume-ng-kafka-sink


   - *topic* [optional]: The topic in Kafka to which the messages will be
     published. If this topic is mentioned, every message will be published to
     the same topic. If dynamic topics are required, it's possible to use a
     preprocessor instead of a static topic. It's mandatory that either of the
     parameters *topic* or *preprocessor* is provided, because the topic cannot
     be null when publishing to Kafka. If neither of these parameters is
     provided, the messages will be published to a default topic called
     default-flume-topic.



Regards.




2015-01-29 0:16 GMT+01:00 Lakshmanan Muthuraman :

> Hi Toni,
>
> Couple of thoughts.
>
> 1. Kafka behaviour need not be changed at run time. Your producers, which
> push your MAC data into Kafka, should know which topic to write to.
> Your producer can be Flume, Logstash, or your own custom-written
> Java producer.
>
> As long as your producers know which topic to write to, they can keep creating
> new topics as new MAC data comes through your pipeline.
>
> On Wed, Jan 28, 2015 at 12:10 PM, Toni Cebrián 
> wrote:
>
> > Hi,
> >
> > I'm starting to weigh different alternatives for data ingestion and
> > I'd like to know whether Kafka meets the problem I have.
> > Say we have a set of devices, each with its own MAC, and then we receive
> > data in Kafka. There is a dictionary defined elsewhere that says which
> > topic each MAC must publish to. So I have basically 2 questions:
> > New MACs keep coming and the dictionary must be updated accordingly. How
> > could I change this Kafka behaviour during runtime?
> > A problem for the future. Say that dictionaries are so big that they don't
> > fit in memory. Are there any patterns for bookkeeping internal data
> > structures and how to route to them?
> >
> > T.
> >
>



-- 

David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf




Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
*


Re: Can't create a topic; can't delete it either

2015-01-29 Thread Joel Koshy
> If you can tell me where the find the logs I can check. I haven't restarted
> my brokers since the issue.

This will be specified in the log4j properties that you are using.

On Wed, Jan 28, 2015 at 12:01:01PM -0800, Sumit Rangwala wrote:
> On Tue, Jan 27, 2015 at 10:54 PM, Joel Koshy  wrote:
> 
> > Do you still have the controller and state change logs from the time
> > you originally tried to delete the topic?
> >
> >
> If you can tell me where the find the logs I can check. I haven't restarted
> my brokers since the issue.
> 
> Sumit
> 
> 
> 
> > On Tue, Jan 27, 2015 at 03:11:48PM -0800, Sumit Rangwala wrote:
> > > I am using 0.8.2-beta on brokers 0.8.1.1 for client (producer and
> > > consumers). delete.topic.enable=true on all brokers. replication factor
> > is
> > > < number of brokers. I see this issue with just one single topic, all
> > other
> > > topics are fine (creation and deletion). Even after a day it is still in
> > > marked for deletion stage. Let me know what other  information from the
> > > brokers or the zookeepers can help me debug this issue.
> > >
> > > On Tue, Jan 27, 2015 at 9:47 AM, Gwen Shapira 
> > wrote:
> > >
> > > > Also, do you have delete.topic.enable=true on all brokers?
> > > >
> > > > The automatic topic creation can fail if the default number of
> > > > replicas is greater than number of available brokers. Check the
> > > > default.replication.factor parameter.
> > > >
> > > > Gwen
> > > >
> > > > On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy 
> > wrote:
> > > > > Which version of the broker are you using?
> > > > >
> > > > > On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
> > > > >> While running kafka in production I found an issue where a topic
> > wasn't
> > > > >> getting created even with auto topic enabled. I then went ahead and
> > > > created
> > > > >> the topic manually (from the command line). I then delete the topic,
> > > > again
> > > > >> manually. Now my broker won't allow me to either create *the* topic
> > or
> > > > >> delete *the* topic. (other topic creation and deletion is working
> > fine).
> > > > >>
> > > > >> The topic is in "marked for deletion" stage for more than 3 hours.
> > > > >>
> > > > >> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka
> > > > --list
> > > > >> --topic GRIFFIN-TldAdFormat.csv-1422321736886
> > > > >> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
> > > > >>
> > > > >> If this is a known issue, is there a workaround?
> > > > >>
> > > > >> Sumit
> > > > >
> > > >
> >
> >



Potential socket leak in kafka sync producer

2015-01-29 Thread ankit tyagi
Hi,

Currently we are using the sync producer client of version 0.8.1 in our
production boxes. We are getting the following exception while publishing a
Kafka message:

[2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89] Fetching
topic metadata with correlation id 10808 for topics [Set(kafka_topic_coms_FD_test1)]
from broker [id:0,host:localhost,port:9092] failed
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)


We are using a dynamic thread pool to publish messages to Kafka. My
observation is that after the keep-alive time, when the threads in my executor
get destroyed, the file descriptors somehow do not get cleared; but when I
explicitly ran a full GC, the FD count dropped by a significant amount.
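A simplified sketch of the pattern in question (properties and payload are
assumed, not the real code): closing the producer explicitly when a pooled task
finishes releases the underlying socket immediately, instead of relying on
finalization during a full GC.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ClosingProducerTask implements Runnable {
    private final Properties props; // broker list, serializer, producer.type=sync, etc. (assumed)

    public ClosingProducerTask(Properties props) {
        this.props = props;
    }

    public void run() {
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        try {
            producer.send(new KeyedMessage<String, String>("kafka_topic_coms_FD_test1", "payload"));
        } finally {
            // Closing tears down the producer's connection (and its file descriptor)
            // right away, rather than waiting for the object to be garbage collected.
            producer.close();
        }
    }
}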


Re: Routing modifications at runtime

2015-01-29 Thread Jeff Holoman
Yeah, if you're into Flume you can definitely do per-event
modification/routing in an interceptor with relative ease. I don't know the
size of the total set of MAC addresses to look up (or, actually, why a hash
partitioning scheme wouldn't just work, but whatever, I assume you have your
reasons). There's kind of an example of doing this here:

http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/

In the example in the blog it uses HBase to read some "profile" data. You
could sub this with whatever other store you wanted (Redis, Cassandra,
whatever)

So you'd go:

Your Systems -> Kafka (raw data) -> Flume Source->Interceptor->Kafka
Channel(Raw data to the correct topic by setting the Flume Event header)

Of course you could code that all yourself too. Whatever floats your boat.
Writing interceptors is really easy and there's quite a few examples
around.
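A rough, untested sketch of such an interceptor (the in-memory dictionary
stands in for an HBase/Redis lookup, and it assumes the Kafka sink honors a
"topic" event header for per-event routing, as described in the Flafka post):

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MacTopicInterceptor implements Interceptor {
    // Illustrative in-memory dictionary; a real deployment would query an external store.
    private final Map<String, String> macToTopic = new HashMap<String, String>();

    public void initialize() {
        macToTopic.put("00:1a:2b:3c:4d:5e", "topic-site-a");
    }

    public Event intercept(Event event) {
        // Assumes the event body is the MAC address (illustration only).
        String mac = new String(event.getBody(), Charset.forName("UTF-8")).trim();
        String topic = macToTopic.containsKey(mac) ? macToTopic.get(mac) : "default-flume-topic";
        event.getHeaders().put("topic", topic); // header used downstream to pick the Kafka topic
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new MacTopicInterceptor();
        }

        public void configure(Context context) {
        }
    }
}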

Jeff

On Thu, Jan 29, 2015 at 4:10 AM, David Morales  wrote:

> Hi Toni,
>
> 1. Kafka can create topics on the fly, in case you need it.
>
> https://kafka.apache.org/08/configuration.html
>
> auto.create.topics.enable (default: true): Enable auto creation of topics on
> the server. If this is set to true, then attempts to produce, consume, or
> fetch metadata for a non-existent topic will automatically create it with the
> default replication factor and number of partitions.
>
>
>
> 2. About topic selection based on rules/dictionary, this must be solved on
> your side.
>
> You can use custom-code in your app or an event transport solution, like
> Flume.
>
> Flume 1.6 now includes a sink for Kafka, and it already supports dynamic
> topics (by using a preprocessor)
>
> https://github.com/thilinamb/flume-ng-kafka-sink
>
>
>    - *topic* [optional]: The topic in Kafka to which the messages will be
>      published. If this topic is mentioned, every message will be published
>      to the same topic. If dynamic topics are required, it's possible to use
>      a preprocessor instead of a static topic. It's mandatory that either of
>      the parameters *topic* or *preprocessor* is provided, because the topic
>      cannot be null when publishing to Kafka. If neither of these parameters
>      is provided, the messages will be published to a default topic called
>      default-flume-topic.
>
>
>
> Regards.
>
>
>
>
> 2015-01-29 0:16 GMT+01:00 Lakshmanan Muthuraman :
>
> > Hi Toni,
> >
> > Couple of thoughts.
> >
> > 1. Kafka behaviour need not be changed at run time. Your producers which
> > push your MAC data into kafka should know to which topic it should write.
> > Your producer can be flume, log stash or it can  be your own custom
> written
> > java producer.
> >
> > As long as your producer know which topic to write, they can keep
> creating
> > new topics as new MAC data comes through your pipeline.
> >
> > On Wed, Jan 28, 2015 at 12:10 PM, Toni Cebrián 
> > wrote:
> >
> > > Hi,
> > >
> > > I'm starting to weight different alternatives for data ingestion
> and
> > > I'd like to know whether Kafka meets the problem I have.
> > > Say we have a set of devices each with its own MAC and then we
> > receive
> > > data in Kafka. There is a dictionary defined elsewhere that says each
> MAC
> > > to which topic must publish. So I have basically 2 questions:
> > > New MACs keep comming and the dictionary must be updated accordingly.
> How
> > > could I change this Kafka behaviour during runtime?
> > > A problem for the future. Say that dictionaries are so big that they
> > don't
> > > fit in memory. Are there any patterns for bookkeeping internal data
> > > structures and how route to them?
> > >
> > > T.
> > >
> >
>
>
>
> --
>
> David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
> 
>
>
> 
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
> *
>



-- 
Jeff Holoman
Systems Engineer
678-612-9519


Detecting lost connection in high level consumer

2015-01-29 Thread harikiran
Hi

I am using the 0.8.1.1 Kafka high-level consumer and I have configured
"consumer.timeout.ms" to a value that is not -1, say 5000 ms.

I create the consumer iterator and invoke the hasNext() method on it.

Irrespective of whether the Kafka broker was shut down or there was simply no
message written to Kafka, I see a ConsumerTimeoutException after 5000 ms.
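A minimal sketch of this setup (the ZooKeeper address, group id and topic name
are placeholders); the timeout is thrown in both cases, which is exactly the
ambiguity in question:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TimeoutAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");  // placeholder
        props.put("group.id", "example-group");      // placeholder
        props.put("consumer.timeout.ms", "5000");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("example-topic", 1);
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreams(topicCount).get("example-topic");
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();

        while (true) {
            try {
                if (it.hasNext()) {
                    byte[] message = it.next().message();
                    // process message ...
                }
            } catch (ConsumerTimeoutException e) {
                // Thrown after consumer.timeout.ms whether the broker is down or simply
                // idle, so some out-of-band health check is needed to tell the two apart.
            }
        }
    }
}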

My goal is to detect a lost connection and reconnect, but I cannot figure out
a way to do it.

Any kind of help is appreciated.

Thanks
Hari


Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Guozhang Wang
Sorry my previous link was not complete:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers
?



On Wed, Jan 28, 2015 at 10:56 PM, Su She  wrote:

> Thank you Dillian and Guozhang for the responses.
>
> Yes, Dillian you are understanding my issue correctly. I am not sure what
> the best approach to this is...I'm not sure if there's a way to whitelist
> certain IPs, create a VPC, use the cluster launcher as the kafka
> zookeeper/broker. I guess this is more of an AWS question, but I thought
> this is a problem some Kafka users must have solved already.
>
> Edit: I just tried using the cluster launcher as an intermediate. I started
> Zookeeper/Kafka Server on my Cluster launcher and then created a
> topic/produced messages. I set up a kafka consumer on one of my private EC2
> instances, but I got a No Route to host error. I pinged the cluster
> launcher <-> private instance and it works fine. I was hoping I could use
> this as a temporary solution...any suggestions on this issue would also
> be greatly appreciated. Thanks!
>
> Best,
>
> Su
>
>
> On Wed, Jan 28, 2015 at 9:11 PM, Guozhang Wang  wrote:
>
> > Su,
> >
> > Does this help for your case?
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ
> >
> > Guozhang
> >
> > On Wed, Jan 28, 2015 at 3:36 PM, Dillian Murphey <
> crackshotm...@gmail.com>
> > wrote:
> >
> > > Am I understanding your question correctly... You're asking how do you
> > > establish connectivity to an instance in a private subnet from the
> > outside
> > > world?  Are you thinking in terms of zookeeper or just general aws
> > network
> > > connectivity?
> > >
> > > On Wed, Jan 28, 2015 at 11:03 AM, Su She 
> wrote:
> > >
> > > > Hello All,
> > > >
> > > > I have set up a cluster of EC2 instances using this method:
> > > >
> > > >
> > > >
> > >
> >
> http://blogs.aws.amazon.com/bigdata/post/Tx2D0J7QOVRJBRX/Deploying-Cloudera-s-Enterprise-Data-Hub-on-AWS
> > > >
> > > > As you can see the instances are w/in a private subnet. I was
> wondering
> > > if
> > > > anyone has any advice on how I can set up a Kafka zookeeper/server on
> > an
> > > > instance that receives messages from a Kafka Producer outside of the
> > > > private subnet. I have tried using the cluster launcher, but I feel
> > like
> > > it
> > > > is not a best practice and only a temporary situation.
> > > >
> > > > Thank you for the help!
> > > >
> > > > Best,
> > > >
> > > > Su
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: Resilient Producer

2015-01-29 Thread David Morales
Existing "tail" source is not the best choice in your scenario, as you have
pointed out.

SpoolDir could be a solution if your log file rotation policy is very low
(5 minutes, for example), but then you have to deal with a huge number of
files in the folder (slower listings).

There is a proposal for a new approach, something that combines the best of
"tail" and "spoolDir". Take a look here:

https://issues.apache.org/jira/browse/FLUME-2498
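For the do-it-yourself option discussed below (a daemon that tails a file and
forwards each line to Kafka), a rough, untested sketch with the 0.8 producer;
the file path, broker and topic are placeholders, it reads from the beginning
of the file rather than the end, and it does not handle rotation, which is
exactly what FLUME-2498 is trying to address:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TailToKafka {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");               // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        BufferedReader reader =
                new BufferedReader(new FileReader("/var/log/app/file.log")); // placeholder
        try {
            while (true) {
                String line = reader.readLine();
                if (line == null) {
                    Thread.sleep(500);   // nothing new yet; poll again
                } else {
                    producer.send(new KeyedMessage<String, String>("app-logs", line));
                }
            }
        } finally {
            reader.close();
            producer.close();
        }
    }
}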




2015-01-29 0:24 GMT+01:00 Lakshmanan Muthuraman :

> We have been using Flume to solve a very similar use case. Our servers write
> the log files to a local file system, and then we have a Flume agent which
> ships the data to Kafka.
>
> You can use Flume's exec source running tail. Though the exec source runs
> well with tail, there are issues if the agent goes down or the file channel
> starts building up. If the agent goes down, you can ask the exec tail
> source to go back n lines or read from the beginning of the file. The
> challenge is that we roll our log files on a daily basis. What if the agent
> goes down in the evening? We would need to go back over the entire day's
> worth of data for reprocessing, which slows down the data flow. We can also
> go back an arbitrary number of lines, but then we don't know what the right
> number to go back is. This is the kind of challenge we face. We have tried
> the spooling directory source, which works, but we would need a different log
> file rotation policy. We even considered rotating files every minute, but
> that would still add a minute of delay to the real-time data flow in our
> kafka -> storm -> Elasticsearch pipeline.
>
> We are going to do a PoC on Logstash to see whether it solves the problems we
> have with Flume.
>
> On Wed, Jan 28, 2015 at 10:39 AM, Fernando O.  wrote:
>
> > Hi all,
> > I'm evaluating using Kafka.
> >
> > I liked this thing of Facebook scribe that you log to your own machine
> and
> > then there's a separate process that forwards messages to the central
> > logger.
> >
> > With Kafka it seems that I have to embed the publisher in my app, and
> deal
> > with any communication problem managing that on the producer side.
> >
> > I googled quite a bit trying to find a project that would basically use a
> > daemon that parses a log file and sends the lines to the Kafka cluster
> > (something like tail file.log, but instead of redirecting the output to
> > the console: send it to kafka)
> >
> > Does anyone know about something like that?
> >
> >
> > Thanks!
> > Fernando.
> >
>



-- 

David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf




Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
*


Re: One or multiple instances of MM to aggregate kafka data to one hadoop

2015-01-29 Thread Jon Bringhurst
Hey Mingjie,

Here's how we have our mirror makers configured. For some context, let me try 
to describe this using the example datacenter layout as described in:

https://engineering.linkedin.com/samza/operating-apache-samza-scale

In that example, there are four data centers (A, B, C, and D). However, we only 
need Datacenter A and B to describe this.

Datacenter A mirrors data from local(A) to aggregate(A) as well as local(B) to 
aggregate(A).

Datacenter B mirrors data from local(B) to aggregate(B) as well as local(A) to 
aggregate(B).

The diagram in the article should make this easy to visualize. Note that the
mirror makers are running in the destination datacenter and pull the traffic in.

Let's say we have two physical machines in each datacenter dedicated to running
mirror makers (let's call them servers 1 and 2 in datacenter A, and servers 3
and 4 in datacenter B). This is what the layout of mirror maker processes would
look like:

* Datacenter A MirrorMaker Cluster
  * Server 1
    * local(A) to aggregate(A) MirrorMaker Instance
    * local(B) to aggregate(A) MirrorMaker Instance
  * Server 2
    * local(A) to aggregate(A) MirrorMaker Instance
    * local(B) to aggregate(A) MirrorMaker Instance

* Datacenter B MirrorMaker Cluster
  * Server 3
    * local(B) to aggregate(B) MirrorMaker Instance
    * local(A) to aggregate(B) MirrorMaker Instance
  * Server 4
    * local(B) to aggregate(B) MirrorMaker Instance
    * local(A) to aggregate(B) MirrorMaker Instance

The benefit of this layout is that if the load becomes too high, we would then 
add on another server to each cluster that looks exactly like the others in the 
cluster (easy to provision). If you get really huge, you can start creating 
multiple mirror maker clusters that each handle a specific flow (but still have 
homogeneous processes within each cluster).

Of course, YMMV, but this is what works well for us. :)

-Jon

On Jan 28, 2015, at 3:54 PM, Daniel Compton  
wrote:

> Hi Mingjie
> 
> I would recommend the first option of running one mirrormaker instance
> pulling from multiple DC's.
> 
> A single MM instance will be able to make more efficient use of the machine
> resources in two ways:
> 1. You will only have to run one process which will be able to be allocated
> the full amount of resources
> 2. Within the process, if you run enough consumer threads, I think that
> they should be able to rebalance and pick up the load if they don't have
> anything to do. I'm not 100% sure on this, but 1 still holds.
> 
> A single MM instance should handle connectivity issues with one DC without
> affecting the rest of the consumer threads for other DC's.
> 
> You would gain process isolation running a MM per DC, but this would raise
> the operational burden and resource requirements. I'm not sure what benefit
> you'd actually get from process isolation, so I'd recommend against it.
> However I'd be interested to hear if others do things differently.
> 
> Daniel.
> 
> On Thu Jan 29 2015 at 11:14:29 AM Mingjie Lai  wrote:
> 
>> Hi.
>> 
>> We have a pretty typical data ingestion use case where we use MirrorMaker at
>> one Hadoop data center to mirror Kafka data from multiple remote
>> application data centers. I know MirrorMaker can consume Kafka data from
>> multiple Kafka sources with one instance on one physical node: we can give
>> one MirrorMaker instance multiple consumer config files, so it can consume
>> data from multiple places.
>> 
>> Another option is to have multiple MirrorMaker instances on one node, each
>> instance dedicated to grabbing data from a single source data center.
>> Certainly there will be multiple mm nodes to balance the load.
>> 
>> The second option looks better since it kind of has an isolation for
>> different data centers.
>> 
>> Any recommendation for this kind of data aggregation cases?
>> 
>> Still new to kafka and mirrormaker. Welcome any information.
>> 
>> Thanks,
>> Mingjie
>> 





Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Su She
Thank you Guozhang for an updated link.

I read the answer for "Why can't my consumers/producers connect to the
brokers?". I am confused about a couple of things. I can ping the private IP of
A (cluster launcher) from B (EC2 instance). I can't ping the public IP of
A (or google.com) from B, which makes sense as B is in a private subnet. But
when I try to connect to the Zookeeper at private.ip.A:2181 I get a No
Route to Host error. Is there a reason why I can ping private.ip.A but not
connect to private.ip.A:2181?

I have not changed any of my server or producer properties. I have tried to
change advertised host name, but no luck.

Thanks for the help!



On Thu, Jan 29, 2015 at 8:12 AM, Guozhang Wang  wrote:

> Sorry my previous link was not complete:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers
> ?
>
>
>
> On Wed, Jan 28, 2015 at 10:56 PM, Su She  wrote:
>
> > Thank you Dillian and Guozhang for the responses.
> >
> > Yes, Dillian you are understanding my issue correctly. I am not sure what
> > the best approach to this is...I'm not sure if there's a way to whitelist
> > certain IPs, create a VPC, use the cluster launcher as the kafka
> > zookeeper/broker. I guess this is more of an AWS question, but I thought
> > this is a problem some Kafka users must have solved already.
> >
> > Edit: I just tried using the cluster launcher as an intermediate. I
> started
> > Zookeeper/Kafka Server on my Cluster launcher and then created a
> > topic/produced messages. I set up a kafka consumer on one of my private
> EC2
> > instances, but I got a No Route to host error. I pinged the cluster
> > launcher <-> private instance and it works fine. I was hoping I could use
> > this is as a temporary solution...any suggestions on this issue would
> also
> > be greatly appreciated. Thanks!
> >
> > Best,
> >
> > Su
> >
> >
> > On Wed, Jan 28, 2015 at 9:11 PM, Guozhang Wang 
> wrote:
> >
> > > Su,
> > >
> > > Does this help for your case?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ
> > >
> > > Guozhang
> > >
> > > On Wed, Jan 28, 2015 at 3:36 PM, Dillian Murphey <
> > crackshotm...@gmail.com>
> > > wrote:
> > >
> > > > Am I understanding your question correctly... You're asking how do
> you
> > > > establish connectivity to an instance in a private subnet from the
> > > outside
> > > > world?  Are you thinking in terms of zookeeper or just general aws
> > > network
> > > > connectivity?
> > > >
> > > > On Wed, Jan 28, 2015 at 11:03 AM, Su She 
> > wrote:
> > > >
> > > > > Hello All,
> > > > >
> > > > > I have set up a cluster of EC2 instances using this method:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> http://blogs.aws.amazon.com/bigdata/post/Tx2D0J7QOVRJBRX/Deploying-Cloudera-s-Enterprise-Data-Hub-on-AWS
> > > > >
> > > > > As you can see the instances are w/in a private subnet. I was
> > wondering
> > > > if
> > > > > anyone has any advice on how I can set up a Kafka zookeeper/server
> on
> > > an
> > > > > instance that receives messages from a Kafka Producer outside of
> the
> > > > > private subnet. I have tried using the cluster launcher, but I feel
> > > like
> > > > it
> > > > > is not a best practice and only a temporary situation.
> > > > >
> > > > > Thank you for the help!
> > > > >
> > > > > Best,
> > > > >
> > > > > Su
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Resilient Producer

2015-01-29 Thread Lakshmanan Muthuraman
Thanks David. This looks to be interesting. Will definitely test this out
to see whether this solves our problem.

On Thu, Jan 29, 2015 at 8:29 AM, David Morales  wrote:

> Existing "tail" source is not the best choice in your scenario, as you have
> pointed out.
>
> SpoolDir could be a solution if your log file rotation policy is very low
> (5 minutes, for example), but then you have to deal with a huge number of
> files in the folder (slower listings).
>
> There is a proposal for a new approach, something that combines the best of
> "tail" and "spoolDir". Take a look here:
>
> https://issues.apache.org/jira/browse/FLUME-2498
>
>
>
>
> 2015-01-29 0:24 GMT+01:00 Lakshmanan Muthuraman :
>
> > We have been using Flume to solve a very similar usecase. Our servers
> write
> > the log files to a local file system, and then we have flume agent which
> > ships the data to kafka.
> >
> > Flume you can use as exec source running tail. Though the exec source
> runs
> > well with tail, there are issues if the agent goes down or the file
> channel
> > starts building up. If the agent goes down, you can request flume exec
> tail
> > source to go back n number of lines or read from beginning of the file.
> The
> > challenge is we roll our log files on a daily basis. What if goes down in
> > the evening. We need to go back to the entire days worth of data for
> > reprocessing which slows down the data flow. We can also go back
> arbitarily
> > number of lines, but then we dont know what is the right number to go
> back.
> > This is kind of challenge for us. We have tried spooling directory. Which
> > works, but we need to have a different log file rotation policy. We
> > considered evening going a file rotation for a minute, but it will  still
> > affect the real time data flow in our kafka--->storm-->Elastic search
> > pipeline with a minute delay.
> >
> > We are going to do a poc on logstash to see how this solves the problem
> of
> > flume.
> >
> > On Wed, Jan 28, 2015 at 10:39 AM, Fernando O.  wrote:
> >
> > > Hi all,
> > > I'm evaluating using Kafka.
> > >
> > > I liked this thing of Facebook scribe that you log to your own machine
> > and
> > > then there's a separate process that forwards messages to the central
> > > logger.
> > >
> > > With Kafka it seems that I have to embed the publisher in my app, and
> > deal
> > > with any communication problem managing that on the producer side.
> > >
> > > I googled quite a bit trying to find a project that would basically use
> > > daemon that parses a log file and send the lines to the Kafka cluster
> > > (something like a tail file.log but instead of redirecting the output
> to
> > > the console: send it to kafka)
> > >
> > > Does anyone knows about something like that?
> > >
> > >
> > > Thanks!
> > > Fernando.
> > >
> >
>
>
>
> --
>
> David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
> 
>
>
> 
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
> *
>


LinkedIn Engineering Blog Post - Current and Future

2015-01-29 Thread Jon Bringhurst
Here's an overview of what LinkedIn plans to concentrate on in the upcoming 
year.

https://engineering.linkedin.com/kafka/kafka-linkedin-%E2%80%93-current-and-future

-Jon




Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Su She
I tried a new approach and have started ec2 instances in a public subnet
rather than a private subnet. However, now when I try to start zookeeper I
get this error. How can I go about solving this issue? Thank you.

ERROR Unexpected exception, exiting abnormally
(org.apache.zookeeper.server.ZooKeeperServerMain)
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
at
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at
org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
at
org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
at
org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
at
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
at
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)



On Thu, Jan 29, 2015 at 9:44 AM, Su She  wrote:

> Thank you Guozhang for an updated link.
>
> I read the answer for "Why can't my consumers/producers connect to the
> brokers?". I am confused on a couple things. I can ping the private IP of
>  A (Cluster Launcher) from B (EC2 instance). I can't ping the public ip of
> A (or google.com) from B which makes sense as B is in a private subnet.
> But when I try to connect to the zookeeper at private.ip.A:2181 I get a No
> Route to Host error. Is there a reason why I can ping private.ip.A but not
> connect to private.ip.A:2181?
>
> I have not changed any of my server or producer properties. I have tried
> to change advertised host name, but no luck.
>
> Thanks for the help!
>
>
>
> On Thu, Jan 29, 2015 at 8:12 AM, Guozhang Wang  wrote:
>
>> Sorry my previous link was not complete:
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers
>> ?
>>
>>
>>
>> On Wed, Jan 28, 2015 at 10:56 PM, Su She  wrote:
>>
>> > Thank you Dillian and Guozhang for the responses.
>> >
>> > Yes, Dillian you are understanding my issue correctly. I am not sure
>> what
>> > the best approach to this is...I'm not sure if there's a way to
>> whitelist
>> > certain IPs, create a VPC, use the cluster launcher as the kafka
>> > zookeeper/broker. I guess this is more of an AWS question, but I thought
>> > this is a problem some Kafka users must have solved already.
>> >
>> > Edit: I just tried using the cluster launcher as an intermediate. I
>> started
>> > Zookeeper/Kafka Server on my Cluster launcher and then created a
>> > topic/produced messages. I set up a kafka consumer on one of my private
>> EC2
>> > instances, but I got a No Route to host error. I pinged the cluster
>> > launcher <-> private instance and it works fine. I was hoping I could
>> use
>> > this is as a temporary solution...any suggestions on this issue would
>> also
>> > be greatly appreciated. Thanks!
>> >
>> > Best,
>> >
>> > Su
>> >
>> >
>> > On Wed, Jan 28, 2015 at 9:11 PM, Guozhang Wang 
>> wrote:
>> >
>> > > Su,
>> > >
>> > > Does this help for your case?
>> > >
>> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ
>> > >
>> > > Guozhang
>> > >
>> > > On Wed, Jan 28, 2015 at 3:36 PM, Dillian Murphey <
>> > crackshotm...@gmail.com>
>> > > wrote:
>> > >
>> > > > Am I understanding your question correctly... You're asking how do
>> you
>> > > > establish connectivity to an instance in a private subnet from the
>> > > outside
>> > > > world?  Are you thinking in terms of zookeeper or just general aws
>> > > network
>> > > > connectivity?
>> > > >
>> > > > On Wed, Jan 28, 2015 at 11:03 AM, Su She 
>> > wrote:
>> > > >
>> > > > > Hello All,
>> > > > >
>> > > > > I have set up a cluster of EC2 instances using this method:
>> > > > >
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> http://blogs.aws.amazon.com/bigdata/post/Tx2D0J7QOVRJBRX/Deploying-Cloudera-s-Enterprise-Data-Hub-on-AWS
>> > > > >
>> > > > > As you can see the instances are w/in a private subnet. I was
>> > wondering
>> > > > if
>> > > > > anyone has any advice on how I can set up a Kafka
>> zookeeper/server on
>> > > an
>> > > > > instance that receives messages from a Kafka Producer outside of
>> the
>> > > > > private subnet. I have tried using the cluster launcher, but I
>> feel
>> > > like
>> > > > it
>> > > > > is not a best practice and only a temporary situation.
>> > > > >
>> > > > > Thank you for the help!
>> > > > >
>> > > > > Best,
>> > > > >
>> > > > > Su
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Can't start Zookeeper on a EC2 instance in a public subnet

2015-01-29 Thread Su She
Hello Everyone,

I previously had my EC2 instances in a private subnet, but I spun up a new
cluster in a public subnet. However, it seems to have taken me a step back
as now I can't even start the zookeeper. I am getting this error:

ERROR Unexpected exception, exiting abnormally
(org.apache.zookeeper.server.ZooKeeperServerMain)
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
at
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at
org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
at
org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
at
org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
at
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
at
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
I am confused as to how to move forward as the zookeeper.properties file
does not allow for many configurations. Thank you!


Re: Routing modifications at runtime

2015-01-29 Thread Toni Cebrián
Thank you all.

I'll have a look at Flume and also at akka-http and akka-streams, since the
MACs will send the data to a REST endpoint.
On 29/01/2015 16:10, "Jeff Holoman" wrote:

> Yeah if you're into Flume you can definitely do per event
> modification/routing in an interceptor with relative ease. I don't know the
> size of the total MAC addresses to look up (or actually why a hash
> partitioning scheme wouldn't just work, but w/e I assume you have your
> reasons). There's kind of an example of doing this here:
>
>
> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/
>
> In the example in the blog it uses HBase to read some "profile" data. You
> could sub this with whatever other store you wanted (Redis, Cassandra,
> whatever)
>
> So you'd go:
>
> Your Systems -> Kafka (raw data) -> Flume Source->Interceptor->Kafka
> Channel(Raw data to the correct topic by setting the Flume Event header)
>
> Of course you could code that all yourself too. Whatever floats your boat.
> Writing interceptors is really easy and there's quite a few examples
> around.
>
> Jeff
>
> On Thu, Jan 29, 2015 at 4:10 AM, David Morales 
> wrote:
>
> > Hi Toni,
> >
> > 1. Kafka can create topics on the fly, in case you need it.
> >
> > https://kafka.apache.org/08/configuration.html
> >
> > auto.create.topics.enable (default: true): Enable auto creation of topics
> > on the server. If this is set to true, then attempts to produce, consume,
> > or fetch metadata for a non-existent topic will automatically create it
> > with the default replication factor and number of partitions.
> >
> >
> >
> > 2. About topic selection based on rules/dictionary, this must be solved
> on
> > your side.
> >
> > You can use custom-code in your app or an event transport solution, like
> > Flume.
> >
> > Flume 1.6 now includes a sink for Kafka, and it already supports dynamic
> > topics (by using a preprocessor)
> >
> > https://github.com/thilinamb/flume-ng-kafka-sink
> >
> >
> >    - *topic* [optional]: The topic in Kafka to which the messages will be
> >      published. If this topic is mentioned, every message will be published
> >      to the same topic. If dynamic topics are required, it's possible to
> >      use a preprocessor instead of a static topic. It's mandatory that
> >      either of the parameters *topic* or *preprocessor* is provided,
> >      because the topic cannot be null when publishing to Kafka. If neither
> >      of these parameters is provided, the messages will be published to a
> >      default topic called default-flume-topic.
> >
> >
> >
> > Regards.
> >
> >
> >
> >
> > 2015-01-29 0:16 GMT+01:00 Lakshmanan Muthuraman :
> >
> > > Hi Toni,
> > >
> > > Couple of thoughts.
> > >
> > > 1. Kafka behaviour need not be changed at run time. Your producers
> which
> > > push your MAC data into kafka should know to which topic it should
> write.
> > > Your producer can be flume, log stash or it can  be your own custom
> > written
> > > java producer.
> > >
> > > As long as your producer know which topic to write, they can keep
> > creating
> > > new topics as new MAC data comes through your pipeline.
> > >
> > > On Wed, Jan 28, 2015 at 12:10 PM, Toni Cebrián  >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm starting to weight different alternatives for data ingestion
> > and
> > > > I'd like to know whether Kafka meets the problem I have.
> > > > Say we have a set of devices each with its own MAC and then we
> > > receive
> > > > data in Kafka. There is a dictionary defined elsewhere that says each
> > MAC
> > > > to which topic must publish. So I have basically 2 questions:
> > > > New MACs keep comming and the dictionary must be updated accordingly.
> > How
> > > > could I change this Kafka behaviour during runtime?
> > > > A problem for the future. Say that dictionaries are so big that they
> > > don't
> > > > fit in memory. Are there any patterns for bookkeeping internal data
> > > > structures and how route to them?
> > > >
> > > > T.
> > > >
> > >
> >
> >
> >
> > --
> >
> > David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
> > 
> >
> >
> > 
> > Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> > 28224 Pozuelo de Alarcón, Madrid
> > Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
> > *
> >
>
>
>
> --
> Jeff Holoman
> Systems Engineer
> 678-612-9519
>


Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Jonathan Natkins
Hey Su,

That exception indicates that there's something else already running on the
port that you're trying to start up Zookeeper on. The quickest way to
figure out what's causing the conflict is to run netstat and look for the
port:

[root@ip-10-0-0-45 ~]# netstat -tulpn | grep 2181
tcp        0      0 0.0.0.0:2181        0.0.0.0:*       LISTEN      9814/java
[root@ip-10-0-0-45 ~]# ps 9814
  PID TTY      STAT   TIME COMMAND
 9814 ?        Sl     8:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp
/var/run/cloudera-scm-agent/process/98-zookeeper-server:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/z

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin 


On Thu, Jan 29, 2015 at 11:47 AM, Su She  wrote:

> I tried a new approach and have started ec2 instances in a public subnet
> rather than a private subnet. However, now when I try to start zookeeper I
> get this error. How can I go about solving this issue? Thank you.
>
> ERROR Unexpected exception, exiting abnormally
> (org.apache.zookeeper.server.ZooKeeperServerMain)
> java.net.BindException: Address already in use
> at sun.nio.ch.Net.bind0(Native Method)
> at sun.nio.ch.Net.bind(Net.java:444)
> at sun.nio.ch.Net.bind(Net.java:436)
> at
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
> at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
> at
>
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
> at
>
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
> at
>
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
> at
>
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
> at
>
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
> at
>
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
>
>
>
> On Thu, Jan 29, 2015 at 9:44 AM, Su She  wrote:
>
> > Thank you Guozhang for an updated link.
> >
> > I read the answer for "Why can't my consumers/producers connect to the
> > brokers?". I am confused on a couple things. I can ping the private IP of
> >  A (Cluster Launcher) from B (EC2 instance). I can't ping the public ip
> of
> > A (or google.com) from B which makes sense as B is in a private subnet.
> > But when I try to connect to the zookeeper at private.ip.A:2181 I get a
> No
> > Route to Host error. Is there a reason why I can ping private.ip.A but
> not
> > connect to private.ip.A:2181?
> >
> > I have not changed any of my server or producer properties. I have tried
> > to change advertised host name, but no luck.
> >
> > Thanks for the help!
> >
> >
> >
> > On Thu, Jan 29, 2015 at 8:12 AM, Guozhang Wang 
> wrote:
> >
> >> Sorry my previous link was not complete:
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers
> >> ?
> >>
> >>
> >>
> >> On Wed, Jan 28, 2015 at 10:56 PM, Su She  wrote:
> >>
> >> > Thank you Dillian and Guozhang for the responses.
> >> >
> >> > Yes, Dillian you are understanding my issue correctly. I am not sure
> >> what
> >> > the best approach to this is...I'm not sure if there's a way to
> >> whitelist
> >> > certain IPs, create a VPC, use the cluster launcher as the kafka
> >> > zookeeper/broker. I guess this is more of an AWS question, but I
> thought
> >> > this is a problem some Kafka users must have solved already.
> >> >
> >> > Edit: I just tried using the cluster launcher as an intermediate. I
> >> started
> >> > Zookeeper/Kafka Server on my Cluster launcher and then created a
> >> > topic/produced messages. I set up a kafka consumer on one of my
> private
> >> EC2
> >> > instances, but I got a No Route to host error. I pinged the cluster
> >> > launcher <-> private instance and it works fine. I was hoping I could
> >> use
> >> > this is as a temporary solution...any suggestions on this issue would
> >> also
> >> > be greatly appreciated. Thanks!
> >> >
> >> > Best,
> >> >
> >> > Su
> >> >
> >> >
> >> > On Wed, Jan 28, 2015 at 9:11 PM, Guozhang Wang 
> >> wrote:
> >> >
> >> > > Su,
> >> > >
> >> > > Does this help for your case?
> >> > >
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ
> >> > >
> >> > > Guozhang
> >> > >
> >> > > On Wed, Jan 28, 2015 at 3:36 PM, Dillian Murphey <
> >> > crackshotm...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Am I understanding your question correctly... You're asking how do
> >> you
> >> > > > establish connectivity to an instance in a private subnet from the
> >> > > outside
> >> > > > world?  Are you thinking in terms of zookeeper or just general aws
> >> > > network
> >> > > >

Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Su She
Thanks Jonathan!

This was the result. Would it be okay for me to kill 3544, or should I try a
different port?

sudo netstat -tulpn | grep 2181

tcp        0      0 0.0.0.0:2181        0.0.0.0:*       LISTEN      3544/java


ps 3544

  PID TTY      STAT   TIME COMMAND
 3544 ?        Sl     0:19 /usr/java/jdk1.7.0_67-cloudera/bin/java -



On Thu, Jan 29, 2015 at 12:31 PM, Jonathan Natkins 
wrote:

> Hey Su,
>
> That exception indicates that there's something else already running on the
> port that you're trying to start up Zookeeper on. The quickest way to
> figure out what's causing the conflict is to run netstat and look for the
> port:
>
> [root@ip-10-0-0-45 ~]# *netstat -tulpn | grep 2181*
> tcp0  0 0.0.0.0:21810.0.0.0:*
> LISTEN  9814/java
> [root@ip-10-0-0-45 ~]# *ps 9814*
>   PID TTY  STAT   TIME COMMAND
>  9814 ?Sl 8:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp
>
> /var/run/cloudera-scm-agent/process/98-zookeeper-server:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/z
>
> Jonathan "Natty" Natkins
> StreamSets | Customer Engagement Engineer
> mobile: 609.577.1600 | linkedin 
>
>
> On Thu, Jan 29, 2015 at 11:47 AM, Su She  wrote:
>
> > I tried a new approach and have started ec2 instances in a public subnet
> > rather than a private subnet. However, now when I try to start zookeeper
> I
> > get this error. How can I go about solving this issue? Thank you.
> >
> > ERROR Unexpected exception, exiting abnormally
> > (org.apache.zookeeper.server.ZooKeeperServerMain)
> > java.net.BindException: Address already in use
> > at sun.nio.ch.Net.bind0(Native Method)
> > at sun.nio.ch.Net.bind(Net.java:444)
> > at sun.nio.ch.Net.bind(Net.java:436)
> > at
> > sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
> > at
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> > at
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
> > at
> >
> >
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
> > at
> >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
> > at
> >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
> > at
> >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
> > at
> >
> >
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
> > at
> >
> >
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
> >
> >
> >
> > On Thu, Jan 29, 2015 at 9:44 AM, Su She  wrote:
> >
> > > Thank you Guozhang for an updated link.
> > >
> > > I read the answer for "Why can't my consumers/producers connect to the
> > > brokers?". I am confused on a couple things. I can ping the private IP
> of
> > >  A (Cluster Launcher) from B (EC2 instance). I can't ping the public ip
> > of
> > > A (or google.com) from B which makes sense as B is in a private
> subnet.
> > > But when I try to connect to the zookeeper at private.ip.A:2181 I get a
> > No
> > > Route to Host error. Is there a reason why I can ping private.ip.A but
> > not
> > > connect to private.ip.A:2181?
> > >
> > > I have not changed any of my server or producer properties. I have
> tried
> > > to change advertised host name, but no luck.
> > >
> > > Thanks for the help!
> > >
> > >
> > >
> > > On Thu, Jan 29, 2015 at 8:12 AM, Guozhang Wang 
> > wrote:
> > >
> > >> Sorry my previous link was not complete:
> > >>
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers
> > >> ?
> > >>
> > >>
> > >>
> > >> On Wed, Jan 28, 2015 at 10:56 PM, Su She 
> wrote:
> > >>
> > >> > Thank you Dillian and Guozhang for the responses.
> > >> >
> > >> > Yes, Dillian you are understanding my issue correctly. I am not sure
> > >> what
> > >> > the best approach to this is...I'm not sure if there's a way to
> > >> whitelist
> > >> > certain IPs, create a VPC, use the cluster launcher as the kafka
> > >> > zookeeper/broker. I guess this is more of an AWS question, but I
> > thought
> > >> > this is a problem some Kafka users must have solved already.
> > >> >
> > >> > Edit: I just tried using the cluster launcher as an intermediate. I
> > >> started
> > >> > Zookeeper/Kafka Server on my Cluster launcher and then created a
> > >> > topic/produced messages. I set up a kafka consumer on one of my
> > private
> > >> EC2
> > >> > instances, but I got a No Route to host error. I pinged the cluster
> > >> > launcher <-> private instance and it works fine. I was hoping I
> could
> > >> use
> > >> > this is as a temporary solution...any suggestions on this issue
> would
> > >> also
> > >> > be greatly appreciated. Thanks!
> > >> >
> > >> >

Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Jonathan Natkins
Hey Su,

It's hard to say, because you didn't copy the entire command string, but it
looks like it's probably a process that's being managed by Cloudera
Manager. Do you have a Cloudera Manager instance that is running a
Zookeeper quorum? If so, that's where the conflict is. You can either
reconfigure your Zookeeper in CM, or stop the service, and start up a
Zookeeper manually to try to bind to the public IPs.

Thanks,
Natty

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin 


On Thu, Jan 29, 2015 at 12:36 PM, Su She  wrote:

> Thanks Jonathan!
>
> This was the result, would it be okay for me to kill 3544? Or do I try to
> access a new port?:
>
> *sudo netstat -tulpn | grep 2181*
>
> tcp0  0 0.0.0.0:21810.0.0.0:*
> LISTEN  3544/java
>
>
> *ps 3544*
>
> PID TTY  STAT   TIME COMMAND
> 3544 ?Sl 0:19 /usr/java/jdk1.7.0_67-cloudera/bin/java -
>
>
>
> On Thu, Jan 29, 2015 at 12:31 PM, Jonathan Natkins 
> wrote:
>
> > Hey Su,
> >
> > That exception indicates that there's something else already running on
> the
> > port that you're trying to start up Zookeeper on. The quickest way to
> > figure out what's causing the conflict is to run netstat and look for the
> > port:
> >
> > [root@ip-10-0-0-45 ~]# *netstat -tulpn | grep 2181*
> > tcp0  0 0.0.0.0:21810.0.0.0:*
> > LISTEN  9814/java
> > [root@ip-10-0-0-45 ~]# *ps 9814*
> >   PID TTY  STAT   TIME COMMAND
> >  9814 ?Sl 8:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp
> >
> >
> /var/run/cloudera-scm-agent/process/98-zookeeper-server:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/z
> >
> > Jonathan "Natty" Natkins
> > StreamSets | Customer Engagement Engineer
> > mobile: 609.577.1600 | linkedin 
> >
> >
> > On Thu, Jan 29, 2015 at 11:47 AM, Su She  wrote:
> >
> > > I tried a new approach and have started ec2 instances in a public
> subnet
> > > rather than a private subnet. However, now when I try to start
> zookeeper
> > I
> > > get this error. How can I go about solving this issue? Thank you.
> > >
> > > ERROR Unexpected exception, exiting abnormally
> > > (org.apache.zookeeper.server.ZooKeeperServerMain)
> > > java.net.BindException: Address already in use
> > > at sun.nio.ch.Net.bind0(Native Method)
> > > at sun.nio.ch.Net.bind(Net.java:444)
> > > at sun.nio.ch.Net.bind(Net.java:436)
> > > at
> > >
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
> > > at
> > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> > > at
> > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
> > > at
> > >
> > >
> >
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
> > > at
> > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
> > > at
> > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
> > > at
> > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
> > > at
> > >
> > >
> >
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
> > > at
> > >
> > >
> >
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
> > >
> > >
> > >
> > > On Thu, Jan 29, 2015 at 9:44 AM, Su She  wrote:
> > >
> > > > Thank you Guozhang for an updated link.
> > > >
> > > > I read the answer for "Why can't my consumers/producers connect to
> the
> > > > brokers?". I am confused on a couple things. I can ping the private
> IP
> > of
> > > >  A (Cluster Launcher) from B (EC2 instance). I can't ping the public
> ip
> > > of
> > > > A (or google.com) from B which makes sense as B is in a private
> > subnet.
> > > > But when I try to connect to the zookeeper at private.ip.A:2181 I
> get a
> > > No
> > > > Route to Host error. Is there a reason why I can ping private.ip.A
> but
> > > not
> > > > connect to private.ip.A:2181?
> > > >
> > > > I have not changed any of my server or producer properties. I have
> > tried
> > > > to change advertised host name, but no luck.
> > > >
> > > > Thanks for the help!
> > > >
> > > >
> > > >
> > > > On Thu, Jan 29, 2015 at 8:12 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > >> Sorry my previous link was not complete:
> > > >>
> > > >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers
> > > >> ?
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Jan 28, 2015 at 10:56 PM, Su She 
> > wrote:
> > > >>
> > > >> > Thank you Dillian and Guozhang for the responses.
> > > >> >
> > > >> > Yes, Dillian you are understanding my issue correctly. I am not
> sure
> > > >> what
>

Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Su She
Thanks Jonathan!

1) How can I get the whole command string? What I copy/pasted was the whole output.

2) I do have Zookeeper as a service from Cloudera Manager, but I had this setup
before as well and it was never a problem. Is there a way I can keep the
Zookeeper from CM and also start one up manually? I don't want to cause other
services that depend on the CM Zookeeper to fail. Or maybe I can connect the
Kafka producer to the Zookeeper on CM?

Thanks a lot, really appreciate the help.

On Thu, Jan 29, 2015 at 12:42 PM, Jonathan Natkins 
wrote:

> Hey Su,
>
> It's hard to say, because you didn't copy the entire command string, but it
> looks like it's probably a process that's being managed by Cloudera
> Manager. Do you have a Cloudera Manager instance that is running a
> Zookeeper quorum? If so, that's where the conflict is. You can either
> reconfigure your Zookeeper in CM, or stop the service, and start up a
> Zookeeper manually to try to bind to the public IPs.
>
> Thanks,
> Natty
>
> Jonathan "Natty" Natkins
> StreamSets | Customer Engagement Engineer
> mobile: 609.577.1600 | linkedin 
>
>
> On Thu, Jan 29, 2015 at 12:36 PM, Su She  wrote:
>
> > Thanks Jonathan!
> >
> > This was the result, would it be okay for me to kill 3544? Or do I try to
> > access a new port?:
> >
> > *sudo netstat -tulpn | grep 2181*
> >
> > tcp0  0 0.0.0.0:21810.0.0.0:*
> > LISTEN  3544/java
> >
> >
> > *ps 3544*
> >
> > PID TTY  STAT   TIME COMMAND
> > 3544 ?Sl 0:19 /usr/java/jdk1.7.0_67-cloudera/bin/java -
> >
> >
> >
> > On Thu, Jan 29, 2015 at 12:31 PM, Jonathan Natkins  >
> > wrote:
> >
> > > Hey Su,
> > >
> > > That exception indicates that there's something else already running on
> > the
> > > port that you're trying to start up Zookeeper on. The quickest way to
> > > figure out what's causing the conflict is to run netstat and look for
> the
> > > port:
> > >
> > > [root@ip-10-0-0-45 ~]# *netstat -tulpn | grep 2181*
> > > tcp0  0 0.0.0.0:21810.0.0.0:*
> > > LISTEN  9814/java
> > > [root@ip-10-0-0-45 ~]# *ps 9814*
> > >   PID TTY  STAT   TIME COMMAND
> > >  9814 ?Sl 8:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp
> > >
> > >
> >
> /var/run/cloudera-scm-agent/process/98-zookeeper-server:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/z
> > >
> > > Jonathan "Natty" Natkins
> > > StreamSets | Customer Engagement Engineer
> > > mobile: 609.577.1600 | linkedin 
> > >
> > >
> > > On Thu, Jan 29, 2015 at 11:47 AM, Su She 
> wrote:
> > >
> > > > I tried a new approach and have started ec2 instances in a public
> > subnet
> > > > rather than a private subnet. However, now when I try to start
> > zookeeper
> > > I
> > > > get this error. How can I go about solving this issue? Thank you.
> > > >
> > > > ERROR Unexpected exception, exiting abnormally
> > > > (org.apache.zookeeper.server.ZooKeeperServerMain)
> > > > java.net.BindException: Address already in use
> > > > at sun.nio.ch.Net.bind0(Native Method)
> > > > at sun.nio.ch.Net.bind(Net.java:444)
> > > > at sun.nio.ch.Net.bind(Net.java:436)
> > > > at
> > > >
> > sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
> > > > at
> > > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> > > > at
> > > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
> > > >
> > > >
> > > >
> > > > On Thu, Jan 29, 2015 at 9:44 AM, Su She 
> wrote:
> > > >
> > > > > Thank you Guozhang for an updated link.
> > > > >
> > > > > I read the answer for "Why can't my consumers/producers connect to
> > the
> > > > > brokers?". I am confused on a couple things. I can ping the private
> > IP
> > > of
> > > > >  A (Cluster Launcher) from B (EC2 instance). I can't ping the
> public
> > ip
> > > > of
> > > > > A (or google.com) from B which makes sense as B is in a private
> > > subnet.
> > > > > But when I try to connect to the zookeeper at private.ip.A:2181 I
> > get a
> > > > No
> > > > > Route to Host error. Is there a reason why I can ping private.ip.A but not connect to private.ip.A:2181?

Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Jonathan Natkins
Responses inline

Jonathan "Natty" Natkins
StreamSets | Customer Engagement Engineer
mobile: 609.577.1600 | linkedin 


On Thu, Jan 29, 2015 at 12:47 PM, Su She  wrote:

> Thanks Jonathon!
>
> 1) How can I get the whole command string? What i c/p was the whole output
>

Might be something to do with your screen size, or something. Probably if
you did a `ps aux` and grepped for the process, you might end up finding
it.
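
For example, with the PID that netstat reported (3544 in this case):

ps aux | grep 3544
# or print just the full command line of that PID
ps -p 3544 -o args=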

>
> 2) I do have Zookeeper as a service from Cloudera Manager, except I used to
> have this before as well and was never a problem. Is there a way I can keep
> the Zookeeper from CM as well as manually start it up? I don't want to
> potentially cause other services depending on the CM zookeeper to fail. Or
> maybe I can connect kafka producer to the zookeeper on CM?
>

Sure, makes sense. Just configure your manually-started Zookeeper with
different ports. 2181 is the obvious problem port, but there's probably
other web UI ports or something. Not sure off the top of my head.
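
A minimal sketch of a standalone zoo.cfg on a non-conflicting port (2182 is just
an assumed-free example):

# zoo.cfg for the manually started ZooKeeper
tickTime=2000
dataDir=/tmp/zookeeper-standalone
clientPort=2182

Anything that should use this instance (for example the broker's
zookeeper.connect, or the console consumer's --zookeeper flag) would then point
at host:2182.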


> Thanks a lot, really appreciate the help.
>
> On Thu, Jan 29, 2015 at 12:42 PM, Jonathan Natkins 
> wrote:
>
> > Hey Su,
> >
> > It's hard to say, because you didn't copy the entire command string, but
> it
> > looks like it's probably a process that's being managed by Cloudera
> > Manager. Do you have a Cloudera Manager instance that is running a
> > Zookeeper quorum? If so, that's where the conflict is. You can either
> > reconfigure your Zookeeper in CM, or stop the service, and start up a
> > Zookeeper manually to try to bind to the public IPs.
> >
> > Thanks,
> > Natty
> >
> > Jonathan "Natty" Natkins
> > StreamSets | Customer Engagement Engineer
> > mobile: 609.577.1600 | linkedin 
> >
> >
> > On Thu, Jan 29, 2015 at 12:36 PM, Su She  wrote:
> >
> > > Thanks Jonathan!
> > >
> > > This was the result, would it be okay for me to kill 3544? Or do I try
> to
> > > access a new port?:
> > >
> > > *sudo netstat -tulpn | grep 2181*
> > >
> > > tcp0  0 0.0.0.0:21810.0.0.0:*
> > > LISTEN  3544/java
> > >
> > >
> > > *ps 3544*
> > >
> > > PID TTY  STAT   TIME COMMAND
> > > 3544 ?Sl 0:19 /usr/java/jdk1.7.0_67-cloudera/bin/java -
> > >
> > >
> > >
> > > On Thu, Jan 29, 2015 at 12:31 PM, Jonathan Natkins <
> na...@streamsets.com
> > >
> > > wrote:
> > >
> > > > Hey Su,
> > > >
> > > > That exception indicates that there's something else already running
> on
> > > the
> > > > port that you're trying to start up Zookeeper on. The quickest way to
> > > > figure out what's causing the conflict is to run netstat and look for
> > the
> > > > port:
> > > >
> > > > [root@ip-10-0-0-45 ~]# *netstat -tulpn | grep 2181*
> > > > tcp0  0 0.0.0.0:21810.0.0.0:*
> > > > LISTEN  9814/java
> > > > [root@ip-10-0-0-45 ~]# *ps 9814*
> > > >   PID TTY  STAT   TIME COMMAND
> > > >  9814 ?Sl 8:11 /usr/java/jdk1.7.0_67-cloudera/bin/java
> -cp
> > > >
> > > >
> > >
> >
> /var/run/cloudera-scm-agent/process/98-zookeeper-server:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/z
> > > >
> > > > Jonathan "Natty" Natkins
> > > > StreamSets | Customer Engagement Engineer
> > > > mobile: 609.577.1600 | linkedin  >
> > > >
> > > >
> > > > On Thu, Jan 29, 2015 at 11:47 AM, Su She 
> > wrote:
> > > >
> > > > > I tried a new approach and have started ec2 instances in a public
> > > subnet
> > > > > rather than a private subnet. However, now when I try to start
> > > zookeeper
> > > > I
> > > > > get this error. How can I go about solving this issue? Thank you.
> > > > >
> > > > > ERROR Unexpected exception, exiting abnormally
> > > > > (org.apache.zookeeper.server.ZooKeeperServerMain)
> > > > > java.net.BindException: Address already in use
> > > > > at sun.nio.ch.Net.bind0(Native Method)
> > > > > at sun.nio.ch.Net.bind(Net.java:444)
> > > > > at sun.nio.ch.Net.bind(Net.java:436)
> > > > > at
> > > > >
> > >
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
> > > > > at
> > > > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> > > > > at
> > > > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
> > > > > at
> > > > >
> > > > >
> > > >
>

Re: Consuming Kafka Messages Inside of EC2 Instances

2015-01-29 Thread Su She
Thanks, Jonathan!

Ah, got it, I connected to a different port. Also, it just happened that the CM
zookeeper was running on the one of the 10 instances on which I could have
manually started zookeeper -_-. Thanks for the help!


On Thu, Jan 29, 2015 at 12:51 PM, Jonathan Natkins 
wrote:

> Responses inline
>
> Jonathan "Natty" Natkins
> StreamSets | Customer Engagement Engineer
> mobile: 609.577.1600 | linkedin 
>
>
> On Thu, Jan 29, 2015 at 12:47 PM, Su She  wrote:
>
> > Thanks Jonathon!
> >
> > 1) How can I get the whole command string? What i c/p was the whole
> output
> >
>
> Might be something to do with your screen size, or something. Probably if
> you did a `ps aux` and grepped for the process, you might end up finding
> it.
>
> >
> > 2) I do have Zookeeper as a service from Cloudera Manager, except I used
> to
> > have this before as well and was never a problem. Is there a way I can
> keep
> > the Zookeeper from CM as well as manually start it up? I don't want to
> > potentially cause other services depending on the CM zookeeper to fail.
> Or
> > maybe I can connect kafka producer to the zookeeper on CM?
> >
>
> Sure, makes sense. Just configure your manually-started Zookeeper with
> different ports. 2181 is the obvious problem port, but there's probably
> other web UI ports or something. Not sure off the top of my head.
>
>
> > Thanks a lot, really appreciate the help.
> >
> > On Thu, Jan 29, 2015 at 12:42 PM, Jonathan Natkins  >
> > wrote:
> >
> > > Hey Su,
> > >
> > > It's hard to say, because you didn't copy the entire command string,
> but
> > it
> > > looks like it's probably a process that's being managed by Cloudera
> > > Manager. Do you have a Cloudera Manager instance that is running a
> > > Zookeeper quorum? If so, that's where the conflict is. You can either
> > > reconfigure your Zookeeper in CM, or stop the service, and start up a
> > > Zookeeper manually to try to bind to the public IPs.
> > >
> > > Thanks,
> > > Natty
> > >
> > > Jonathan "Natty" Natkins
> > > StreamSets | Customer Engagement Engineer
> > > mobile: 609.577.1600 | linkedin 
> > >
> > >
> > > On Thu, Jan 29, 2015 at 12:36 PM, Su She 
> wrote:
> > >
> > > > Thanks Jonathan!
> > > >
> > > > This was the result, would it be okay for me to kill 3544? Or do I
> try
> > to
> > > > access a new port?:
> > > >
> > > > *sudo netstat -tulpn | grep 2181*
> > > >
> > > > tcp0  0 0.0.0.0:21810.0.0.0:*
> > > > LISTEN  3544/java
> > > >
> > > >
> > > > *ps 3544*
> > > >
> > > > PID TTY  STAT   TIME COMMAND
> > > > 3544 ?Sl 0:19 /usr/java/jdk1.7.0_67-cloudera/bin/java -
> > > >
> > > >
> > > >
> > > > On Thu, Jan 29, 2015 at 12:31 PM, Jonathan Natkins <
> > na...@streamsets.com
> > > >
> > > > wrote:
> > > >
> > > > > Hey Su,
> > > > >
> > > > > That exception indicates that there's something else already
> running
> > on
> > > > the
> > > > > port that you're trying to start up Zookeeper on. The quickest way
> to
> > > > > figure out what's causing the conflict is to run netstat and look
> for
> > > the
> > > > > port:
> > > > >
> > > > > [root@ip-10-0-0-45 ~]# *netstat -tulpn | grep 2181*
> > > > > tcp0  0 0.0.0.0:21810.0.0.0:*
> > > > > LISTEN  9814/java
> > > > > [root@ip-10-0-0-45 ~]# *ps 9814*
> > > > >   PID TTY  STAT   TIME COMMAND
> > > > >  9814 ?Sl 8:11 /usr/java/jdk1.7.0_67-cloudera/bin/java
> > -cp
> > > > >
> > > > >
> > > >
> > >
> >
> /var/run/cloudera-scm-agent/process/98-zookeeper-server:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/z
> > > > >
> > > > > Jonathan "Natty" Natkins
> > > > > StreamSets | Customer Engagement Engineer
> > > > > mobile: 609.577.1600 | linkedin <
> http://www.linkedin.com/in/nattyice
> > >
> > > > >
> > > > >
> > > > > On Thu, Jan 29, 2015 at 11:47 AM, Su She 
> > > wrote:
> > > > >
> > > > > > I tried a new approach and have started ec2 instances in a public
> > > > subnet
> > > > > > rather than a private subnet. However, now when I try to start
> > > > zookeeper
> > > > > I
> > > > > > get this error. How can I go about solving this issue? Thank you.
> > > > > >
> > > > > > ERROR Unexpected exception, exiting abnormally
> > > > > > (org.apache.zookeeper.server.ZooKeeperServerMain)
> > > > > > java.net.BindException: Address already in use
> > > > > > at sun.nio.ch.Net.bind0(Native Method)
> > > > > > at sun.nio.ch.Net.bind(Net.java:444)
> > > > > > at sun.nio.ch.Net.bind(Net.java:436)
> > > > > > at
> > > > > >
> > > >
> > sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
> > > > > > at
> > > > > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> > > > > > at
> > > > > sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> or

Re: Errors from ReassignPartitionsCommand

2015-01-29 Thread Allen Wang
We are using 0.8.1.1.

How do we identify controller migration? Is it in logs or some metrics?

Allen

On Tue, Jan 27, 2015 at 9:35 AM, Guozhang Wang  wrote:

> Allen, which version of Kafka are you using? And if you have multiple
> brokers, is there a controller migration happened before?
>
> Guozhang
>
> On Fri, Jan 23, 2015 at 3:56 PM, Allen Wang 
> wrote:
>
> > Hello,
> >
> > We tried the ReassignPartitionsCommand to move partitions to new brokers.
> > The execution initially showed message "Successfully started reassignment
> > of partitions ...". But when I tried to verify using --verify option, it
> > reported some reassignments have failed:
> >
> > ERROR: Assigned replicas (0,5,2) don't match the list of replicas for
> > reassignment (0,5) for partition [vhs_playback_event,1]
> > ERROR: Assigned replicas (4,5,0,2) don't match the list of replicas for
> > reassignment (4,5) for partition [vhs_playback_event,11]
> > ERROR: Assigned replicas (3,5,0,2) don't match the list of replicas for
> > reassignment (3,5) for partition [vhs_playback_event,16]
> >
> > I noticed that the assigned replicas in the error messages include both
> old
> > assignment and new assignment. Is this a real error or just means
> > partitions are being copied and current state does not match the final
> > expected state?
> >
> > Since I was confused by the errors, I ran the same
> > ReassignPartitionsCommand with the same assignment again but got some
> > additional failure messages complaining that three partitions do not
> exist:
> >
> > [2015-01-23 18:15:41,333] ERROR Skipping reassignment of partition
> > [vhs_playback_event,16] since it doesn't exist
> > (kafka.admin.ReassignPartitionsCommand)
> > [2015-01-23 18:15:41,455] ERROR Skipping reassignment of partition
> > [vhs_playback_event,15] since it doesn't exist
> > (kafka.admin.ReassignPartitionsCommand)
> > [2015-01-23 18:15:41,499] ERROR Skipping reassignment of partition
> > [vhs_playback_event,17] since it doesn't exist
> > (kafka.admin.ReassignPartitionsCommand)
> >
> > These partitions later reappeared from the output of --verify.
> >
> > The other thing is that at one point the BytesOut from one broker exceeds
> > 100Mbytes, which is quite alarming.
> >
> > In the end, the reassignment was done according to the input file to
> > ReassignPartitionsCommand. But the UnderReplicatedPartitions for the
> > brokers keeps showing a positive number, even though the output of
> describe
> > topic command and ZooKeeper data show the ISRs are all in sync, and
> > Replica-MaxLag is 0.
> >
> > To sum up, the overall execution is successful but the error messages are
> > quite noisy and the metric is not consistent with what appears to be.
> >
> > Does anyone have the similar experience and is there anything we can do
> get
> > it done smoother? What can we do to reset the inconsistent
> > UnderReplicatedPartitions metric?
> >
> > Thanks,
> > Allen
> >
>
>
>
> --
> -- Guozhang
>


Kafka ETL Camus Question

2015-01-29 Thread Bhavesh Mistry
Hi Kafka Team or LinkedIn Team,

I would like to know whether you run the Camus ETL job with speculative
execution set to true or false. Does it make sense to set this to false?
Having it true creates additional load on the brokers for each map task (it
can create a second map task that pulls the same partition twice). Is there
any advantage to having it on vs. off?

mapred.map.tasks.speculative.execution

Thanks,

Bhavesh
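
For reference, a sketch of turning it off in the Camus job properties file; the
property name is the standard MRv1 one quoted above, and how the properties file
is passed to CamusJob depends on your setup:

# camus.properties
mapred.map.tasks.speculative.execution=false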


Re: Errors from ReassignPartitionsCommand

2015-01-29 Thread Guozhang Wang
Each broker should have a controller log, and at any one time only one of them
will host the controller, while the others' controller logs will be almost
empty. If you find entries like "controller start-up" on multiple controllers,
or if more than one controller log has a large amount of data, then that would
indicate the controller has migrated from one broker to another recently.
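
As a rough sketch of that check (the log location is an assumption; your
log4j.properties defines where the controller appender actually writes):

# run on each broker; more than one broker with a large or active controller
# log suggests the controller has moved
ls -lh /path/to/kafka/logs/controller.log*
grep -i "start" /path/to/kafka/logs/controller.log | head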

If that happens, then you probably hit this one: KAFKA-1578, which is fixed in
the 0.8.2 release.

Guozhang

On Thu, Jan 29, 2015 at 1:55 PM, Allen Wang 
wrote:

> We are using 0.8.1.1.
>
> How do we identify controller migration? Is it in logs or some metrics?
>
> Allen
>
> On Tue, Jan 27, 2015 at 9:35 AM, Guozhang Wang  wrote:
>
> > Allen, which version of Kafka are you using? And if you have multiple
> > brokers, is there a controller migration happened before?
> >
> > Guozhang
> >
> > On Fri, Jan 23, 2015 at 3:56 PM, Allen Wang 
> > wrote:
> >
> > > Hello,
> > >
> > > We tried the ReassignPartitionsCommand to move partitions to new
> brokers.
> > > The execution initially showed message "Successfully started
> reassignment
> > > of partitions ...". But when I tried to verify using --verify option,
> it
> > > reported some reassignments have failed:
> > >
> > > ERROR: Assigned replicas (0,5,2) don't match the list of replicas for
> > > reassignment (0,5) for partition [vhs_playback_event,1]
> > > ERROR: Assigned replicas (4,5,0,2) don't match the list of replicas for
> > > reassignment (4,5) for partition [vhs_playback_event,11]
> > > ERROR: Assigned replicas (3,5,0,2) don't match the list of replicas for
> > > reassignment (3,5) for partition [vhs_playback_event,16]
> > >
> > > I noticed that the assigned replicas in the error messages include both
> > old
> > > assignment and new assignment. Is this a real error or just means
> > > partitions are being copied and current state does not match the final
> > > expected state?
> > >
> > > Since I was confused by the errors, I ran the same
> > > ReassignPartitionsCommand with the same assignment again but got some
> > > additional failure messages complaining that three partitions do not
> > exist:
> > >
> > > [2015-01-23 18:15:41,333] ERROR Skipping reassignment of partition
> > > [vhs_playback_event,16] since it doesn't exist
> > > (kafka.admin.ReassignPartitionsCommand)
> > > [2015-01-23 18:15:41,455] ERROR Skipping reassignment of partition
> > > [vhs_playback_event,15] since it doesn't exist
> > > (kafka.admin.ReassignPartitionsCommand)
> > > [2015-01-23 18:15:41,499] ERROR Skipping reassignment of partition
> > > [vhs_playback_event,17] since it doesn't exist
> > > (kafka.admin.ReassignPartitionsCommand)
> > >
> > > These partitions later reappeared from the output of --verify.
> > >
> > > The other thing is that at one point the BytesOut from one broker
> exceeds
> > > 100Mbytes, which is quite alarming.
> > >
> > > In the end, the reassignment was done according to the input file to
> > > ReassignPartitionsCommand. But the UnderReplicatedPartitions for the
> > > brokers keeps showing a positive number, even though the output of
> > describe
> > > topic command and ZooKeeper data show the ISRs are all in sync, and
> > > Replica-MaxLag is 0.
> > >
> > > To sum up, the overall execution is successful but the error messages
> are
> > > quite noisy and the metric is not consistent with what appears to be.
> > >
> > > Does anyone have the similar experience and is there anything we can do
> > get
> > > it done smoother? What can we do to reset the inconsistent
> > > UnderReplicatedPartitions metric?
> > >
> > > Thanks,
> > > Allen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


per-topic configuration names... unit suffix ?

2015-01-29 Thread Thunder Stumpges
Hi all,

I am reading about the per-topic configs, and the unit suffixes on a couple of
them don't seem to match the global/server ones.

Specifically:

retention.ms => log.retention.minutes
segment.ms => log.roll.hours

Can someone clear this up?

Is the per-topic key really 'ms'? If so, are the units actually in milliseconds?

I would expect the units to match the global, and therefore I would expect the 
key suffix to match. Maybe this is just a documentation error?

Thanks!
Thunder
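
For anyone who wants to experiment, a hedged sketch with the 0.8.2 topic tool;
my reading is that the per-topic keys really are retention.ms / segment.ms and
take milliseconds, but treat that as an assumption until the docs are confirmed:

# 7-day retention and 1-day segment roll for a single topic, in milliseconds
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
  --config retention.ms=604800000 --config segment.ms=86400000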



RE: LinkedIn Engineering Blog Post - Current and Future

2015-01-29 Thread Thunder Stumpges
Thanks Jon. That link isn't working for me though:

"Page not found

The requested page "/kafka/kafka-linkedin-%E2%80%93-current-and-future" could 
not be found."

-Original Message-
From: Jon Bringhurst [mailto:jbringhu...@linkedin.com.INVALID] 
Sent: Thursday, January 29, 2015 10:23 AM
To: users@kafka.apache.org
Subject: LinkedIn Engineering Blog Post - Current and Future

Here's an overview of what LinkedIn plans to concentrate on in the upcoming 
year.

https://engineering.linkedin.com/kafka/kafka-linkedin-%E2%80%93-current-and-future

-Jon


Re: LinkedIn Engineering Blog Post - Current and Future

2015-01-29 Thread Jeff Holoman
https://engineering.linkedin.com/kafka/kafka-linkedin-current-and-future

On Thu, Jan 29, 2015 at 5:43 PM, Thunder Stumpges 
wrote:

> Thanks Jon. That link isn't working for me though:
>
> "Page not found
>
> The requested page "/kafka/kafka-linkedin-%E2%80%93-current-and-future"
> could not be found."
>
> -Original Message-
> From: Jon Bringhurst [mailto:jbringhu...@linkedin.com.INVALID]
> Sent: Thursday, January 29, 2015 10:23 AM
> To: users@kafka.apache.org
> Subject: LinkedIn Engineering Blog Post - Current and Future
>
> Here's an overview of what LinkedIn plans to concentrate on in the
> upcoming year.
>
>
> https://engineering.linkedin.com/kafka/kafka-linkedin-%E2%80%93-current-and-future
>
> -Jon
>



-- 
Jeff Holoman
Systems Engineer
678-612-9519


RE: LinkedIn Engineering Blog Post - Current and Future

2015-01-29 Thread Aditya Auradkar
This should work.

http://engineering.linkedin.com/kafka/kafka-linkedin-current-and-future

Aditya


From: Thunder Stumpges [tstump...@ntent.com]
Sent: Thursday, January 29, 2015 2:43 PM
To: users@kafka.apache.org
Subject: RE: LinkedIn Engineering Blog Post - Current and Future

Thanks Jon. That link isn't working for me though:

"Page not found

The requested page "/kafka/kafka-linkedin-%E2%80%93-current-and-future" could 
not be found."

-Original Message-
From: Jon Bringhurst [mailto:jbringhu...@linkedin.com.INVALID]
Sent: Thursday, January 29, 2015 10:23 AM
To: users@kafka.apache.org
Subject: LinkedIn Engineering Blog Post - Current and Future

Here's an overview of what LinkedIn plans to concentrate on in the upcoming 
year.

https://engineering.linkedin.com/kafka/kafka-linkedin-%E2%80%93-current-and-future

-Jon


RE: LinkedIn Engineering Blog Post - Current and Future

2015-01-29 Thread Thunder Stumpges
Yep, thanks guys.

-Original Message-
From: Aditya Auradkar [mailto:aaurad...@linkedin.com.INVALID] 
Sent: Thursday, January 29, 2015 2:46 PM
To: users@kafka.apache.org
Subject: RE: LinkedIn Engineering Blog Post - Current and Future

This should work.

http://engineering.linkedin.com/kafka/kafka-linkedin-current-and-future

Aditya


From: Thunder Stumpges [tstump...@ntent.com]
Sent: Thursday, January 29, 2015 2:43 PM
To: users@kafka.apache.org
Subject: RE: LinkedIn Engineering Blog Post - Current and Future

Thanks Jon. That link isn't working for me though:

"Page not found

The requested page "/kafka/kafka-linkedin-%E2%80%93-current-and-future" could 
not be found."

-Original Message-
From: Jon Bringhurst [mailto:jbringhu...@linkedin.com.INVALID]
Sent: Thursday, January 29, 2015 10:23 AM
To: users@kafka.apache.org
Subject: LinkedIn Engineering Blog Post - Current and Future

Here's an overview of what LinkedIn plans to concentrate on in the upcoming 
year.

https://engineering.linkedin.com/kafka/kafka-linkedin-%E2%80%93-current-and-future

-Jon


Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-29 Thread Jiangjie Qin
In the meetup we said that KAFKA-1650 and the follow-up patches are included in
0.8.2, but they don't seem to be on the list.


On 1/29/15, 1:01 AM, "Magnus Edenhill"  wrote:

>+1 on librdkafka interop
>
>Minor nitpick:
> KAFKA-1781 (state required gradle version in README)  is included in the
>Release notes but is not actually fixed
>
>
>2015-01-29 6:22 GMT+01:00 Jun Rao :
>
>> This is the third candidate for release of Apache Kafka 0.8.2.0.
>>
>> Release Notes for the 0.8.2.0 release
>>
>> 
>>https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.
>>html
>>
>> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
>> (SHA256)
>> checksum.
>>
>> * Release artifacts to be voted upon (source and binary):
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>>
>> * Maven artifacts to be voted upon prior to release:
>> https://repository.apache.org/content/groups/staging/
>>
>> * scala-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>>
>> * java-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>>
>> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>>
>> 
>>https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0
>>dab378cc411f4938a9cea1eb7ea
>> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>>
>> /***
>>
>> Thanks,
>>
>> Jun
>>



Re: [VOTE] 0.8.2.0 Candidate 3

2015-01-29 Thread Jun Rao
I think we were just confirming whether that issue was fixed in 0.8.2 or not.
Given that this issue only happens in unclean shutdown, I don't think it's
a blocker for 0.8.2. Also, the patch is not trivial and it's better to test
it out a bit longer in trunk.

Thanks,

Jun

On Thu, Jan 29, 2015 at 5:36 PM, Jiangjie Qin 
wrote:

> In meetup we said that KAFKA-1650 and follow up patches is included in
> 0.8.2, but it seems not on the list.
>
>
> On 1/29/15, 1:01 AM, "Magnus Edenhill"  wrote:
>
> >+1 on librdkafka interop
> >
> >Minor nitpick:
> > KAFKA-1781 (state required gradle version in README)  is included in the
> >Release notes but is not actually fixed
> >
> >
> >2015-01-29 6:22 GMT+01:00 Jun Rao :
> >
> >> This is the third candidate for release of Apache Kafka 0.8.2.0.
> >>
> >> Release Notes for the 0.8.2.0 release
> >>
> >>
> >>https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES
> .
> >>html
> >>
> >> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the release:
> >> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> >> (SHA256)
> >> checksum.
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
> >>
> >> * Maven artifacts to be voted upon prior to release:
> >> https://repository.apache.org/content/groups/staging/
> >>
> >> * scala-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
> >>
> >> * java-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
> >>
> >> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> >>
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0
> >>dab378cc411f4938a9cea1eb7ea
> >> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
> >>
> >> /***
> >>
> >> Thanks,
> >>
> >> Jun
> >>
>
>


Question on ETL while replay

2015-01-29 Thread Joshua Schumacher
What's the best way to add two 'fields' to my Kafka messages once they are
stored? Can I just do a replay on all of them and add the field? How would I
throw out the old Kafka messages that don't have the field then? I am using
Druid to process the data, but I'm not sure how to propagate data changes all
the way down to Kafka.

Thanks,
Josh


Re: Potential socket leak in kafka sync producer

2015-01-29 Thread Jaikiran Pai


Which operating system are you on and what Java version? Depending on 
the OS, you could get tools (like lsof) to show which file descriptors 
are being held on to. Is it the client JVM which ends up with these leaks?
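
A sketch of what that could look like, with a placeholder PID for the client JVM:

# total descriptors held by the client JVM, and a breakdown by TYPE (FIFO, sock, anon, ...)
lsof -p 12345 | wc -l
lsof -p 12345 | awk '{print $5}' | sort | uniq -c | sort -rn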


Also, would it be possible to post a snippet of your application code 
which shows how you are using the Kafka APIs?


-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:

Hi,

Currently we are using sync producer client of 0.8.1 version in our
production box . we are getting the following exception while publishing
kafka message

*[2015-01-29
13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89] Fetching
topic metadata with correlation id 10808 for topics [Set(*
*kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
failed*
*java.net.ConnectException: Connection refused*
*at sun.nio.ch.Net.connect0(Native Method)*
*at sun.nio.ch.Net.connect(Net.java:465)*
*at sun.nio.ch.Net.connect(Net.java:457)*
*at
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
 at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
 at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
 at
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
 at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
 at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
 at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
 at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)


we are using dynamic thread pool to publish message to kafka. My
observation is when after keep alive time when threads in my executor gets
destroyed, somehow file descriptor is not getting cleared but when i did
explicitly ran the full gc, fd count got reduced by a signification amout.





Re: Potential socket leak in kafka sync producer

2015-01-29 Thread ankit tyagi
Hi Jaikiran,

I am using Ubuntu and was able to reproduce it on Red Hat too. Please find
more information below.


*DISTRIB_ID=Ubuntu*
*DISTRIB_RELEASE=12.04*
*DISTRIB_CODENAME=precise*
*DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*

*java version "1.7.0_72"*

This is happening on the client side. The output of lsof showed that most of
the FDs were FIFO and anon. But after a full GC the FD count was reduced
significantly.

Below is my client code which I am using for publishing messages.


* private Producer myProducer;*

* myProducer =new Producer<>(new
ProducerConfig(myProducerProperties));*

*   public void send(*
*List> msgs)*
*{*
*myProducer.send(msgs);*
*}*


We are using the sync producer. I am attaching the object histogram before GC
(histo_1) and after GC (histo_2) from my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai 
wrote:

>
> Which operating system are you on and what Java version? Depending on the
> OS, you could get tools (like lsof) to show which file descriptors are
> being held on to. Is it the client JVM which ends up with these leaks?
>
> Also, would it be possible to post a snippet of your application code
> which shows how you are using the Kafka APIs?
>
> -Jaikiran
> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>
>> Hi,
>>
>> Currently we are using sync producer client of 0.8.1 version in our
>> production box . we are getting the following exception while publishing
>> kafka message
>>
>> *[2015-01-29
>> 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89] Fetching
>> topic metadata with correlation id 10808 for topics [Set(*
>> *kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
>> failed*
>> *java.net.ConnectException: Connection refused*
>> *at sun.nio.ch.Net.connect0(Native Method)*
>> *at sun.nio.ch.Net.connect(Net.java:465)*
>> *at sun.nio.ch.Net.connect(Net.java:457)*
>> *at
>> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
>>  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:
>> 57)
>>  at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>>  at
>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
>>  at
>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
>> doSend(SyncProducer.scala:68)
>>  at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>>  at
>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>>  at
>> kafka.producer.BrokerPartitionInfo.updateInfo(
>> BrokerPartitionInfo.scala:82)
>>
>>
>> we are using dynamic thread pool to publish message to kafka. My
>> observation is when after keep alive time when threads in my executor gets
>> destroyed, somehow file descriptor is not getting cleared but when i did
>> explicitly ran the full gc, fd count got reduced by a signification amout.
>>
>>
>


Re: Potential socket leak in kafka sync producer

2015-01-29 Thread Manikumar Reddy
Hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi 
wrote:

> Hi Jaikiran,
>
> I am using ubuntu and was able to reproduce on redhat too. Please find the
> more information below.
>
>
> *DISTRIB_ID=Ubuntu*
> *DISTRIB_RELEASE=12.04*
> *DISTRIB_CODENAME=precise*
> *DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*
>
> *java version "1.7.0_72"*
>
> This is happening on client side. Output of lsof was showing that maximum
> fd were FIFO and anon. But after GC FD count was reduced significantly.
>
> Below is my Client Code which i am using for publishing message.
>
>
> * private Producer myProducer;*
>
> * myProducer =new Producer<>(new
> ProducerConfig(myProducerProperties));*
>
> *   public void send(*
> *List> msgs)*
> *{*
> *myProducer.send(msgs);*
> *}*
>
>
> we are using sync producer. I am attaching object histo before GC(histo_1)
> and after GC(histo_2) in my application.
>
> On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai 
> wrote:
>
> >
> > Which operating system are you on and what Java version? Depending on the
> > OS, you could get tools (like lsof) to show which file descriptors are
> > being held on to. Is it the client JVM which ends up with these leaks?
> >
> > Also, would it be possible to post a snippet of your application code
> > which shows how you are using the Kafka APIs?
> >
> > -Jaikiran
> > On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
> >
> >> Hi,
> >>
> >> Currently we are using sync producer client of 0.8.1 version in our
> >> production box . we are getting the following exception while publishing
> >> kafka message
> >>
> >> *[2015-01-29
> >> 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
> Fetching
> >> topic metadata with correlation id 10808 for topics [Set(*
> >> *kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
> >> failed*
> >> *java.net.ConnectException: Connection refused*
> >> *at sun.nio.ch.Net.connect0(Native Method)*
> >> *at sun.nio.ch.Net.connect(Net.java:465)*
> >> *at sun.nio.ch.Net.connect(Net.java:457)*
> >> *at
> >> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
> >>  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:
> >> 57)
> >>  at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> >>  at
> >> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> >>  at
> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
> >> doSend(SyncProducer.scala:68)
> >>  at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> >>  at
> >> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> >>  at
> >> kafka.producer.BrokerPartitionInfo.updateInfo(
> >> BrokerPartitionInfo.scala:82)
> >>
> >>
> >> we are using dynamic thread pool to publish message to kafka. My
> >> observation is when after keep alive time when threads in my executor
> gets
> >> destroyed, somehow file descriptor is not getting cleared but when i did
> >> explicitly ran the full gc, fd count got reduced by a signification
> amout.
> >>
> >>
> >
>


Re: Potential socket leak in kafka sync producer

2015-01-29 Thread ankit tyagi
Hi,

I am closing my producer at the time of shutting down my application.

@PreDestroy
public void stop()
{
LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
if (myProducer != null) {
myProducer.close();
}
}



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy 
wrote:

> Hope you are closing the producers. can you share the attachment through
> gist/patebin
>
> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi 
> wrote:
>
> > Hi Jaikiran,
> >
> > I am using ubuntu and was able to reproduce on redhat too. Please find
> the
> > more information below.
> >
> >
> > *DISTRIB_ID=Ubuntu*
> > *DISTRIB_RELEASE=12.04*
> > *DISTRIB_CODENAME=precise*
> > *DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*
> >
> > *java version "1.7.0_72"*
> >
> > This is happening on client side. Output of lsof was showing that maximum
> > fd were FIFO and anon. But after GC FD count was reduced significantly.
> >
> > Below is my Client Code which i am using for publishing message.
> >
> >
> > * private Producer myProducer;*
> >
> > * myProducer =new Producer<>(new
> > ProducerConfig(myProducerProperties));*
> >
> > *   public void send(*
> > *List> msgs)*
> > *{*
> > *myProducer.send(msgs);*
> > *}*
> >
> >
> > we are using sync producer. I am attaching object histo before
> GC(histo_1)
> > and after GC(histo_2) in my application.
> >
> > On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai 
> > wrote:
> >
> > >
> > > Which operating system are you on and what Java version? Depending on
> the
> > > OS, you could get tools (like lsof) to show which file descriptors are
> > > being held on to. Is it the client JVM which ends up with these leaks?
> > >
> > > Also, would it be possible to post a snippet of your application code
> > > which shows how you are using the Kafka APIs?
> > >
> > > -Jaikiran
> > > On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
> > >
> > >> Hi,
> > >>
> > >> Currently we are using sync producer client of 0.8.1 version in our
> > >> production box . we are getting the following exception while
> publishing
> > >> kafka message
> > >>
> > >> *[2015-01-29
> > >> 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
> > Fetching
> > >> topic metadata with correlation id 10808 for topics [Set(*
> > >> *kafka_topic_coms_FD_test1)] from broker
> [id:0,host:localhost,port:9092]
> > >> failed*
> > >> *java.net.ConnectException: Connection refused*
> > >> *at sun.nio.ch.Net.connect0(Native Method)*
> > >> *at sun.nio.ch.Net.connect(Net.java:465)*
> > >> *at sun.nio.ch.Net.connect(Net.java:457)*
> > >> *at
> > >> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
> > >>  at
> kafka.network.BlockingChannel.connect(BlockingChannel.scala:
> > >> 57)
> > >>  at
> kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> > >>  at
> > >>
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> > >>  at
> > >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
> > >> doSend(SyncProducer.scala:68)
> > >>  at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > >>  at
> > >> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > >>  at
> > >> kafka.producer.BrokerPartitionInfo.updateInfo(
> > >> BrokerPartitionInfo.scala:82)
> > >>
> > >>
> > >> we are using dynamic thread pool to publish message to kafka. My
> > >> observation is when after keep alive time when threads in my executor
> > gets
> > >> destroyed, somehow file descriptor is not getting cleared but when i
> did
> > >> explicitly ran the full gc, fd count got reduced by a signification
> > amout.
> > >>
> > >>
> > >
> >
>


Re: Potential socket leak in kafka sync producer

2015-01-29 Thread Jaikiran Pai
What kind of a (managed) component is that which has the @PreDestroy? 
Looking at the previous snippet you added, it looks like you are 
creating the Producer in some method? If  you are going to close the 
producer in a @PreDestroy of the component, then you should be creating 
the producer in the @PostConstruct of the same component, so that you 
have proper lifecycle management of those resources.
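
A minimal sketch of that pairing against the 0.8.1 producer API; class, field
and property names here are illustrative, not taken from the original code:

import java.util.Properties;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ManagedKafkaProducer
{
    private final String topic;
    private final Properties producerProperties;
    private Producer<String, String> producer;

    public ManagedKafkaProducer(String topic, Properties producerProperties)
    {
        this.topic = topic;
        this.producerProperties = producerProperties;
    }

    @PostConstruct
    public void start()
    {
        // Create the producer once, when the container constructs the component
        producer = new Producer<>(new ProducerConfig(producerProperties));
    }

    public void send(String key, String message)
    {
        producer.send(new KeyedMessage<>(topic, key, message));
    }

    @PreDestroy
    public void stop()
    {
        // Close the same instance when the container destroys the component,
        // so its sockets and file descriptors are released
        if (producer != null) {
            producer.close();
        }
    }
}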



-Jaikiran
On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:

Hi,

I am closing my producer at the time of shutting down my application.

@PreDestroy
 public void stop()
 {
 LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
 if (myProducer != null) {
 myProducer.close();
 }
 }



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy 
wrote:


Hope you are closing the producers. can you share the attachment through
gist/patebin

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi 
wrote:


Hi Jaikiran,

I am using ubuntu and was able to reproduce on redhat too. Please find

the

more information below.


*DISTRIB_ID=Ubuntu*
*DISTRIB_RELEASE=12.04*
*DISTRIB_CODENAME=precise*
*DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*

*java version "1.7.0_72"*

This is happening on client side. Output of lsof was showing that maximum
fd were FIFO and anon. But after GC FD count was reduced significantly.

Below is my Client Code which i am using for publishing message.


* private Producer myProducer;*

* myProducer =new Producer<>(new
ProducerConfig(myProducerProperties));*

*   public void send(*
*List> msgs)*
*{*
*myProducer.send(msgs);*
*}*


we are using sync producer. I am attaching object histo before

GC(histo_1)

and after GC(histo_2) in my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai 
wrote:


Which operating system are you on and what Java version? Depending on

the

OS, you could get tools (like lsof) to show which file descriptors are
being held on to. Is it the client JVM which ends up with these leaks?

Also, would it be possible to post a snippet of your application code
which shows how you are using the Kafka APIs?

-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:


Hi,

Currently we are using sync producer client of 0.8.1 version in our
production box . we are getting the following exception while

publishing

kafka message

*[2015-01-29
13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]

Fetching

topic metadata with correlation id 10808 for topics [Set(*
*kafka_topic_coms_FD_test1)] from broker

[id:0,host:localhost,port:9092]

failed*
*java.net.ConnectException: Connection refused*
*at sun.nio.ch.Net.connect0(Native Method)*
*at sun.nio.ch.Net.connect(Net.java:465)*
*at sun.nio.ch.Net.connect(Net.java:457)*
*at
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
  at

kafka.network.BlockingChannel.connect(BlockingChannel.scala:

57)
  at

kafka.producer.SyncProducer.connect(SyncProducer.scala:141)

  at


kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)

  at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
doSend(SyncProducer.scala:68)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
  at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
  at
kafka.producer.BrokerPartitionInfo.updateInfo(
BrokerPartitionInfo.scala:82)


we are using dynamic thread pool to publish message to kafka. My
observation is when after keep alive time when threads in my executor

gets

destroyed, somehow file descriptor is not getting cleared but when i

did

explicitly ran the full gc, fd count got reduced by a signification

amout.






Re: Potential socket leak in kafka sync producer

2015-01-29 Thread ankit tyagi
I have shared the object histograms after and before GC on gist:
https://gist.github.com/ankit1987/f4a04a1350fdd609096d

On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai 
wrote:

> What kind of a (managed) component is that which has the @PreDestroy?
> Looking at the previous snippet you added, it looks like you are creating
> the Producer in some method? If  you are going to close the producer in a
> @PreDestroy of the component, then you should be creating the producer in
> the @PostConstruct of the same component, so that you have proper lifecycle
> management of those resources.
>
>
> -Jaikiran
>
> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>
>> Hi,
>>
>> I am closing my producer at the time of shutting down my application.
>>
>> @PreDestroy
>>  public void stop()
>>  {
>>  LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
>>  if (myProducer != null) {
>>  myProducer.close();
>>  }
>>  }
>>
>>
>>
>> On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy 
>> wrote:
>>
>>  Hope you are closing the producers. can you share the attachment through
>>> gist/patebin
>>>
>>> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
>>> ankittyagi.mn...@gmail.com>
>>> wrote:
>>>
>>>  Hi Jaikiran,

 I am using ubuntu and was able to reproduce on redhat too. Please find

>>> the
>>>
 more information below.


 *DISTRIB_ID=Ubuntu*
 *DISTRIB_RELEASE=12.04*
 *DISTRIB_CODENAME=precise*
 *DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*

 *java version "1.7.0_72"*

 This is happening on client side. Output of lsof was showing that
 maximum
 fd were FIFO and anon. But after GC FD count was reduced significantly.

 Below is my Client Code which i am using for publishing message.


 * private Producer myProducer;*

 * myProducer =new Producer<>(new
 ProducerConfig(myProducerProperties));*

 *   public void send(*
 *List>
 msgs)*
 *{*
 *myProducer.send(msgs);*
 *}*


 we are using sync producer. I am attaching object histo before

>>> GC(histo_1)
>>>
 and after GC(histo_2) in my application.

 On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai >>> >
 wrote:

  Which operating system are you on and what Java version? Depending on
>
 the
>>>
 OS, you could get tools (like lsof) to show which file descriptors are
> being held on to. Is it the client JVM which ends up with these leaks?
>
> Also, would it be possible to post a snippet of your application code
> which shows how you are using the Kafka APIs?
>
> -Jaikiran
> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>
>  Hi,
>>
>> Currently we are using sync producer client of 0.8.1 version in our
>> production box . we are getting the following exception while
>>
> publishing
>>>
 kafka message
>>
>> *[2015-01-29
>> 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
>>
> Fetching

> topic metadata with correlation id 10808 for topics [Set(*
>> *kafka_topic_coms_FD_test1)] from broker
>>
> [id:0,host:localhost,port:9092]
>>>
 failed*
>> *java.net.ConnectException: Connection refused*
>> *at sun.nio.ch.Net.connect0(Native Method)*
>> *at sun.nio.ch.Net.connect(Net.java:465)*
>> *at sun.nio.ch.Net.connect(Net.java:457)*
>> *at
>> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
>>   at
>>
> kafka.network.BlockingChannel.connect(BlockingChannel.scala:
>>>
 57)
>>   at
>>
> kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>>>
   at
>>
>>  kafka.producer.SyncProducer.getOrMakeConnection(
>>> SyncProducer.scala:156)
>>>
   at
>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
>> doSend(SyncProducer.scala:68)
>>   at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>>   at
>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>>   at
>> kafka.producer.BrokerPartitionInfo.updateInfo(
>> BrokerPartitionInfo.scala:82)
>>
>>
>> we are using dynamic thread pool to publish message to kafka. My
>> observation is when after keep alive time when threads in my executor
>>
> gets

> destroyed, somehow file descriptor is not getting cleared but when i
>>
> did
>>>
 explicitly ran the full gc, fd count got reduced by a signification
>>
> amout.

>
>>
>


Re: Potential socket leak in kafka sync producer

2015-01-29 Thread ankit tyagi
Attaching my producer's whole code.


Creating kafkaProducer Bean in xml

[Spring XML bean definition not recoverable: its tags were stripped; only the
broker-list placeholder ${KAFKA_PRODUCER_BROKER_LIST} survives.]


public class KafkaProducer
{
private static final Logger LOG =
LoggerFactory.getLogger(KafkaProducer.class);
private static final String SYSTEM_USER_NAME_PROPERTY = "user.name";
private static final String CONFIG_PARAM_CLIENT_ID = "client.id";
private static final String CLIENT_ID_FORMAT_STR =
"kafka.coms.producer.%s.%s.%s";

private enum ConfigParam
{
SERIALIZER_CLASS("serializer.class",
CommonPropertyParam.KAFKA_PRODUCER_SERIALIZER_CLASS),
KEY_SERIALIZER_CLASS("key.serializer.class",

CommonPropertyParam.KAFKA_PRODUCER_PARTITION_KEY_SERIALIZER_CLASS),
//commenting this to use kafka default paritioner
//PARTITIONER_CLASS("partitioner.class",
//CommonPropertyParam.KAFKA_PRODUCER_PARTITIONER_CLASS),
REQUEST_REQUIRED_ACKS("request.required.acks",
CommonPropertyParam.KAFKA_PRODUCER_REQUEST_REQUIRED_ACKS);

private final String myName;
private final PropertyParam myParam;

ConfigParam(String name, PropertyParam param)
{
myName = name;
myParam = param;
}

public String getName()
{
return myName;
}

public PropertyParam getParam()
{
return myParam;
}
}

private final String myTopic;
private final Properties myProducerProperties;

private Producer myProducer;

@Autowired
private COMSConfiguration myAppConfig;

public KafkaProducer(String topic, Properties producerProperties)
{
LOG.info("Creating Kafka Producer instance: {}", this);

myTopic = topic;
myProducerProperties = producerProperties;
}

*@PostConstruct*
*private void initializeProducer()*
*{*
*LOG.info("Initializing Kafka Producer for topic: {}", getTopic());*

*// Set producer unique client id*
*String currentUser =
System.getProperty(SYSTEM_USER_NAME_PROPERTY);*
*String currentJVMName =
ManagementFactory.getRuntimeMXBean().getName();*
*currentJVMName = currentJVMName.replace('@', '_');*

*String uniqueClientId = String.format(CLIENT_ID_FORMAT_STR,*
*getTopic(), currentUser, currentJVMName);*
*if (myProducerProperties.contains(CONFIG_PARAM_CLIENT_ID)) {*
*uniqueClientId += ":"*
*+
myProducerProperties.getProperty(CONFIG_PARAM_CLIENT_ID);*
*}*
*myProducerProperties.setProperty(CONFIG_PARAM_CLIENT_ID,*
*uniqueClientId);*

*// Set reasonable defaults for required params*
*for (ConfigParam cp : ConfigParam.values()) {*
*if (!myProducerProperties.containsKey(cp.getName())) {*
*String cpValue =
myAppConfig.getPropertyValue(cp.getParam());*
*myProducerProperties.setProperty(cp.getName(), cpValue);*
*}*
*}*

*myProducer =*
*new Producer<>(new ProducerConfig(myProducerProperties));*

*LOG.info("Initialized Kafka Producer for topic: {} and properties
{}", getTopic(),myProducerProperties);*
*}*

public String getTopic()
{
return myTopic;
}

public Producer getProducer()
{
return myProducer;
}

public void send(KeyedMessage msg)
{
myProducer.send(msg);
}

public void send(
List> msgs)
{
myProducer.send(msgs);
}

*@PreDestroy*
*public void stop()*
*{*
*LOG.info("Stopping Kafka Producer for topic: {}", myTopic);*
*if (myProducer != null) {*
*myProducer.close();*
*}*
*}*
}


On Fri, Jan 30, 2015 at 1:08 PM, ankit tyagi 
wrote:

> I have shared object histogram after and before gc on gist
> https://gist.github.com/ankit1987/f4a04a1350fdd609096d
>
> On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai 
> wrote:
>
>> What kind of a (managed) component is that which has the @PreDestroy?
>> Looking at the previous snippet you added, it looks like you are creating
>> the Producer in some method? If  you are going to close the producer in a
>> @PreDestroy of the component, then you should be creating the producer in
>> the @PostConstruct of the same component, so that you have proper lifecycle
>> management of those resources.
>>
>>
>> -Jaikiran
>>
>> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>>
>>> Hi,
>>>
>>> I am closing my producer at the time of shutting down my application.
>>>
>>> @PreDestroy
>>>  public void stop()
>>>  {
>>>  LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
>>>  if (myProducer != null) {
>>>  myProducer.close();
>>>  }
>>>  }
>>>
>>>
>>>
>>> On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy 
>>> wrote: