[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

F Méthot updated KAFKA-13272:
-----------------------------
    Description: 
Our KStream app's offset stays stuck on one partition after an outage, possibly 
when exactly_once is enabled.

Running with KStream 2.8 and Kafka broker 2.8, on 3 brokers.

The commands topic has 10 partitions (replication 2, min-insync 2).
 The command-expiry-store-changelog topic has 10 partitions (replication 2, 
min-insync 2).
 The events topic has 10 partitions (replication 2, min-insync 2).

The app uses this topology:

 
{code:java}
Sub-topology: 0
 Source: KSTREAM-SOURCE-0000000000 (topics: [commands])
 --> KSTREAM-TRANSFORM-0000000001
 Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])
 --> KSTREAM-TRANSFORM-0000000002
 <-- KSTREAM-SOURCE-0000000000
 Processor: KSTREAM-TRANSFORM-0000000002 (stores: [command-expiry-store])
 --> KSTREAM-SINK-0000000003
 <-- KSTREAM-TRANSFORM-0000000001
 Sink: KSTREAM-SINK-0000000003 (topic: events)
 <-- KSTREAM-TRANSFORM-0000000002
{code}
h3. Attempt 1 at reproducing this issue

 

Our stream app runs with processing.guarantee *exactly_once* 

After a Kafka test outage in which all 3 broker pods were deleted at the same 
time, the brokers restarted and initialized successfully.

When restarting the topology above, one of the tasks would never fully 
initialize; the restore phase kept outputting this message every few minutes:

 
{code:java}
2021-08-16 14:20:33,421 INFO stream-thread 
[commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
Restoration in progress for 1 partitions. 
{commands-processor-expiry-store-changelog-8: position=11775908, end=11775911, 
totalRestored=2002076} 
[commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader)
{code}
The task for partition 8 would never initialize, and no more data would be read 
from the source commands topic for that partition.
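
In the log line above, the restore position sits a few offsets short of the end offset and does not advance. As a minimal sketch (the class and method names `RestoreLag` and `remaining` are hypothetical, and the regex assumes the `position=..., end=...` layout shown in that log excerpt), the gap can be extracted from such a line when watching for a stalled restore:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: parses a "Restoration in progress" line.
class RestoreLag {
    // Matches the "position=<n>, end=<n>" pair as printed in the log excerpt.
    private static final Pattern PROGRESS =
        Pattern.compile("position=(\\d+),\\s*end=(\\d+)");

    /** Returns end - position, or -1 if the line has no progress fields. */
    static long remaining(String logLine) {
        Matcher m = PROGRESS.matcher(logLine);
        if (!m.find()) {
            return -1;
        }
        long position = Long.parseLong(m.group(1));
        long end = Long.parseLong(m.group(2));
        return end - position;
    }

    public static void main(String[] args) {
        String line = "{commands-processor-expiry-store-changelog-8: "
            + "position=11775908, end=11775911, totalRestored=2002076}";
        // Prints "offsets remaining: 3" for the values reported in this issue.
        System.out.println("offsets remaining: " + remaining(line));
    }
}
```

A gap that stays constant across successive log lines is what distinguishes this stuck restore from a slow one.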

 

In an attempt to recover, we restarted the stream app with 
processing.guarantee set back to at_least_once; it then proceeded with reading 
the changelog and restored partition 8 fully.

But we noticed afterward, for the next hour until we rebuilt the system, that 
partition 8 of command-expiry-store-changelog was not being cleaned/compacted 
by the log cleaner, unlike the other partitions. (This could be unrelated, 
because we have seen it before.)

So we resorted to deleting and recreating our command-expiry-store-changelog 
and events topics and regenerating them from the commands topic, reading from 
the beginning.

Things went back to normal.
h3. Attempt 2 at reproducing this issue

kstream runs with *exactly-once*

We force-deleted all 3 pods running Kafka.
 After that, one of the partitions could not be restored (as reported in the 
previous attempt).
 For that partition, we noticed these logs on the broker:
{code:java}
[2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: 
Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, 
command-expiry-store-changelog-9) while trying to send transaction markers for 
commands-processor-0_9, these partitions are likely deleted already and hence 
can be skipped 
(kafka.coordinator.transaction.TransactionMarkerChannelManager){code}
Then we:
 - stopped the kstream app,
 - restarted the Kafka brokers cleanly,
 - restarted the kstream app.

These log messages showed up in the kstream app log:

 
{code:java}
2021-08-27 18:34:42,413 INFO [Consumer 
clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
 groupId=commands-processor] The following partitions still have unstable 
offsets which are not cleared on the broker side: [commands-9], this could be 
either transactional offsets waiting for completion, or normal offsets waiting 
for replication after appending to local log 
[commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1] 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
 
{code}
This caused our processor to stop consuming from that specific source 
topic-partition.
 Deleting the downstream topic and replaying the data did NOT fix the issue 
(with either EXACTLY_ONCE or AT_LEAST_ONCE).

Workaround found:

We deleted the group associated with the processor and restarted the kstream 
application; the application then went on to process data normally. (We have 
resigned ourselves to using AT_LEAST_ONCE for now.)

KStream config:
{code:java}
StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 2000
StreamsConfig.REPLICATION_FACTOR_CONFIG, 2
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 24MB
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE (now AT_LEAST_ONCE)
producer.delivery.timeout.ms=120000
consumer.session.timeout.ms=30000
consumer.heartbeat.interval.ms=10000
consumer.max.poll.interval.ms=300000
num.stream.threads=1{code}
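
For anyone trying to reproduce this, here is a rough sketch of the same settings as plain `java.util.Properties`, using the string property names that the constants above resolve to. The class name `StreamsSettings` is hypothetical, and reading "24MB" as 24 * 1024 * 1024 bytes is an assumption; the listing above does not give the exact byte count.

```java
import java.util.Properties;

// Hypothetical holder for the settings listed in this report.
class StreamsSettings {
    static Properties build(boolean exactlyOnce) {
        Properties p = new Properties();
        p.put("retry.backoff.ms", "2000");
        p.put("replication.factor", "2");
        p.put("commit.interval.ms", "1000");
        // Assumption: "24MB" is taken to mean 24 * 1024 * 1024 bytes.
        p.put("cache.max.bytes.buffering", String.valueOf(24L * 1024 * 1024));
        p.put("auto.offset.reset", "earliest");
        // The report started on exactly_once and later fell back to at_least_once.
        p.put("processing.guarantee", exactlyOnce ? "exactly_once" : "at_least_once");
        p.put("producer.delivery.timeout.ms", "120000");
        p.put("consumer.session.timeout.ms", "30000");
        p.put("consumer.heartbeat.interval.ms", "10000");
        p.put("consumer.max.poll.interval.ms", "300000");
        p.put("num.stream.threads", "1");
        return p;
    }

    public static void main(String[] args) {
        // Lists the effective settings for the exactly_once variant.
        build(true).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The `application.id` and `bootstrap.servers` entries are left out because the report does not state them.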
 
h3. Attempt 3

The kstream processor is now running with *at-least-once*.

We reprocessed the entire source topic.
 Everything ran stably.

We force-deleted all 3 pods running Kafka.

Then the services came back up, but the same error appeared in the kstream app:
{code:java}
2021-09-03 19:18:39,414  INFO [Consumer 
clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
 groupId=commands-processor] The following partitions still have unstable 
offsets which are not cleared on the broker side: [commands-9], this could be 
either transactional offsets waiting for completion, or normal offsets waiting 
for replication after appending to local log 
{code}
We got it processing again by:
 * stopping the kstream app,
 * taking note of the offsets,
 * deleting the group for the kstream app,
 * manually recreating the same group and setting the offsets back to where 
they were for each partition,
 * restarting the kstream app, after which processing resumed normally.

 
h3. Attempt 4

The kstream processor is now running with *at-least-once*.

We reprocessed the entire source topic.
 Everything ran stably.

We did a graceful restart of the Kafka brokers.

Then the services came back up, but the same error appeared in the kstream app:
{code:java}
2021-09-03 19:18:39,414  INFO [Consumer 
clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
 groupId=commands-processor] The following partitions still have unstable 
offsets which are not cleared on the broker side: [commands-9], this could be 
either transactional offsets waiting for completion, or normal offsets waiting 
for replication after appending to local log {code}
h3. Attempt 5

The kstream processor is now running with *at-least-once*.

*Changed the source commands topic to min.insync.replicas=1 (instead of 2)*

We reprocessed the entire source topic.
 Everything runs stably now.

We did multiple graceful restarts of the Kafka brokers.

The kstream processor recovers successfully: no more unstable-offsets warning, 
and the at-least-once processor no longer gets stuck.

 Note that our topics use replication-factor 2; it seems that running with 
replication-factor 2 and min-insync 2 is problematic.
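
One part of why that combination is fragile can be shown with simple arithmetic: a broker rejects acks=all writes to a partition whose in-sync replica set is smaller than min.insync.replicas, so replication-factor 2 with min-insync 2 leaves no room for even one replica to fall out of sync. A minimal sketch follows, with hypothetical names (`IsrHeadroom`, `tolerableReplicaLoss`, `acceptsAcksAllWrites`). It illustrates the headroom only and is not a confirmed explanation of the unstable-offsets message.

```java
// Hypothetical helper illustrating ISR headroom; not part of Kafka.
class IsrHeadroom {
    /** Replicas that may drop out of the ISR before acks=all writes are rejected. */
    static int tolerableReplicaLoss(int replicationFactor, int minInSyncReplicas) {
        if (minInSyncReplicas < 1 || minInSyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                "min.insync.replicas must be between 1 and the replication factor");
        }
        return replicationFactor - minInSyncReplicas;
    }

    /** True if a partition with this ISR size still accepts acks=all writes. */
    static boolean acceptsAcksAllWrites(int isrSize, int minInSyncReplicas) {
        return isrSize >= minInSyncReplicas;
    }

    public static void main(String[] args) {
        // Original setup: replication 2, min-insync 2 -> prints 0
        System.out.println(tolerableReplicaLoss(2, 2));
        // Attempt 5 setup: replication 2, min-insync 1 -> prints 1
        System.out.println(tolerableReplicaLoss(2, 1));
        // One replica down with min-insync 2 -> prints false
        System.out.println(acceptsAcksAllWrites(1, 2));
    }
}
```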

We force-deleted all 3 pods running Kafka.

Then the services came back up, but the same error appeared in the kstream app:
{code:java}
2021-09-03 19:18:39,414  INFO [Consumer 
clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
 groupId=commands-processor] The following partitions still have unstable 
offsets which are not cleared on the broker side: [commands-9], this could be 
either transactional offsets waiting for completion, or normal offsets waiting 
for replication after appending to local log {code}

> KStream offset stuck after brokers outage
> -----------------------------------------
>
>                 Key: KAFKA-13272
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13272
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>         Environment: Kafka running on Kubernetes
> centos
>            Reporter: F Méthot
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
