[jira] [Created] (KAFKA-15032) Kafka clients continuously connecting to the broken broker and cannot switch to the good one

2023-05-28 Thread Dai Ma (Jira)
Dai Ma created KAFKA-15032:
--

 Summary: Kafka clients continuously connecting to the broken 
broker and cannot switch to the good one
 Key: KAFKA-15032
 URL: https://issues.apache.org/jira/browse/KAFKA-15032
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.8.1
Reporter: Dai Ma


* Kafka Cluster: 192.168.1.2:9092 (broker 1001), 192.168.1.3:9092 (broker 1002)
 * Kafka Topic Config: 2 partitions, 2 replicas
 * Kafka Client Config: bootstrap.servers: 192.168.1.2:9092,192.168.1.3:9092
 * Operation: stop 1001, then start 1001, then stop 1002 and leave it down
 ** while 1001 is stopped, the client works normally
 ** after 1001 starts and 1002 stops, the Kafka client keeps connecting to 1002

{code:java}
[2023-05-16 23:27:02,450] WARN [Consumer clientId=consumer-123-1, groupId=123] 
Connection to node 1002 (192.168.1.3:9092) could not be established. Broker may 
not be available. (org.apache.kafka.clients.NetworkClient)
[2023-05-16 23:27:03,004] DEBUG [Consumer clientId=consumer-123-1, groupId=123] 
Give up sending metadata request since no node is available 
(org.apache.kafka.clients.NetworkClient)
[2023-05-16 23:27:03,054] DEBUG [Consumer clientId=consumer-123-1, groupId=123] 
Give up sending metadata request since no node is available 
(org.apache.kafka.clients.NetworkClient)
[2023-05-16 23:27:03,557] DEBUG [Consumer clientId=consumer-123-1, groupId=123] 
Initialize connection to node 192.168.1.3:9092 (id: 1002 rack: null) for 
sending metadata request (org.apache.kafka.clients.NetworkClient)
[2023-05-16 23:27:03,557] DEBUG [Consumer clientId=consumer-123-1, groupId=123] 
Initiating connection to node 192.168.1.3:9092 (id: 1002 rack: null) using 
address /192.168.1.3 (org.apache.kafka.clients.NetworkClient)
[2023-05-16 23:27:03,559] DEBUG [Consumer clientId=consumer-123-1, groupId=123] 
Set SASL client state to SEND_APIVERSIONS_REQUEST 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2023-05-16 23:27:03,559] DEBUG [Consumer clientId=consumer-123-1, groupId=123] 
Creating SaslClient: 
client=null;service=kafka;serviceHostname=192.168.1.3;mechs=[PLAIN] 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2023-05-16 23:27:03,560] DEBUG [Consumer clientId=consumer-123-1, groupId=123] 
Connection with 192.168.1.3 disconnected 
(org.apache.kafka.common.network.Selector) {code}
I have made a preliminary diagnosis of the issue:
 # Kafka clients use the bootstrap servers (1001, 1002) only for the initial 
connection.
 # When 1001 stops, all topic partition leaders move to 1002. The clients 
refresh their metadata, remove 1001 from the available node list, and from then 
on only connect to 1002.
 # When 1001 starts again, the topic partition leaders are still all on 1002.
 # When 1002 stops, the partition leaders switch back to 1001, but the clients 
do not learn this and keep connecting to 1002.

I tried lowering metadata.max.age.ms to 1000 ms, which avoids the problem to 
some extent, but I am worried that if all clients are configured this way it 
will put extra load on the Kafka brokers.

I think that when node 1002 cannot be reached for some time, the Kafka client 
should automatically fall back to bootstrap.servers and try to reconnect. Once 
it connects to node 1001, the client will work normally again. Since one broker 
is still alive, a two-node cluster should tolerate one broker being down.
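Until the client itself behaves this way, one possible application-level workaround is sketched below. It assumes that recreating the consumer makes it resolve bootstrap.servers again; the probe-and-recreate loop, the topic name, and the helper method are hypothetical application code, not built-in Kafka client behavior.
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical fallback implemented in the application, not in the client:
// if metadata cannot be fetched for several consecutive probes, recreate the
// consumer so it bootstraps again from bootstrap.servers instead of retrying
// only the last known (now stopped) broker.
public class RebootstrappingConsumer {

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = newConsumer();
        int metadataFailures = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value()); // application processing
                }
                metadataFailures = 0;
                continue;
            }
            try {
                // Metadata probe: throws TimeoutException when no broker can be reached.
                consumer.listTopics(Duration.ofSeconds(5));
                metadataFailures = 0;
            } catch (TimeoutException e) {
                if (++metadataFailures >= 3) {
                    consumer.close(Duration.ofSeconds(1));
                    consumer = newConsumer(); // fresh client, fresh bootstrap
                    metadataFailures = 0;
                }
            }
        }
    }

    private static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.2:9092,192.168.1.3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "123");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("test-topic")); // placeholder topic name
        return consumer;
    }
}
{code}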

 

I started a discussion on Stack Overflow: [Consecutively restarting two Kafka 
brokers: if the second node fails to start, the client continuously connects to 
the second node - Stack 
Overflow|https://stackoverflow.com/questions/76345056/consecutively-restarting-two-kafka-brokers-if-the-second-node-fails-to-start]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-932: Queues for Kafka

2023-05-28 Thread Satish Duggana
Hi Andrew,
Thanks for the nice KIP on a very interesting feature about
introducing some of the traditional MessageQueue semantics to Kafka.
It is good to see that we are extending the existing consumer groups
concepts and related mechanisms for shared subscriptions instead of
bringing any large architectural/protocol changes.

This KIP talks about introducing a durable subscription feature for
topics, with multiple consumers consuming messages in parallel from a
single topic partition.

101. Are you planning to extend this functionality to queueing
semantics like JMS point-to-point style in the future?

102. When a message is rejected by the target consumer, how do users
know which records/offsets are dropped because of failed records,
whether due to a reject acknowledgement or due to timeouts etc.,
before DLQs are introduced?

103. It talks about SPSO values, earliest being the default, and the user
can reset it to a target offset/timestamp. What is the maximum value
for the SPEO? It would be good to clarify the maximum values for the
SPSO and SPEO: can they be the HW, the LogStableOffset, or some other value?

104. The KIP describes "share.delivery.count.limit" as the maximum
number of delivery attempts for a record delivered to a share group.
But the actual delivery count may exceed this number, since the
leader may fail to update the delivery count, or the leader or a
consumer may fail and further delivery attempts may be made later. It
may really be the minimum number of delivery attempts rather than the
maximum.

Thanks,
Satish.


On Wed, 24 May 2023 at 21:26, Andrew Schofield
 wrote:
>
> Hi Stanislav,
> Thanks for your email. You bring up some interesting points.
>
> 1) Tiered storage
> I think the situation here for fetching historical data is equivalent to what 
> happens if a user resets the committed offset for a consumer
> group back to an earlier point in time. So, I will mention this in the next 
> update to the KIP document but I think there's nothing
> especially different here.
>
> 2) SSO initialized to the latest offset
> The KIP does mention that it is possible for an administrator to set the SSO 
> using either AdminClient.alterShareGroupOffsets or
> kafka-share-groups.sh. It is entirely intentional that there is no 
> KafkaConsumer config for initializing the SSO. I know that's how it
> can be done for consumer groups, but it suffers from the situation where 
> different consumers have different opinions about
> the initial value (earliest vs latest) and then the first one in wins. Also, 
> KIP-842 digs into some problems with how consumer
> group offset reset works 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms)
>  so
> I've tried to sidestep those problems too.
>
> Another possibility is to follow KIP-848 which proposes that 
> AdminClient.incrementalAlterConfigs is enhanced to support a new
> resource type called GROUP and supporting a dynamic group config in this 
> manner would give a single point of control.
>
> 3) Durable storage
> The KIP does not yet describe how durable storage works. I have a few ideas 
> that I want to flesh out before updating the KIP.
>
> I will rule out using a compacted topic though. The problem is that each 
> record on a compacted topic is a key:value pair, and
> it's not obvious what to use as the key. If it's the share group name, it 
> needs the entire in-flight record state to be recorded in
> one hit which is extremely inefficient.
>
> 4) Batch acknowledgement
> You are correct that compression makes delivery and acknowledgement of 
> individual messages within a compressed batch
> more complicated. Again, I will defer a proper answer here until I've dug 
> more deeply.
>
> 5) Member management
> Member management will be similar to consumer groups. I anticipate that it 
> will build entirely on the new consumer group
> protocol in KIP-848. There seems little benefit in supporting the legacy 
> protocol when this KIP is targeting versions of Kafka
> which will all have KIP-848.
>
> The two cases you mention:
> i) If a bad consumer doesn't even heartbeat, it will be ejected from the 
> group. This does not involve a rebalance.
> ii) If a bad consumer heartbeats but always times out message processing, it 
> will slow the advancement of the SSO/SEO. There
> is the possibility that such a consumer would invalidate completely valid 
> messages. In order to do this, it would have to acquire
> the same set of messages repeatedly, to the exclusion of other consumers, and 
> thus bump the delivery count to the limit.
> This is unlikely but not impossible.
>
> 6) Processing semantics
> Delivery is at-least-once.
>
> 7) Acronyms
> I hadn't thought about the implications of "Kafka SEO". I think I'll change 
> it to "Share Partition Start Offset" (SPSO) and
> "Share Partition End Offset" (SPEO).
>
> There is a lot of work ahead for this KIP. I intend to work on the protocol 
> changes next.
>
> Thanks for getting involved in the discussion.
> 
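For the dynamic group config idea in point 2 of the quoted reply, here is a sketch of what a single point of control through AdminClient.incrementalAlterConfigs might look like. It assumes the GROUP resource type proposed by KIP-848, and "share.auto.offset.reset" is a purely hypothetical config name used only for illustration; neither detail is settled by the KIP at this point.
{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch only: ConfigResource.Type.GROUP is the resource type proposed by
// KIP-848, and "share.auto.offset.reset" is a hypothetical config name used
// here just to illustrate a single point of control for the whole group.
public class ShareGroupConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("share.auto.offset.reset", "latest"), // hypothetical config key
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(group, List.of(op));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}
The appeal, as the quoted reply notes, is that the setting lives on the group rather than on each consumer, so there is no "first consumer in wins" race over the initial value.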

Re: [DISCUSS] KIP-932: Queues for Kafka

2023-05-28 Thread Satish Duggana
Minor correction on 103: latest instead of earliest for the SPSO default value.

103. It talks about SPSO values, latest being the default, and the user
can reset it to a target offset/timestamp. What is the maximum value
for the SPEO? It would be good to clarify the maximum values for the
SPSO and SPEO: can they be the HW, the LogStableOffset, or some other value?

Thanks,
Satish.


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1876

2023-05-28 Thread Apache Jenkins Server
See