Re: v0.10 MirrorMaker producer cannot send v0.8 message from v0.10 broker

2016-09-16 Thread Gerard Klijs
This is a known bug; I think it was fixed in the 0.10.0.1 release. Alternatively,
you could use a custom message handler for MirrorMaker and produce without a
timestamp whenever the timestamp of the consumed message is -1.
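Roughly something like this, as a sketch (untested Java against what I believe
the 0.10 handler interface looks like; you should be able to plug it in with the
--message.handler option):

import java.util.Collections;
import java.util.List;

import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: when the consumed record carries no timestamp (-1), build the
// ProducerRecord without one so the producer assigns it; otherwise pass it on.
public class TimestampSafeHandler implements MirrorMaker.MirrorMakerMessageHandler {
    public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        ProducerRecord<byte[], byte[]> out;
        if (record.timestamp() == -1L) {
            out = new ProducerRecord<>(record.topic(), null, record.key(), record.value());
        } else {
            out = new ProducerRecord<>(record.topic(), null, record.timestamp(),
                    record.key(), record.value());
        }
        return Collections.singletonList(out);
    }
}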

On Thu, Sep 15, 2016 at 9:48 AM Samuel Zhou  wrote:

> Hi,
>
> I have a pipeline that publishes messages with a v0.8 client; the messages go
> to a v0.10 broker first, then mirror maker consumes them and publishes them to
> another v0.10 cluster. But I got the following message in the MM log:
>
> java.lang.IllegalArgumentException: Invalid timestamp -1
>
> at
>
> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>
> at
>
> kafka.tools.MirrorMaker$defaultMirrorMakerMessageHandler$.handle(MirrorMaker.scala:678)
>
> at
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:414)
>
> Above error makes MM dead. I am not sure if KAFKA-3188 covers the test. Or
> are there any parameters for MM that can fix above error?
>
> Thanks!
>
> Samuel
>


SASL_PLAINTEXT Authentication/Connection failure

2016-09-16 Thread Max Bridgewater
Hi,

I am trying to get SASL_PLAINTEXT or SASL_SSL to work. So far I have not been
successful. I posted the full story on SO:
http://stackoverflow.com/questions/39521691/kafka-authentication-producer-unable-to-connect-producer

Bottom line is, when I start the server in SASL_PLAINTEXT mode, the exception
below keeps popping up in the logs. The first issue is that you only see it
after changing the log level to DEBUG, even though the server isn't actually
in a functioning state. Shouldn't the error be printed at ERROR level?

Now, the real issue is I don't understand why this is happening. It seems
the server is connecting to itself and trying to authenticate against
itself and failing to do so. What is wrong in my configuration?

In server.properties, I have:

listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

Replacing 0.0.0.0 with localhost or 127.0.0.1 produces the same result.

I also have KAFKA_OPTS set to /home/kafka/kafka_client_jaas.conf. And the
content of kafka_client_jaas.conf is:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="alice-secret"
   user_alice="alice-secret";
};
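(To be precise, KAFKA_OPTS points at that file via the standard JAAS system
property, i.e. roughly:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_client_jaas.conf"
)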

No client is up. The only things I have up are ZK and the Kafka server.
Here is the stack trace:

2016-09-15 22:06:09 DEBUG NetworkClient:496 - Initiating connection to node
0 at 0.0.0.0:9092.
2016-09-15 22:06:09 DEBUG Acceptor:52 - Accepted connection from /127.0.0.1
on /127.0.1.1:9092. sendBufferSize [actual|requested]: [102400|102400]
recvBufferSize [actual|requested]: [102400|102400]
2016-09-15 22:06:09 DEBUG Processor:52 - Processor 0 listening to new
connection from /127.0.0.1:59669
2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
state to SEND_HANDSHAKE_REQUEST
2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:133 - Creating
SaslClient: client=null;service=kafka;serviceHostname=0.0.0.0;mechs=[PLAIN]
2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
state to RECEIVE_HANDSHAKE_RESPONSE
2016-09-15 22:06:09 DEBUG NetworkClient:476 - Completed connection to node 0
2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server
state to HANDSHAKE_REQUEST
2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:310 - Handle Kafka
request SASL_HANDSHAKE
2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:354 - Using SASL
mechanism 'PLAIN' provided by client
2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server
state to AUTHENTICATE
2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
state to INITIAL
2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
state to INTERMEDIATE
2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server
state to FAILED
2016-09-15 22:06:09 DEBUG Selector:345 - Connection with /127.0.0.1
disconnected
java.io.IOException: javax.security.sasl.SaslException: Authentication
failed: Invalid JAAS configuration [Caused by
javax.security.sasl.SaslException: Authentication failed: Invalid username
or password]
at
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:243)
at
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at kafka.network.Processor.poll(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:412)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.security.sasl.SaslException: Authentication failed:
Invalid JAAS configuration [Caused by javax.security.sasl.SaslException:
Authentication failed: Invalid username or password]
at
org.apache.kafka.common.security.plain.PlainSaslServer.evaluateResponse(PlainSaslServer.java:101)
at
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:228)
... 6 more
Caused by: javax.security.sasl.SaslException: Authentication failed:
Invalid username or password
at
org.apache.kafka.common.security.plain.PlainSaslServer.evaluateResponse(PlainSaslServer.java:98)
... 7 more
2016-09-15 22:06:09 DEBUG Selector:345 - Connection with 0.0.0.0/0.0.0.0
disconnected
java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239)
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182)
at
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
at
org.apache.

Re: SASL_PLAINTEXT Authentication/Connection failure

2016-09-16 Thread Rajini Sivaram
Max,

I think there is a typo in your configuration. You intended admin password
to be admin-secret?

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="alice-secret"  *=> Change to **"admin-secret"*
   user_alice="alice-secret";
};


Since your inter-broker security protocol is SASL_PLAINTEXT, the controller
uses SASL with the username "admin" and that connection is failing since
the server thinks the expected password is "alice-secret".
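In other words, the KafkaServer section should end up as:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret"
   user_alice="alice-secret";
};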



On Fri, Sep 16, 2016 at 8:43 AM, Max Bridgewater 
wrote:

> Hi,
>
> I am trying to get SASL_PLAINTEXT or SASL_SSL to work. Sofar I am not
> successful. I posted the full story on SO:
> http://stackoverflow.com/questions/39521691/kafka-authentication-producer-
> unable-to-connect-producer
>
> Bottom line is, when I start the server in SASL_PLAINTEXT mode, the below
> exception keeps popping up in the logs. The first issue is that you see it
> only when you change log level to DEBUG, while in reality the server isn't
> in a functioning state. Should the error be printed at error level?
>
> Now, the real issue is I don't understand why this is happening. It seems
> the server is connecting to itself and trying to authenticate against
> itself and failing to do so. What is wrong in my configuration?
>
> In  server.properties, I have:
>
> listeners=SASL_PLAINTEXT://0.0.0.0:9092
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
>
> Replacing 0.0.0.0 with localhost and 127.0.0.1 produces same result.
>
> I also have KAFKA_OPTS set to /home/kafka/kafka_client_jaas.conf. And the
> content of kafka_client_jaas.conf is:
>
> KafkaServer {
>org.apache.kafka.common.security.plain.PlainLoginModule required
>username="admin"
>password="admin-secret"
>user_admin="alice-secret"
>user_alice="alice-secret";
> };
>
> No client is up. The only things I have up are ZK and the Kafka server.
> Here is the stack trace:
>
> 2016-09-15 22:06:09 DEBUG NetworkClient:496 - Initiating connection to node
> 0 at 0.0.0.0:9092.
> 2016-09-15 22:06:09 DEBUG Acceptor:52 - Accepted connection from /
> 127.0.0.1
> on /127.0.1.1:9092. sendBufferSize [actual|requested]: [102400|102400]
> recvBufferSize [actual|requested]: [102400|102400]
> 2016-09-15 22:06:09 DEBUG Processor:52 - Processor 0 listening to new
> connection from /127.0.0.1:59669
> 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
> state to SEND_HANDSHAKE_REQUEST
> 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:133 - Creating
> SaslClient: client=null;service=kafka;serviceHostname=0.0.0.0;mechs=
> [PLAIN]
> 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
> state to RECEIVE_HANDSHAKE_RESPONSE
> 2016-09-15 22:06:09 DEBUG NetworkClient:476 - Completed connection to node
> 0
> 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server
> state to HANDSHAKE_REQUEST
> 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:310 - Handle Kafka
> request SASL_HANDSHAKE
> 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:354 - Using SASL
> mechanism 'PLAIN' provided by client
> 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server
> state to AUTHENTICATE
> 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
> state to INITIAL
> 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
> state to INTERMEDIATE
> 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server
> state to FAILED
> 2016-09-15 22:06:09 DEBUG Selector:345 - Connection with /127.0.0.1
> disconnected
> java.io.IOException: javax.security.sasl.SaslException: Authentication
> failed: Invalid JAAS configuration [Caused by
> javax.security.sasl.SaslException: Authentication failed: Invalid username
> or password]
> at
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.
> authenticate(SaslServerAuthenticator.java:243)
> at
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.
> java:318)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at kafka.network.Processor.poll(SocketServer.scala:472)
> at kafka.network.Processor.run(SocketServer.scala:412)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: javax.security.sasl.SaslException: Authentication failed:
> Invalid JAAS configuration [Caused by javax.security.sasl.SaslException:
> Authentication failed: Invalid username or password]
> at
> org.apache.kafka.common.security.plain.PlainSaslServer.evaluateResponse(
> PlainSaslServer.java:101)
> at
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.
> authenticate(SaslServerAuthenticator.java:228)
> ... 6 more
> Caused by: javax.security.sasl.SaslException: Authentication failed:
> Invalid us

Large # of Topics/Partitions

2016-09-16 Thread Jens Rantil
Hi,

This might also be of interest:
http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

Cheers,
Jens

On Monday, August 8, 2016, Daniel Fagnan  wrote:

> Thanks Tom! This was very helpful and I’ll explore having a more static
> set of partitions as that seems to fit Kafka a lot better.
>
> Cheers,
> Daniel
>
> > On Aug 8, 2016, at 12:27 PM, Tom Crayford  wrote:
> >
> > Hi Daniel,
> >
> > Kafka doesn't provide this kind of isolation or scalability for many many
> > streams. The usual design is to use a consistent hash of some "key" to
> > attribute your data to a particular partition. That of course, doesn't
> > isolate things fully, but has everything in a partition dependent on each
> > other.
> >
> > We've found that over a few thousand to a few tens of thousands of
> > partitions clusters hit a lot of issues (it depends on the write pattern,
> > how much memory you give brokers and zookeeper, and if you plan on ever
> > deleting topics).
> >
> > Another option is to manage multiple clusters, and keep under a certain
> > limit of partitions in each cluster. That is of course additional
> > operational overhead and complexity.
> >
> > I'm not sure I 100% understand your mechanism for tracking pending
> offsets,
> > but it seems like that might be your best option.
> >
> > Thanks
> >
> > Tom Crayford
> > Heroku Kafka
> >
> > On Mon, Aug 8, 2016 at 8:12 PM, Daniel Fagnan 
> wrote:
> >
> >> Hey all,
> >>
> >> I’m currently in the process of designing a system around Kafka and I’m
> >> wondering the recommended way to manage topics. Each event stream we
> have
> >> needs to be isolated from each other. A failure from one should not
> affect
> >> another event stream from processing (by failure, we mean a downstream
> >> failure that would require us to replay the messages).
> >>
> >> So my first thought was to create a topic per event stream. This allows
> a
> >> larger event stream to be partitioned for added parallelism but keep the
> >> default # of partitions down as much as possible. This would solve the
> >> isolation requirement in that a topic can keep failing and we’ll
> continue
> >> replaying the messages without affected all the other topics.
> >>
> >> We read it’s not recommended to have your data model dictate the # of
> >> partitions or topics in Kafka and we’re unsure about this approach if we
> >> need to triple our event stream.
> >>
> >> We’re currently looking at 10,000 event streams (or topics) but we don’t
> >> want to be spinning up additional brokers just so we can add more event
> >> stream, especially if the load for each is reasonable.
> >>
> >> Another option we were looking into was to not isolate at the
> >> topic/partition level but to keep a set of pending offsets persisted
> >> somewhere (seemingly what Twitter Heron or Storm does but they don’t
> seem
> >> to persist the pending offsets).
> >>
> >> Thoughts?
>
>

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se



Lost message in asynchronous kafka producer

2016-09-16 Thread Agostino Calamita
Hi,
I have 2 brokers with a topic with replication factor = 2.
Brokers are configured with min.insync.replicas=2.

I use a producer in asynchronous mode to send 10 messages.

A few seconds after the producer starts, I stop one broker.

On the producer side I got no exceptions, so from the producer application's
point of view everything is OK, but in the broker log I see:
 ERROR [Replica Manager on Broker 0]: Error processing append operation on
partition diameternew-10 (kafka.server.ReplicaManager)

So I lost all the messages sent after the broker shutdown.

Is there a way to catch this kind of exception as soon as possible?

I know that I can use

 m_kafkaProducer.send(prMessage).get();

or

Future<RecordMetadata> _future = m_kafkaProducer.send(prMessage);

or a callback in the send method, but they are very slow.


Thanks.


Re: SASL_PLAINTEXT Authentication/Connection failure

2016-09-16 Thread Max Bridgewater
Thanks Rajini. That was the issue. Now I am facing another one. I am not
sure why my consumer is trying to access the topic over PLAINTEXT. The consumer
config is:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN


KAFKA_OPTS is set to /home/kafka/kafka_client_jaas.conf. I can confirm that
this file is being read, because if I change the file name to something
non-existent, I get a file-not-found exception.

The content of this jaas file:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};


I launch the consumer with:
bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic test3
--from-beginning --consumer.config=config/consumer.properties

The server config:

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

The producer config:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Now, when I launch the consumer, I get the following error:

[2016-09-16 05:09:11,908] WARN
[test-consumer-group_pascalvm-1474016950388-699882ba-leader-finder-thread],
Failed to find leader for Set([test3,0])
(kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.BrokerEndPointNotAvailableException: End point with security
protocol PLAINTEXT not found for broker 0
at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:130)
at
kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:166)
at
kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:166)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

What am I missing?




On Fri, Sep 16, 2016 at 3:57 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Max,
>
> I think there is a typo in your configuration. You intended admin password
> to be admin-secret?
>
> KafkaServer {
>org.apache.kafka.common.security.plain.PlainLoginModule required
>username="admin"
>password="admin-secret"
>user_admin="alice-secret"  *=> Change to **"admin-secret"*
>user_alice="alice-secret";
> };
>
>
> Since your inter-broker security protocol is SASL_PLAINTEXT, the controller
> uses SASL with the username "admin" and that connection is failing since
> the server thinks the expected password is "alice-secret".
>
>
>
> On Fri, Sep 16, 2016 at 8:43 AM, Max Bridgewater <
> max.bridgewa...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am trying to get SASL_PLAINTEXT or SASL_SSL to work. Sofar I am not
> > successful. I posted the full story on SO:
> > http://stackoverflow.com/questions/39521691/kafka-
> authentication-producer-
> > unable-to-connect-producer
> >
> > Bottom line is, when I start the server in SASL_PLAINTEXT mode, the below
> > exception keeps popping up in the logs. The first issue is that you see
> it
> > only when you change log level to DEBUG, while in reality the server
> isn't
> > in a functioning state. Should the error be printed at error level?
> >
> > Now, the real issue is I don't understand why this is happening. It seems
> > the server is connecting to itself and trying to authenticate against
> > itself and failing to do so. What is wrong in my configuration?
> >
> > In  server.properties, I have:
> >
> > listeners=SASL_PLAINTEXT://0.0.0.0:9092
> > security.inter.broker.protocol=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=PLAIN
> > sasl.enabled.mechanisms=PLAIN
> >
> > Replacing 0.0.0.0 with localhost and 127.0.0.1 produces same result.
> >
> > I also have KAFKA_OPTS set to /home/kafka/kafka_client_jaas.conf. And
> the
> > content of kafka_client_jaas.conf is:
> >
> > KafkaServer {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="admin"
> >password="admin-secret"
> >user_admin="alice-secret"
> >user_alice="alice-secret";
> > };
> >
> > No client is up. The only things I have up are ZK and the Kafka server.
> > Here is the stack trace:
> >
> > 2016-09-15 22:06:09 DEBUG NetworkClient:496 - Initiating connection to
> node
> > 0 at 0.0.0.0:9092.
> > 2016-09-15 22:06:09 DEBUG Acceptor:52 - Accepted connection from /
> > 127.0.0.1
> > on /127.0.1.1:9092. sendBufferSize [actual|requested]: [102400|102400]
> > recvBufferSize [actual|requested]: [102400|102400]
> > 2016-09-15 22:06:09 DEBUG Processor:52 - Processor 0 listening to new
> > connection from /127.0.0.1:59669
> > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client
> > state to SEND_HANDSHAKE_REQUEST
> > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:133 - Creating
> > SaslC

Re: SASL_PLAINTEXT Authentication/Connection failure

2016-09-16 Thread Rajini Sivaram
Max,

You need to use the new consumer since the old consumer does not support
security features. For console-consumer, you need to add the option
--new-consumer.
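For example, with your settings (assuming the broker is on localhost:9092):

bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 \
  --topic test3 --from-beginning --consumer.config config/consumer.properties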

On Fri, Sep 16, 2016 at 10:14 AM, Max Bridgewater  wrote:

> Thanks Rajini. That was the issue. Now I am facing another one. I am not
> sure why my consumer is trying to use the topic in PLAINTEXT. The consumer
> config is:
>
> security.protocol=SASL_PLAINTEXT
> sasl.mechanism=PLAIN
>
>
> KAFKA_OPTS is set to /home/kafka/kafka_client_jaas.conf. I can confirm
> that
> this file is being read because if I change the file name to something
> non-existing, I get file not found exception.
>
> The content of this jaas file:
>
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="alice"
>   password="alice-secret";
> };
>
>
> I launch the consumer with:
> bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic test3
> --from-beginning --consumer.config=config/consumer.properties
>
> The server config:
>
> listeners=SASL_PLAINTEXT://localhost:9092
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
>
> The producer config:
>
> security.protocol=SASL_PLAINTEXT
> sasl.mechanism=PLAIN
>
> Now, when I launch the consumer, I get following error:
>
> [2016-09-16 05:09:11,908] WARN
> [test-consumer-group_pascalvm-1474016950388-699882ba-leader-
> finder-thread],
> Failed to find leader for Set([test3,0])
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.BrokerEndPointNotAvailableException: End point with security
> protocol PLAINTEXT not found for broker 0
> at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:130)
> at
> kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChanne
> l$1.apply(ZkUtils.scala:166)
> at
> kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChanne
> l$1.apply(ZkUtils.scala:166)
> at
> scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:244)
>
> What am I missing?
>
>
>
>
> On Fri, Sep 16, 2016 at 3:57 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Max,
> >
> > I think there is a typo in your configuration. You intended admin
> password
> > to be admin-secret?
> >
> > KafkaServer {
> >org.apache.kafka.common.security.plain.PlainLoginModule required
> >username="admin"
> >password="admin-secret"
> >user_admin="alice-secret"  *=> Change to **"admin-secret"*
> >user_alice="alice-secret";
> > };
> >
> >
> > Since your inter-broker security protocol is SASL_PLAINTEXT, the
> controller
> > uses SASL with the username "admin" and that connection is failing since
> > the server thinks the expected password is "alice-secret".
> >
> >
> >
> > On Fri, Sep 16, 2016 at 8:43 AM, Max Bridgewater <
> > max.bridgewa...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I am trying to get SASL_PLAINTEXT or SASL_SSL to work. Sofar I am not
> > > successful. I posted the full story on SO:
> > > http://stackoverflow.com/questions/39521691/kafka-
> > authentication-producer-
> > > unable-to-connect-producer
> > >
> > > Bottom line is, when I start the server in SASL_PLAINTEXT mode, the
> below
> > > exception keeps popping up in the logs. The first issue is that you see
> > it
> > > only when you change log level to DEBUG, while in reality the server
> > isn't
> > > in a functioning state. Should the error be printed at error level?
> > >
> > > Now, the real issue is I don't understand why this is happening. It
> seems
> > > the server is connecting to itself and trying to authenticate against
> > > itself and failing to do so. What is wrong in my configuration?
> > >
> > > In  server.properties, I have:
> > >
> > > listeners=SASL_PLAINTEXT://0.0.0.0:9092
> > > security.inter.broker.protocol=SASL_PLAINTEXT
> > > sasl.mechanism.inter.broker.protocol=PLAIN
> > > sasl.enabled.mechanisms=PLAIN
> > >
> > > Replacing 0.0.0.0 with localhost and 127.0.0.1 produces same result.
> > >
> > > I also have KAFKA_OPTS set to /home/kafka/kafka_client_jaas.conf. And
> > the
> > > content of kafka_client_jaas.conf is:
> > >
> > > KafkaServer {
> > >org.apache.kafka.common.security.plain.PlainLoginModule required
> > >username="admin"
> > >password="admin-secret"
> > >user_admin="alice-secret"
> > >user_alice="alice-secret";
> > > };
> > >
> > > No client is up. The only things I have up are ZK and the Kafka server.
> > > Here is the stack trace:
> > >
> > > 2016-09-15 22:06:09 DEBUG NetworkClient:496 - Initiating connection to
> > node
> > > 0 at 0.0.0.0:9092.
> > > 2016-09

Re: SASL_PLAINTEXT Authentication/Connection failure

2016-09-16 Thread Max Bridgewater
Awesome. After adding --new-consumer and --bootstrap-server, I got it
working.

Thanks a lot.

On Fri, Sep 16, 2016 at 5:33 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Max,
>
> You need to use the new consumer since the old consumer does not support
> security features. For console-consumer, you need to add the option
> --new-consumer.
>
> On Fri, Sep 16, 2016 at 10:14 AM, Max Bridgewater <
> max.bridgewa...@gmail.com
> > wrote:
>
> > Thanks Rajini. That was the issue. Now I am facing another one. I am not
> > sure why my consumer is trying to use the topic in PLAINTEXT. The
> consumer
> > config is:
> >
> > security.protocol=SASL_PLAINTEXT
> > sasl.mechanism=PLAIN
> >
> >
> > KAFKA_OPTS is set to /home/kafka/kafka_client_jaas.conf. I can confirm
> > that
> > this file is being read because if I change the file name to something
> > non-existing, I get file not found exception.
> >
> > The content of this jaas file:
> >
> > KafkaClient {
> >   org.apache.kafka.common.security.plain.PlainLoginModule required
> >   username="alice"
> >   password="alice-secret";
> > };
> >
> >
> > I launch the consumer with:
> > bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic test3
> > --from-beginning --consumer.config=config/consumer.properties
> >
> > The server config:
> >
> > listeners=SASL_PLAINTEXT://localhost:9092
> > security.inter.broker.protocol=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=PLAIN
> > sasl.enabled.mechanisms=PLAIN
> >
> > The producer config:
> >
> > security.protocol=SASL_PLAINTEXT
> > sasl.mechanism=PLAIN
> >
> > Now, when I launch the consumer, I get following error:
> >
> > [2016-09-16 05:09:11,908] WARN
> > [test-consumer-group_pascalvm-1474016950388-699882ba-leader-
> > finder-thread],
> > Failed to find leader for Set([test3,0])
> > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > kafka.common.BrokerEndPointNotAvailableException: End point with
> security
> > protocol PLAINTEXT not found for broker 0
> > at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> > at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> > at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> > at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:130)
> > at
> > kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChanne
> > l$1.apply(ZkUtils.scala:166)
> > at
> > kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChanne
> > l$1.apply(ZkUtils.scala:166)
> > at
> > scala.collection.TraversableLike$$anonfun$map$
> > 1.apply(TraversableLike.scala:244)
> > at
> > scala.collection.TraversableLike$$anonfun$map$
> > 1.apply(TraversableLike.scala:244)
> >
> > What am I missing?
> >
> >
> >
> >
> > On Fri, Sep 16, 2016 at 3:57 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > Max,
> > >
> > > I think there is a typo in your configuration. You intended admin
> > password
> > > to be admin-secret?
> > >
> > > KafkaServer {
> > >org.apache.kafka.common.security.plain.PlainLoginModule required
> > >username="admin"
> > >password="admin-secret"
> > >user_admin="alice-secret"  *=> Change to **"admin-secret"*
> > >user_alice="alice-secret";
> > > };
> > >
> > >
> > > Since your inter-broker security protocol is SASL_PLAINTEXT, the
> > controller
> > > uses SASL with the username "admin" and that connection is failing
> since
> > > the server thinks the expected password is "alice-secret".
> > >
> > >
> > >
> > > On Fri, Sep 16, 2016 at 8:43 AM, Max Bridgewater <
> > > max.bridgewa...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am trying to get SASL_PLAINTEXT or SASL_SSL to work. Sofar I am not
> > > > successful. I posted the full story on SO:
> > > > http://stackoverflow.com/questions/39521691/kafka-
> > > authentication-producer-
> > > > unable-to-connect-producer
> > > >
> > > > Bottom line is, when I start the server in SASL_PLAINTEXT mode, the
> > below
> > > > exception keeps popping up in the logs. The first issue is that you
> see
> > > it
> > > > only when you change log level to DEBUG, while in reality the server
> > > isn't
> > > > in a functioning state. Should the error be printed at error level?
> > > >
> > > > Now, the real issue is I don't understand why this is happening. It
> > seems
> > > > the server is connecting to itself and trying to authenticate against
> > > > itself and failing to do so. What is wrong in my configuration?
> > > >
> > > > In  server.properties, I have:
> > > >
> > > > listeners=SASL_PLAINTEXT://0.0.0.0:9092
> > > > security.inter.broker.protocol=SASL_PLAINTEXT
> > > > sasl.mechanism.inter.broker.protocol=PLAIN
> > > > sasl.enabled.mechanisms=PLAIN
> > > >
> > > > Replacing 0.0.0.0 with localhost and 127.0.0.1 produces same result.
> > > >
> > > > I also have KAFKA_OPTS set to /home/kafka/kafka_client_jaas.conf.
> And
> > > the
> > > > content of kafka_cl

Re: Publish to 1 topic, consume from N

2016-09-16 Thread Luiz Cordeiro
Hello Marko,

The difference is that the publisher has to know beforehand which queues to
publish to. With the AMQ model you get more decoupling: the publisher only knows
the Exchange, and the consumers register themselves without the producer knowing
about them. So let's say you want a new service that will consume existing
events. With the AMQ abstraction you don't have to change the producer at all,
you just bind a new consumer to the exchange. Without this decoupling, you'd
have to modify the producer to publish to the new queues (or, at least, keep the
list of queues to publish to in a configuration file and add the new one to it,
should you design the application with expandability in mind).

For small applications that is not a big hurdle, but when you have several 
applications, written by different teams, with different update schedules and 
roadmaps, it’s good to have this decoupling.

But like Ali said, the way to achieve such a decoupling with Kafka would be to 
have a separate service doing the mapping.

I just asked in case someone had already built such a mapper and open-sourced
it, so I wouldn't have to reinvent the wheel.
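(In my head the mapper is little more than a consume-transform-produce loop,
roughly like the untested Java sketch below, where the binding rules decide the
target topics; the broker address, topic names and the route() rule are just
placeholders.)

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Rough sketch of a "mapping service": consume everything from one ingest topic
// and re-publish each record to whatever topics the binding rules select.
public class MappingService {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "mapping-service");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("ingest"));  // the "exchange" topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> rec : records) {
                    // The binding rules decide which topics get a copy of this record.
                    for (String target : route(rec)) {
                        producer.send(new ProducerRecord<>(target, rec.key(), rec.value()));
                    }
                }
            }
        }
    }

    // Hypothetical binding rule: one private "queue" topic per key.
    private static List<String> route(ConsumerRecord<String, String> rec) {
        return Collections.singletonList("sub-" + rec.key());
    }
}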

I hope the scenario is more clear now.


> On 15.09.2016 at 19:49, Marko Bonaći wrote:
> 
> 1. You can create N topics
> 2. You control from producer where each message goes
> 3. You have consumer that fetches from M different topics:
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection)
> 
> Isn't this architecture flexible enough for any type of use case? What do
> you think cannot be achieved?
> 
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext  | Contact
> 
> 
> On Thu, Sep 15, 2016 at 11:01 AM, Ali Akhtar  wrote:
> 
>> It sounds like you can implement the 'mapping service'  component yourself
>> using Kafka.
>> 
>> Have all of your messages go to one kafka topic. Have one consumer group
>> listening to this 'everything goes here' topic. This consumer group acts as
>> your mapping service. It looks at each message, and based on your rules, it
>> sends that message to a different topic for those specific rules.
>> 
>> Then you have your consumers listening to the specific topics that they
>> need to. Your mapping service does the job of redirecting messages from the
>> 'everything' topic to the specific topics based on your rules.
>> 
>> On Thu, Sep 15, 2016 at 1:43 PM, Luiz Cordeiro <
>> luiz.corde...@mobilityhouse.com> wrote:
>> 
>>> Hello,
>>> 
>>> We’re considering migrating an AMQ-based platform to Kafka. However our
>>> application logic needs an AMQ feature called Dynamic Binding, that is,
>> on
>>> AMQ one publishes messages to an Exchange, which can be dynamically
>>> configured to deliver a copy of the message to several queues, based on
>>> binding rules. So when a new client comes alive, it may create its
>> binding
>>> rules to specify a set of topics to listen to, and receive all the
>> messages
>>> from these topics on a private queue.
>>> 
>>> I understand that Kafka neither provides this nor will, as it is not its
>>> objective, but I was wondering if there’s another component, an overlay
>> to
>>> Kafka, that could provide this feature while using Kafka behind the
>> scenes
>>> for the persistence, something like this:
>>> 
>>> Publisher --> Mapping Service --> Kafka <-- Consumers
>>>                     ^                         |
>>>                     |       Binding rules     |
>>>                     \-------------------------/
>>> 
>>> Are you aware of such a component? Otherwise, how would you solve this
>>> issue of publish to 1 place and have it replicated on N topics.
>>> 
>>> Best Regards,
>>> Luiz
>>> 
>> 



Schema for json converter

2016-09-16 Thread Srikrishna Alla
I am trying to use the JDBC connector to send records from Kafka 0.9 to a DB.
I am using the JsonConverter to convert the records. My connector is failing
when it checks the schema I am using. Please let me know what the issue is
with my JSON schema.

Configuration used:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's
setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Record that has been sent to the topic -
{"schema":{"type":"struct","fields":[{"name":"error_code",
"type":"string","optional":"false"},{"name":"error_time","
type":"string","optional":"false"},{"name":"error_msg","
type":"string","optional":"false"},{"name":"source","
type":"string","optional":"false"},{"name":"criticality",
"type":"string","optional":"false"}]},"payload":{"error_
code":"RAW104","error_time":"09/15/2016@18:00:32","error_msg":"Not
accepting","source":"APPLICATION","criticality":"WARN"}}


Error I am seeing:
[2016-09-15 18:01:07,513] ERROR Thread WorkerSinkTask-jdbc-sink-test-0
exiting with uncaught exception:
(org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.DataException: Struct schema's field name
not specified properly
   at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(
JsonConverter.java:493)
   at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(
JsonConverter.java:344)
   at org.apache.kafka.connect.json.JsonConverter.toConnectData(
JsonConverter.java:334)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.
convertMessages(WorkerSinkTask.java:266)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(
WorkerSinkTask.java:175)
   at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
iteration(WorkerSinkTaskThread.java:90)
   at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
execute(WorkerSinkTaskThread.java:58)
   at org.apache.kafka.connect.util.ShutdownableThread.run(
ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-jdbc-sink-test-0"
org.apache.kafka.connect.errors.DataException: Struct schema's field name
not specified properly
   at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(
JsonConverter.java:493)
   at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(
JsonConverter.java:344)
   at org.apache.kafka.connect.json.JsonConverter.toConnectData(
JsonConverter.java:334)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.
convertMessages(WorkerSinkTask.java:266)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(
WorkerSinkTask.java:175)
   at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
iteration(WorkerSinkTaskThread.java:90)
   at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.
execute(WorkerSinkTaskThread.java:58)
   at org.apache.kafka.connect.util.ShutdownableThread.run(
ShutdownableThread.java:82)

Thanks,
Sri


Slow machine disrupting the cluster

2016-09-16 Thread Gerard Klijs
We just had an interesting issue; luckily it was only on our test cluster.
For some reason one of the machines in the cluster became really slow.
Because it was still alive, it was still the leader for some
topic-partitions. Our mirror maker reads and writes to multiple
topic-partitions on each thread. When committing the offsets, this fails
for the topic-partitions located on the slow machine, because the consumers
have timed out. The data for these topic-partitions is then sent over and
over, causing a flood of duplicate messages.
What would be the best way to prevent this in the future? Is there some way
the broker could notice it is performing poorly and, for example, shut itself off?


Error kafka-stream method punctuate in context.forward()

2016-09-16 Thread Hamza HACHANI
Good morning,

I have a problem with a Kafka Streams application.

In fact I've already created the following Kafka Streams applications:

StatsByMinute : entry topic : uplinks, out topic : statsM.

StatsByHour : entry topic : statsM, out topic : statsH.

StatsByDay : entry topic : statsH, out topic : statsD.


All three applications have nearly the same structure (StatsByMinute and
StatsByHour/StatsByDay differ only in the application ID, the KV store and the
process() method).

StatsByDay and StatsByHour have exactly the same structure (the only
difference is the ID parameters).

The problem is that StatsByMinute and StatsByHour work perfectly.

That is not the case for StatsByDay: I verified that I do receive data and
process it (so process() works), but in the context.forward() call inside
punctuate() there is a problem.

I get the following error:


[2016-09-16 15:44:24,467] ERROR Streams application error during processing in 
thread [StreamThread-1]:  
(org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
at 
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
at 
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
at 
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Exception in thread "StreamThread-1" java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
at 
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
at 
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
at 
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)




Re: Slow machine disrupting the cluster

2016-09-16 Thread David Garcia
To remediate, you could start another broker, rebalance, and then shut down the 
busted broker.  But, you really should put some monitoring on your system (to 
help diagnose the actual problem).  Datadog has a pretty good set of articles 
for using jmx to do this: 
https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/

There are lots of jmx metrics gathering tools too…such as jmxtrans: 
https://github.com/jmxtrans/jmxtrans


Confluent also offers tooling (such as Control Center) to help with monitoring.


As far as mirror maker goes, you can play with the consumer/producer timeout 
settings to make sure the process waits long enough for a slow machine.
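For example, something like this in the mirror maker's consumer/producer
property files (illustrative values only):

# consumer.properties
session.timeout.ms=30000
request.timeout.ms=60000

# producer.properties
request.timeout.ms=60000
retries=10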

-David

On 9/16/16, 7:11 AM, "Gerard Klijs"  wrote:

We just had an interesting issue; luckily it was only on our test cluster.
For some reason one of the machines in the cluster became really slow.
Because it was still alive, it was still the leader for some
topic-partitions. Our mirror maker reads and writes to multiple
topic-partitions on each thread. When committing the offsets, this fails
for the topic-partitions located on the slow machine, because the consumers
have timed out. The data for these topic-partitions is then sent over and
over, causing a flood of duplicate messages.
What would be the best way to prevent this in the future? Is there some way
the broker could notice it is performing poorly and, for example, shut itself off?




Slow 'rolled new log segment' ?

2016-09-16 Thread Stephen Powis
Hey!

I'm trying to determine why I had a weird hiccup in publishing to my Kafka
cluster.  I'm running Kafka 0.10.0.1.  Here's a graph of my producer times:



You can see at around ~10:08 we had a large spike in Kafka's internal
publishing time, waiting for the leader. Our producers are configured to time
out write requests after 150ms, so during this bump lots of producers timed
out.

I first dug into my GC logs, and see nothing relevant or taking over 0.00X
seconds around the same time frame.

Digging thru server logs on the leader, at roughly the same time, I've
found the following:

[2016-09-16 10:08:02,508] INFO Rolled new log segment for 'EventStream-27'
> in 743 ms. (kafka.log.Log)
> [2016-09-16 10:08:34,727] INFO Rolled new log segment for 'EventStream-33'
> in 16 ms. (kafka.log.Log)
> [2016-09-16 10:09:01,485] INFO Rolled new log segment for 'EventStream-15'
> in 14 ms. (kafka.log.Log)
>

Reviewing logs going backwards and forwards, it seems like most new log
segments take ~15ms or less.

Curious if anyone knows of other potential causes or things to review that
might explain this. Do you think rolling the new log segment could cause the
issue? Any ideas what would cause a new segment to take so long to get
created?

Thanks for the help!
Stephen


Re: Lost message in asynchronous kafka producer

2016-09-16 Thread Damian Guy
You need to use one of the options you've already outlined. Without this
you are just doing fire-and-forget.
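The callback form at least stays asynchronous. Also note that
min.insync.replicas is only enforced when the producer uses acks=all, so
without that the producer will never see the error. Roughly (sketch, with a
hypothetical logger):

m_kafkaProducer.send(prMessage, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // e.g. NotEnoughReplicasException once the ISR drops below min.insync.replicas
            log.error("Send failed, message needs to be retried", exception);
        }
    }
});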

On Fri, 16 Sep 2016 at 09:25 Agostino Calamita 
wrote:

> Hi,
> I have 2 brokers with a topic with replication factor = 2.
> Brokers are configured with min.insync.replicas=2.
>
> I use a producer in asynchronous mode to send 10 messages.
>
> After some seconds after producer start, I stop one broker.
>
> On producer side I got no exceptions, so for producer application is all
> OK, but in broker log message I see
>  ERROR [Replica Manager on Broker 0]: Error processing append operation on
> partition diameternew-10 (kafka.server.ReplicaManager)
>
> So I lost all messages after broker shutdown.
>
> Is there a way to catch as soon as possible this kind of exeption ?
>
> I know that I can use
>
>  m_kafkaProducer.send(prMessage).get();
>
> or
>
> Future _future = m_kafkaProducer.send(prMessage);
>
> or callback in send method, but they are very slow.
>
>
> Thank.
>


java.nio.channels.ClosedChannelException in console-consumer.sh

2016-09-16 Thread Ali Akhtar
I've created a 3 broker kafka cluster, changing only the config values for
broker id, log.dirs, and zookeeper connect. I left the remaining fields as
default.

The broker ids are 1, 2, 3. I opened the port 9092 on AWS.

I then created a topic 'test' with replication factor of 2, and 3
partitions.

When I describe the topic using kafka-topics.sh --describe , it shows:

Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2

So it looks like the 3 brokers have successfully connected to each other.

I then tried running

bin/kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic
 test --from-beginning

But it began to give me a lot of the following exceptions:

 WARN Fetching topic metadata with correlation id 1 for topics
[Set(test)] from broker [BrokerEndPoint(1,kafka1-1876849043-91zso,9092)]
failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-16 17:22:11,323] WARN Fetching topic metadata with correlation
id 2 for topics [Set(test)] from broker
[BrokerEndPoint(3,kafka3-2571399577-96he4,9092)] failed
(kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Likewise when I run kafka-console-producer.sh , I see errors like:

[2016-09-16 17:24:42,901] WARN Error while fetching metadata with
correlation id 1343 : {test=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2016-09-16 17:24:43,012] WARN Error while fetching metadata with
correlation id 1344 : {test=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2016-09-16 17:24:43,127] WARN Error while fetching metadata with
correlation id 1345 : {test=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)

Any ideas what the problem here is?

I'm using kafka_2.11-0.10.0.1



Re: java.nio.channels.ClosedChannelException in console-consumer.sh

2016-09-16 Thread Ali Akhtar
Some googling indicates that there are issues on AWS / EC2 when using the
private IP, and it's recommended to use the public IP as the advertised
hostname instead.

I have zookeeper and Kafka both running on EC2, and both are in the same
availability zone, so both should be able to talk to each other using the
private IPs.

Shouldn't that be enough? I don't want to expose kafka publicly.
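(Concretely, I'm assuming a server.properties along these lines on each broker,
with its own private IP, should be enough:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<this broker's private IP>:9092
)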

On Fri, Sep 16, 2016 at 10:48 PM, Ali Akhtar  wrote:

> I've created a 3 broker kafka cluster, changing only the config values for
> broker id, log.dirs, and zookeeper connect. I left the remaining fields as
> default.
>
> The broker ids are 1, 2, 3. I opened the port 9092 on AWS.
>
> I then created a topic 'test' with replication factor of 2, and 3
> partitions.
>
> When I describe the topic using kafka-topics.sh --describe , it shows:
>
> Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
> Topic: test Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
> Topic: test Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
> Topic: test Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
>
> So it looks like the 3 brokers have successfully connected to each other.
>
> I then tried running
>
> bin/kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic
>  test --from-beginning
>
> But it began to give me a lot of the following exceptions:
>
>  WARN Fetching topic metadata with correlation id 1 for topics
> [Set(test)] from broker [BrokerEndPoint(1,kafka1-1876849043-91zso,9092)]
> failed (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
> doSend(SyncProducer.scala:79)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(
> ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-16 17:22:11,323] WARN Fetching topic metadata with correlation
> id 2 for topics [Set(test)] from broker 
> [BrokerEndPoint(3,kafka3-2571399577-96he4,9092)]
> failed (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
> doSend(SyncProducer.scala:79)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(
> ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
> Likewise when I run kafka-console-producer.sh , I see errors like:
>
> [2016-09-16 17:24:42,901] WARN Error while fetching metadata with
> correlation id 1343 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.
> NetworkClient)
> [2016-09-16 17:24:43,012] WARN Error while fetching metadata with
> correlation id 1344 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.
> NetworkClient)
> [2016-09-16 17:24:43,127] WARN Error while fetching metadata with
> correlation id 1345 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.
> NetworkClient)
>
> Any ideas what the problem here is?
>
> I'm using kafka_2.11-0.10.0.1
> 
>
>
>
>


Re: Exception while deserializing in kafka streams

2016-09-16 Thread Walter rakoff
Guozhang,

Any clues on this one?

Walter

On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff 
wrote:

> Guozhang,
>
> I am using 0.10.0.0. Could the below log be the cause?
>
> 16/09/14 17:24:35 WARN ConsumerConfig: The configuration
> schema.registry.url = http://192.168.50.6:8081
> was supplied but isn't a known config.
> 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>
> Walter
>
>
> On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang  wrote:
>
>> Hello Walter,
>>
>> Which version of Kafka were you using?
>>
>> I ask this because there was a bug causing the serde passed through config
>> to NOT being configured when constructed:
>> https://issues.apache.org/jira/browse/KAFKA-3639
>>
>>
>> Which is fixed in the 0.10.0.0 release, which means you will only hit it
>> if
>> you are using the tech-preview release version.
>>
>>
>> Guozhang
>>
>>
>> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff 
>> wrote:
>>
>> > Hello,
>> >
>> > I get the below exception when deserilaizing avro records
>> > using KafkaAvroDeserializer.
>> >
>> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
>> > [StreamThread-1]
>> > Exception in thread "StreamThread-1"
>> > org.apache.kafka.common.errors.SerializationException:
>> > Error deserializing Avro message for id 4
>> > Caused by: java.lang.NullPointerException
>> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
>> .
>> > deserialize(AbstractKafkaAvroDeserializer.java:120)
>> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
>> .
>> > deserialize(AbstractKafkaAvroDeserializer.java:92)
>> >
>> > I can confirm that schema registry URL is accessible and
>> url/schemas/ids/4
>> > does return valid schema.
>> > May be some initialization didn't happen correctly?
>> >
>> > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> "
>> > 192.168.50.6:8081")
>> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > Serdes.Long.getClass.getName)
>> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
>> > GenericAvroSerdeWithSchemaRegistry])
>> >
>> > GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/
>> > y471k9nj94tlxro/avro_serde.txt?dl=0
>> >
>> > Walter
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Re: Performance issue with KafkaStreams

2016-09-16 Thread Caleb Welton
Is there a specific way that I need to build kafka for that to work?

bash$  export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh
org.apache.kafka.streams.perf.SimpleBenchmark
Error: Could not find or load main class
org.apache.kafka.streams.perf.SimpleBenchmark

bash$ find . -name SimpleBenchmark.java
./streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java

bash$  find . -name SimpleBenchmark.class

bash$ jar -tf streams/build/libs/kafka-streams-0.10.1.0-SNAPSHOT.jar | grep
SimpleBenchmark



On Sat, Sep 10, 2016 at 6:18 AM, Eno Thereska 
wrote:

> Hi Caleb,
>
> We have a benchmark that we run nightly to keep track of performance. The
> numbers we have do indicate that consuming through streams is indeed slower
> than just a pure consumer, however the performance difference is not as
> large as you are observing. Would it be possible for you to run this
> benchmark in your environment and report the numbers? The benchmark is
> included with Kafka Streams and you run it like this:
>
> export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh
> org.apache.kafka.streams.perf.SimpleBenchmark
>
> You'll need a Kafka broker to be running. The benchmark will report
> various numbers, but one of them is the performance of the consumer and the
> performance of streams reading.
>
> Thanks
> Eno
>
> > On 9 Sep 2016, at 18:19, Caleb Welton 
> wrote:
> >
> > Same in both cases:
> > client.id=Test-Prototype
> > application.id=test-prototype
> >
> > group.id=test-consumer-group
> > bootstrap.servers=broker1:9092,broker2:9092zookeeper.connect=zk1:2181
> > replication.factor=2
> >
> > auto.offset.reset=earliest
> >
> >
> > On Friday, September 9, 2016 8:48 AM, Eno Thereska <
> eno.there...@gmail.com> wrote:
> >
> >
> >
> > Hi Caleb,
> >
> > Could you share your Kafka Streams configuration (i.e., StreamsConfig
> > properties you might have set before the test)?
> >
> > Thanks
> > Eno
> >
> >
> > On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton 
> wrote:
> >
> >> I have a question with respect to the KafkaStreams API.
> >>
> >> I noticed during my prototyping work that my KafkaStreams application
> was
> >> not able to keep up with the input on the stream so I dug into it a bit
> and
> >> found that it was spending an inordinate amount of time in
> >> org.apache.kafka.common.network.Selector.select().  Not exactly a
> smoking
> >> gun itself, so I dropped the implementation down to a single processor
> >> reading off a source.
> >>
> >> public class TestProcessor extends AbstractProcessor<String, String> {
> >>static long start = -1;
> >>static long count = 0;
> >>
> >>@Override
> >>public void process(String key, String value) {
> >>if (start < 0) {
> >>start = System.currentTimeMillis();
> >>}
> >>count += 1;
> >>if (count > 1000000) {
> >>long end = System.currentTimeMillis();
> >>double time = (end-start)/1000.0;
> >>System.out.printf("Processed %d records in %f seconds (%f records/s)\n", count, time, count/time);
> >>start = -1;
> >>count = 0;
> >>}
> >>
> >>}
> >>
> >> }
> >>
> >> ...
> >>
> >>
> >> TopologyBuilder topologyBuilder = new TopologyBuilder();
> >> topologyBuilder
> >>.addSource("SOURCE", stringDeserializer, stringDeserializer,
> >> "input")
> >>.addProcessor("PROCESS", TestProcessor::new, "SOURCE");
> >>
> >>
> >>
> >> Which I then ran through the KafkaStreams API, and then repeated with
> >> the KafkaConsumer API.
> >>
> >> Using the KafkaConsumer API:
> >> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> >> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> >> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> >> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
> >>
> >> Using the KafkaStreams API:
> >> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> >> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> >> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> >> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
> >>
> >>
> >> The profile on the KafkaStreams consisted of:
> >>
> >> 89.2% org.apache.kafka.common.network.Selector.select()
> >> 7.6% org.apache.kafka.clients.producer.internals.
> >> ProduceRequestResult.await()
> >> 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
> >>
> >>
> >> Is a 5X performance difference between Kafka Consumer and the
> >> KafkaStreams api expected?
> >>
> >> Are there specific things I can do to diagnose/tune the system?
> >>
> >> Thanks,
> >>  Caleb
> >>
>
>


Re: Time of derived records in Kafka Streams

2016-09-16 Thread Elias Levy
On Sat, Sep 10, 2016 at 9:17 AM, Eno Thereska 
wrote:

>
> For aggregations, the timestamp will be that of the latest record being
> aggregated.
>

How does that account for out of order records?

What about kstream-kstream joins?  The output from the join could be
triggered by a record received from either stream depending on the order
they are received and processed.  If the timestamp of the output is just
the timestamp of the latest received record, then it seems that the
timestamp could be that of either record.  Although I suppose that the best
effort stream synchronization effort that Kafka Streams attempts means that
usually the timestamp will be that of the later record.


Upgrading from 0.8 to 0.10

2016-09-16 Thread Vadim Keylis
We are planning to upgrade from 0.8 to 0.10, and I am reading about the
possible performance impact of the upgrade. Is the performance hit due to the
conversion to the 0.8 message format, or is it related to something else?
Could someone suggest the best way to measure the performance hit?
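(For context, my understanding is that the rolling-upgrade notes have the
brokers keep the old message format during the transition with something like:

inter.broker.protocol.version=0.8.2
log.message.format.version=0.8.2

and that the cost mainly shows up when a 0.10 broker has to down-convert
messages for old 0.8 consumers. Please correct me if that's wrong.)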

Thanks so much in advance,
Vadim


any update on this?

2016-09-16 Thread kant kodali

https://issues.apache.org/jira/browse/KAFKA-1793
It would be great to use Consul instead of ZooKeeper for Kafka, and I think
Kafka would benefit a lot from the rapidly growing Consul community.