Re: kafka controller setting for detecting broker failure and re-electing a new leader for partitions?

2018-01-25 Thread Hu Xi
Yu Yang,


There is a broker-side config named 'controller.socket.timeout.ms'.
Decreasing it to a reasonably smaller value might help, but please use it
with caution.
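
For example (the value below is only illustrative; the default is 30000 ms, i.e. 30 seconds):

# server.properties (broker side)
# Lowering this makes the controller give up on an unreachable broker sooner,
# but values that are too small can cause spurious connection failures.
controller.socket.timeout.ms=10000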


From: Yu Yang 
Sent: January 25, 2018 15:42
To: users@kafka.apache.org
Subject: kafka controller setting for detecting broker failure and re-electing a new 
leader for partitions?

Hi everyone,

Recently we had a cluster in which the controller failed to connect to
broker A for an extended period of time. I had expected that the
controller would identify the broker as a failed broker, and re-elect
another broker as the leader for the partitions that were hosted on broker A.
However, this did not happen in that cluster. What happened was that broker
A was still considered the leader for some partitions, and those
partitions were marked as under-replicated partitions. Is there any
configuration setting in kafka to speed up broker failure detection?


2018-01-24 14:13:57,132] WARN [Controller-37-to-broker-4-send-thread],
Controller 37's connection to broker testkafka04:9092 (id: 4 rack: null)
was unsuccessful (kafka.controller.RequestSendThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
at
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:231)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:182)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:181)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Thanks!

Regards,
-Yu


Cater to processing longer than max.poll.interval.ms

2018-01-25 Thread Sameer Kumar
I have a scenario: let's say that, due to GC or some other issue, my consumer
takes longer than max.poll.interval.ms to process data. What is the alternative
for preventing the consumer from being marked dead and shunned out of the
consumer group?

The consumer has not actually died, and heartbeats are still being sent at
regular intervals within session.timeout.ms in this case.

-Sameer.


Best practices Partition Key

2018-01-25 Thread Maria Pilar
Hi everyone,

I'm trying to understand the best practice for defining the partition key. I
have defined some topics that are related to entities in a Cassandra data
model. The relationship is one-to-one, one entity - one topic, because
I need to ensure proper ordering of the events. I have created one
partition for each topic to ensure it as well.

If I use Kafka like a datastore and search through the records, I know
that it could be a best practice to use the Cassandra partition key (e.g.
Customer ID) as the partition key in Kafka.

Any comments please?

thanks


Re: deduplication strategy for Kafka Streams DSL

2018-01-25 Thread Dmitry Minkovsky
You may not be surprised that after further investigation it turns out this
was related to some logic in my topology.

On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky 
wrote:

> Hi Gouzhang,
>
> Here it is:
>
> topology.stream(MAILBOX_OPERATION_REQUESTS,
> Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
>   .flatMap(entityTopologyProcessor::distributeMailboxOperation)
>   .groupByKey(Serialized.with(byteStringSerde,
> mailboxOperationRequestSerde))
>   .reduce((a, b) -> b, Materialized.with(byteStringSerde,
> mailboxOperationRequestSerde));
>
>
>
>
> On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang  wrote:
>
>> Dmitry,
>>
>> For your topology it is not expected to happen, could you elaborate a bit
>> more on your code snippet as well as the input data? Is there a good way
>> to
>> re-produce it?
>>
>>
>> Guozhang
>>
>>
>> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky 
>> wrote:
>>
>> > Oh I'm sorry—my situation is even simpler. I have a KStream -> group by
>> ->
>> > reduce. It emits duplicate key/value/timestamps (i.e. total duplicates).
>> >
>> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky > >
>> > wrote:
>> >
>> > > Can someone explain what is causing this? I am experiencing this too.
>> My
>> > > `buffered.records.per.partition` and `cache.max.bytes.buffering` are
>> at
>> > > their default values, so quite substantial. I tried raising them but
>> it
>> > had
>> > > no effect.
>> > >
>> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski 
>> > wrote:
>> > >
>> > >> Hi
>> > >> I run an app where I transform KTable to stream and then I groupBy
>> and
>> > >> aggregate and capture the results in KTable again. That generates
>> many
>> > >> duplicates.
>> > >>
>> > >> I have played with exactly once semantics that seems to reduce
>> > duplicates
>> > >> for records that should be unique. But I still get duplicates on keys
>> > that
>> > >> have two or more records.
>> > >>
>> > >> I could not reproduce it on small number of records so I disable
>> caching
>> > >> by
>> > >> setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Surely enough, I got
>> > loads
>> > >> of duplicates, even these previously eliminated by exactly once
>> > semantics.
>> > >> Now I have hard time to enable it again on Confluent 3.3.
>> > >>
>> > >> But, generally what it the best deduplication strategy for Kafka
>> > Streams?
>> > >>
>> > >> Artur
>> > >>
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Re: Best practices Partition Key

2018-01-25 Thread Dmitry Minkovsky
> one entity - one topic, because I need to ensure the properly ordering in
the events.

This is a great insight. I discovered that keeping entity-related things
on one topic is much easier than splitting entity-related things onto
multiple topics. If you have one topic, replaying that topic is trivial. If
you have multiple topics, replaying those topics requires careful
synchronization. In my case, I am doing event capture and I have
entity-related events on multiple topics. For example, for a user entity I
have topics `join-requests` and `settings-update-requests`. Having separate
topics is superficially nicer in terms of consuming them with Kafka
Streams: you can set up topic-specific serdes. But the benefit you get from
this is dwarfed by the complexity of then having to synchronize these two
streams if you want to replay them. Your situation seems simpler though
because you are not even doing event capture, but just logging complete
entities out of Cassandra.

> If I will use kafka like a datastore and search throgh the records,

Interactive Queries API makes this very nice.
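
A rough sketch of what that lookup can look like (topic and store names here are
made up, and in a real app you would wait/retry until the store is actually
queryable rather than calling store() right after start()):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CustomerLookup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-lookup");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Materialize the topic into a queryable state store, keyed by customer id.
        builder.table("customers", Materialized.as("customers-store"));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Interactive Query: a point lookup against the local state store.
        // (May throw InvalidStateStoreException until the store is ready.)
        ReadOnlyKeyValueStore<String, String> store =
                streams.store("customers-store", QueryableStoreTypes.keyValueStore());
        System.out.println("latest value for customer-42: " + store.get("customer-42"));
    }
}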

On Thu, Jan 25, 2018 at 8:47 AM, Maria Pilar  wrote:

> Hi everyone,
>
> I´m trying to understand the best practice to define the partition key. I
> have defined some topics that they are related with entities in cassandra
> data model, the relationship is one-to-one, one entity - one topic, because
> I need to ensure the properly ordering in the events. I have created one
> partition for each topic to ensure it as well.
>
> If I will use kafka like a datastore and search throgh the records, I know
> that could be a best practice use the partition key of Cassandra (e.g
> Customer ID) as a partition key in kafka
>
> any comment please ??
>
> thanks
>


Re: Best practices Partition Key

2018-01-25 Thread Dmitry Minkovsky
> I know that could be a best practice use the partition key of Cassandra
(e.g Customer ID) as a partition key in kafka

Yeah, the Kafka producer will hash that key with murmur2, so all entities
coming out of Cassandra with the same partition key will end up on the same
Kafka partition. Then you can use Kafka Streams Interactive Queries to get at
the data.
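
A minimal producer sketch of that idea (topic name and payload are made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomerEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Using the Cassandra partition key (the customer id) as the Kafka message
            // key: the default partitioner hashes the serialized key with murmur2, so
            // every event for customer-42 lands on the same partition and stays ordered.
            producer.send(new ProducerRecord<>("customer-events", "customer-42",
                    "{\"type\":\"UPDATE\",\"field\":\"email\"}"));
        }
    }
}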

On Thu, Jan 25, 2018 at 10:02 AM, Dmitry Minkovsky 
wrote:

> > one entity - one topic, because I need to ensure the properly ordering
> in the events.
>
> This is a great in insight. I discovered that keeping entity-related
> things on one topic is much easier than splitting entity-related things
> onto multiple topics. If you have one topic, replaying that topic is
> trivial. If you have multiple topics, replaying those topics requires
> careful synchronization. In my case, I am doing event capture and I have
> entity-related events on multiple topics. For example, for a user entity I
> have topics `join-requests` and `settings-update-requests`. Having separate
> topics is superficially nicer in terms of consuming them with Kafka
> Streams: you can set up topic-specific serdes. But the benefit you get from
> this is dwarfed by the complexity of then having to synchronize these two
> streams if you want to replay them. Your situation seems simpler though
> because you are not even doing event capture, but just logging complete
> entities out of Cassandra.
>
> > If I will use kafka like a datastore and search throgh the records,
>
> Interactive Queries API makes this very nice.
>
> On Thu, Jan 25, 2018 at 8:47 AM, Maria Pilar  wrote:
>
>> Hi everyone,
>>
>> I´m trying to understand the best practice to define the partition key. I
>> have defined some topics that they are related with entities in cassandra
>> data model, the relationship is one-to-one, one entity - one topic,
>> because
>> I need to ensure the properly ordering in the events. I have created one
>> partition for each topic to ensure it as well.
>>
>> If I will use kafka like a datastore and search throgh the records, I know
>> that could be a best practice use the partition key of Cassandra (e.g
>> Customer ID) as a partition key in kafka
>>
>> any comment please ??
>>
>> thanks
>>
>
>


Re: Best practices Partition Key

2018-01-25 Thread Maria Pilar
Yes, I'm capturing different events for the same entity/resource (create,
update and delete); for that reason I've chosen that option. However, my
question is whether I can improve my solution, if I want to use Kafka as a
datastore, by including the Cassandra partition key of each entity as the
partition key in Kafka.

On 25 January 2018 at 16:02, Dmitry Minkovsky  wrote:

> > one entity - one topic, because I need to ensure the properly ordering in
> the events.
>
> This is a great in insight. I discovered that keeping entity-related things
> on one topic is much easier than splitting entity-related things onto
> multiple topics. If you have one topic, replaying that topic is trivial. If
> you have multiple topics, replaying those topics requires careful
> synchronization. In my case, I am doing event capture and I have
> entity-related events on multiple topics. For example, for a user entity I
> have topics `join-requests` and `settings-update-requests`. Having separate
> topics is superficially nicer in terms of consuming them with Kafka
> Streams: you can set up topic-specific serdes. But the benefit you get from
> this is dwarfed by the complexity of then having to synchronize these two
> streams if you want to replay them. Your situation seems simpler though
> because you are not even doing event capture, but just logging complete
> entities out of Cassandra.
>
> > If I will use kafka like a datastore and search throgh the records,
>
> Interactive Queries API makes this very nice.
>
> On Thu, Jan 25, 2018 at 8:47 AM, Maria Pilar  wrote:
>
> > Hi everyone,
> >
> > I´m trying to understand the best practice to define the partition key. I
> > have defined some topics that they are related with entities in cassandra
> > data model, the relationship is one-to-one, one entity - one topic,
> because
> > I need to ensure the properly ordering in the events. I have created one
> > partition for each topic to ensure it as well.
> >
> > If I will use kafka like a datastore and search throgh the records, I
> know
> > that could be a best practice use the partition key of Cassandra (e.g
> > Customer ID) as a partition key in kafka
> >
> > any comment please ??
> >
> > thanks
> >
>


Re: kafka controller setting for detecting broker failure and re-electing a new leader for partitions?

2018-01-25 Thread Yu Yang
Thanks for the reply, Xi! The default value of 'controller.socket.timeout.ms'
is 30000, that is, 30 seconds. What we have observed is that the controller
would not assign another replica as the leader, even after it had failed to
send updated topic metadata information to the problematic broker for more
than 30 minutes. Reducing controller.socket.timeout.ms will not help.

Based on the current Kafka implementation, when such an exception is raised,
ControllerChannelManager will catch the exception and keep retrying.

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerChannelManager.scala#L222


On Thu, Jan 25, 2018 at 12:02 AM, Hu Xi  wrote:

> Yu Yang,
>
>
> There does exist a broker-side config named 'controller.socket.timeout.ms'.
> Decrease it to a reasonably smaller value might be a help but please use it
> with caution.
>
> 
> From: Yu Yang 
> Sent: January 25, 2018 15:42
> To: users@kafka.apache.org
> Subject: kafka controller setting for detecting broker failure and re-electing
> a new leader for partitions?
>
> Hi everyone,
>
> Recently we had a cluster in which the controller failed to connect to a
> broker A for an extended period of time.  I had expected that the
> controller would identify the broker as a failed broker, and re-elect
> another broker as the leader for partitions that were hosted on broker A.
> However, this did not happen in that cluster. What happened was that broker
> A was still considered as the leader for some partitions, and those
> partitions are marked as under replicated partitions. Is there any
> configuration setting in kafka to speed up the broker failure detection?
>
>
> 2018-01-24 14:13:57,132] WARN [Controller-37-to-broker-4-send-thread],
> Controller 37's connection to broker testkafka04:9092 (id: 4 rack: null)
> was unsuccessful (kafka.controller.RequestSendThread)
> java.net.SocketTimeoutException: Failed to connect within 3 ms
> at
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.
> scala:231)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.
> scala:182)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.
> scala:181)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
> Thanks!
>
> Regards,
> -Yu
>


Re: deduplication strategy for Kafka Streams DSL

2018-01-25 Thread Guozhang Wang
Hello Dmitry,

What does your distributeMailboxOperation in the flatMap do? Would it
possibly generate multiple records for the follow-up aggregation for each
input?
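
For example, a fan-out like the following (hypothetical names, not your code)
makes the downstream reduce emit one updated result per emitted record, which
can look like duplicates unless the record cache coalesces them:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class FanOutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fan-out-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("requests")
               // If the flatMap emits several records that share a key ...
               .flatMap((key, value) -> Arrays.asList(
                       KeyValue.pair(key, value + "-a"),
                       KeyValue.pair(key, value + "-b")))
               // ... the reduce keeps only the last value per key, but it still
               // forwards one update per input record unless caching coalesces them.
               .groupByKey()
               .reduce((oldValue, newValue) -> newValue)
               .toStream()
               .to("results");

        new KafkaStreams(builder.build(), props).start();
    }
}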


Guozhang


On Thu, Jan 25, 2018 at 6:54 AM, Dmitry Minkovsky 
wrote:

> You may not be surprised that after further investigation it turns out this
> was related to some logic in my topology.
>
> On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky 
> wrote:
>
> > Hi Gouzhang,
> >
> > Here it is:
> >
> > topology.stream(MAILBOX_OPERATION_REQUESTS,
> > Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
> >   .flatMap(entityTopologyProcessor::distributeMailboxOperation)
> >   .groupByKey(Serialized.with(byteStringSerde,
> > mailboxOperationRequestSerde))
> >   .reduce((a, b) -> b, Materialized.with(byteStringSerde,
> > mailboxOperationRequestSerde));
> >
> >
> >
> >
> > On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang 
> wrote:
> >
> >> Dmitry,
> >>
> >> For your topology it is not expected to happen, could you elaborate a
> bit
> >> more on your code snippet as well as the input data? Is there a good way
> >> to
> >> re-produce it?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <
> dminkov...@gmail.com>
> >> wrote:
> >>
> >> > Oh I'm sorry—my situation is even simpler. I have a KStream -> group
> by
> >> ->
> >> > reduce. It emits duplicate key/value/timestamps (i.e. total
> duplicates).
> >> >
> >> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <
> dminkov...@gmail.com
> >> >
> >> > wrote:
> >> >
> >> > > Can someone explain what is causing this? I am experiencing this
> too.
> >> My
> >> > > `buffered.records.per.partition` and `cache.max.bytes.buffering`
> are
> >> at
> >> > > their default values, so quite substantial. I tried raising them but
> >> it
> >> > had
> >> > > no effect.
> >> > >
> >> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski 
> >> > wrote:
> >> > >
> >> > >> Hi
> >> > >> I run an app where I transform KTable to stream and then I groupBy
> >> and
> >> > >> aggregate and capture the results in KTable again. That generates
> >> many
> >> > >> duplicates.
> >> > >>
> >> > >> I have played with exactly once semantics that seems to reduce
> >> > duplicates
> >> > >> for records that should be unique. But I still get duplicates on
> keys
> >> > that
> >> > >> have two or more records.
> >> > >>
> >> > >> I could not reproduce it on small number of records so I disable
> >> caching
> >> > >> by
> >> > >> setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Surely enough, I
> got
> >> > loads
> >> > >> of duplicates, even these previously eliminated by exactly once
> >> > semantics.
> >> > >> Now I have hard time to enable it again on Confluent 3.3.
> >> > >>
> >> > >> But, generally what it the best deduplication strategy for Kafka
> >> > Streams?
> >> > >>
> >> > >> Artur
> >> > >>
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang


Re: Cater to processing longer than max.poll.interval.ms

2018-01-25 Thread R Krishna
I think newer versions have better ways of doing this. In 0.10.2, because
poll() ensures liveness, you can disable auto commits and use consumer
pause() so that you can keep calling poll() (which returns no records for the
paused partitions, so max.poll.interval.ms is not exceeded) while those
partitions are not reassigned to other consumers. You can also handle
ConsumerRebalanceListener onPartitionsAssigned, or reduce the amount of data
processed per poll using max.poll.records.
https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
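
A rough sketch of that pause()-and-keep-polling pattern (topic, group id and
batch handling are illustrative; rebalance and error handling are omitted):

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowProcessingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "slow-processors");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "50"); // smaller batches per poll
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100L);
                if (records.isEmpty()) {
                    continue;
                }
                // Hand the slow work to another thread, pause fetching, and keep
                // polling so the consumer is not evicted from the group while the
                // batch is still being processed.
                Future<?> batch = worker.submit(() -> process(records));
                consumer.pause(consumer.assignment());
                while (!batch.isDone()) {
                    consumer.poll(100L); // returns nothing for paused partitions
                }
                consumer.resume(consumer.assignment());
                consumer.commitSync(); // commit only after processing finished
            }
        }
    }

    private static void process(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            // slow, GC-prone work goes here
        }
    }
}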

On Thu, Jan 25, 2018 at 1:27 AM, Sameer Kumar 
wrote:

> I have a scneario, let say due to GC or any other issue, my consumer takes
> longer than max.poll.interval.ms to process data, what is the alternative
> for preventing the consumer to be marked dead and not shun it out of the
> consumer group.
>
> Though the consumer has not died and session.timeout.ms is being sent at
> regular intervals in this case.
>
> -Sameer.
>



-- 
Radha Krishna, Proddaturi
253-234-5657


Re: Problem with multiple Kafka Streams

2018-01-25 Thread Gustavo Torres
Hello Guozhang:

Actually, you are right. I implemented a custom partitioner to distribute
messages evenly among all partitions and started to see all consumers
working!
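
Roughly what that partitioner looks like (a sketch, assuming the goal is plain
round-robin regardless of key; it is plugged in through the producer's
partitioner.class setting):

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class EvenSpreadPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Ignore the key and spread records evenly across all partitions.
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Math.abs(counter.getAndIncrement() % numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}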

Thanks a lot!

Best
Gustavo

2018-01-24 19:40 GMT-02:00 Guozhang Wang :

> Hello Gustavo,
>
> How did you check that the second app's consumers does not consume
> anything? And could you confirm that there are indeed data in these two
> partitions that are fetchable?
>
>
> Guozhang
>
>
> On Wed, Jan 24, 2018 at 8:57 AM, Gustavo Torres 
> wrote:
>
> > Hi there:
> >
> > This is the first time I'm posting on the mailing list. Actually, it's
> the
> > first time I'm working with Kafka and I'm having trouble setting up a
> Kafka
> > Streams app (using Kafka 1.0).
> >
> > I have two instances of my app, each one running a Kafka Stream and both
> > having the same AppID.
> > My topic has 4 partitions and each stream is configured with 2 threads.
> >
> > I verified that both streams have partitions assigned and both states are
> > Running.
> >
> > The problem I'm having is this: the first app runs and starts consuming 2
> > partitions while the second one consumes nothing although its state is
> > Running.
> >
> > I will appreciate any help with this issue since I've trying to solve
> this
> > for days!
> >
> > Best
> > Gustavo
> >
>
>
>
> --
> -- Guozhang
>



-- 

*Gustavo Torres*
*Artificial Intelligence Specialist - SeekAI*

www.catho.com.br


 



Re: deduplication strategy for Kafka Streams DSL

2018-01-25 Thread Dmitry Minkovsky
Hi Guozhang,

I am sorry to have bothered you, but I figured out the problem and it was
related to logic in my topology. Please disregard the question.

Thank you!
Dmitry

On Thu, Jan 25, 2018 at 2:55 PM, Guozhang Wang  wrote:

> Hello Dmitry,
>
> What does your distributeMailboxOperation in the flatMap do? Would it
> possibly generates multiple records for the follow-up aggregation for each
> input?
>
>
> Guozhang
>
>
> On Thu, Jan 25, 2018 at 6:54 AM, Dmitry Minkovsky 
> wrote:
>
> > You may not be surprised that after further investigation it turns out
> this
> > was related to some logic in my topology.
> >
> > On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky 
> > wrote:
> >
> > > Hi Gouzhang,
> > >
> > > Here it is:
> > >
> > > topology.stream(MAILBOX_OPERATION_REQUESTS,
> > > Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
> > >   .flatMap(entityTopologyProcessor::distributeMailboxOperation)
> > >   .groupByKey(Serialized.with(byteStringSerde,
> > > mailboxOperationRequestSerde))
> > >   .reduce((a, b) -> b, Materialized.with(byteStringSerde,
> > > mailboxOperationRequestSerde));
> > >
> > >
> > >
> > >
> > > On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang 
> > wrote:
> > >
> > >> Dmitry,
> > >>
> > >> For your topology it is not expected to happen, could you elaborate a
> > bit
> > >> more on your code snippet as well as the input data? Is there a good
> way
> > >> to
> > >> re-produce it?
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <
> > dminkov...@gmail.com>
> > >> wrote:
> > >>
> > >> > Oh I'm sorry—my situation is even simpler. I have a KStream -> group
> > by
> > >> ->
> > >> > reduce. It emits duplicate key/value/timestamps (i.e. total
> > duplicates).
> > >> >
> > >> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <
> > dminkov...@gmail.com
> > >> >
> > >> > wrote:
> > >> >
> > >> > > Can someone explain what is causing this? I am experiencing this
> > too.
> > >> My
> > >> > > `buffered.records.per.partition` and `cache.max.bytes.buffering`
> > are
> > >> at
> > >> > > their default values, so quite substantial. I tried raising them
> but
> > >> it
> > >> > had
> > >> > > no effect.
> > >> > >
> > >> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <
> art...@gmail.com>
> > >> > wrote:
> > >> > >
> > >> > >> Hi
> > >> > >> I run an app where I transform KTable to stream and then I
> groupBy
> > >> and
> > >> > >> aggregate and capture the results in KTable again. That generates
> > >> many
> > >> > >> duplicates.
> > >> > >>
> > >> > >> I have played with exactly once semantics that seems to reduce
> > >> > duplicates
> > >> > >> for records that should be unique. But I still get duplicates on
> > keys
> > >> > that
> > >> > >> have two or more records.
> > >> > >>
> > >> > >> I could not reproduce it on small number of records so I disable
> > >> caching
> > >> > >> by
> > >> > >> setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Surely enough, I
> > got
> > >> > loads
> > >> > >> of duplicates, even these previously eliminated by exactly once
> > >> > semantics.
> > >> > >> Now I have hard time to enable it again on Confluent 3.3.
> > >> > >>
> > >> > >> But, generally what it the best deduplication strategy for Kafka
> > >> > Streams?
> > >> > >>
> > >> > >> Artur
> > >> > >>
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Kafka Reassignment - traffic is different for the same type of reassignment

2018-01-25 Thread Yi Yin
Hello everyone,


I have a question about how reassignments work.

When I issue a reassignment for similar topic-partitions, the throughput
of the two reassignments is very different even though all settings are
similar. There is a huge difference in when the partitions finish their
reassignment.

Here is my reassignment mapping:
{ "partitions": [
{ "topic": "my_topic", "partition": 100, "replicas": [1426,1425] },
{ "topic": "my_topic", "partition": 101, "replicas": [1476,1287] }
], "version": 1 }

I am moving partition 100 from 1035 to 1426.
I am moving partition 101 from 1037 to 1476.

Prior to the reassignment, 1035 and 1037 had the same network traffic, and the
above partitions are the only partitions on these hosts.

When I kick off the reassignment, 1037's outbound network traffic spikes to
about 3x that of 1035. As a result, partition 101's reassignment finishes much
sooner than partition 100's.


Our server.properties is identical across the whole cluster.

I'm curious as to why the two partition reassignments have drastically
different throughput. The source and destination brokers are identically
configured, at no point was there saturation, and the two partitions have the
same data size.

If anyone has insights into this behaviour, please let me know how this can
happen.

Thank you!

Yi


Re: Kafka Consumer Issue

2018-01-25 Thread chintan mavawala
You may want to check the replication factor of the __consumer_offsets topic.
By default, it is 1. It should be increased to 3 in your case.
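
(This broker setting only takes effect when the internal topic is first created;
for an already-created __consumer_offsets topic the replication factor has to be
raised with a partition reassignment instead.)

# server.properties
offsets.topic.replication.factor=3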

Regards,
Chintan

On 25-Jan-2018 12:24 PM, "Siva A"  wrote:

> Kafka version i am using is 0.10.0.1
>
> On Thu, Jan 25, 2018 at 12:23 PM, Siva A  wrote:
>
> > Hi All,
> >
> > I have a 3 node Kafka cluster.
> > I am trying to consume data from logstash(version 5.5.2) using the new
> > consumer API.
> >
> > When Kafka2 and Kafka3 is down i am still able to consume the data
> without
> > any issues.
> >
> > But whenever the kafka1 is down the logstash consumer is just hang there.
> > Anyone faced this kind of issue in any consumer(logstash or java or
> > anything)?
> >
> > Thanks in advance.
> >
> > Thanks
> > Siva
> >
>


Re: Kafka Consumer Issue

2018-01-25 Thread Siva A
Yes, that was the issue. I fixed it yesterday. Thanks for your update.

On Jan 26, 2018 11:27 AM, "chintan mavawala"  wrote:

> You may want to check replication factor of _consumer_offsets topic. By
> default, it is 1. It should be increased to 3 in your case.
>
> Regards,
> Chintan
>
> On 25-Jan-2018 12:24 PM, "Siva A"  wrote:
>
> > Kafka version i am using is 0.10.0.1
> >
> > On Thu, Jan 25, 2018 at 12:23 PM, Siva A 
> wrote:
> >
> > > Hi All,
> > >
> > > I have a 3 node Kafka cluster.
> > > I am trying to consume data from logstash(version 5.5.2) using the new
> > > consumer API.
> > >
> > > When Kafka2 and Kafka3 is down i am still able to consume the data
> > without
> > > any issues.
> > >
> > > But whenever the kafka1 is down the logstash consumer is just hang
> there.
> > > Anyone faced this kind of issue in any consumer(logstash or java or
> > > anything)?
> > >
> > > Thanks in advance.
> > >
> > > Thanks
> > > Siva
> > >
> >
>