[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2020-04-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084602#comment-17084602
 ] 

Théo commented on KAFKA-7077:
-

I'm going to take a close look at it!

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
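For context, idempotence can already be enabled manually on a Connect worker's source-task producers via the standard producer-prefixed overrides; a minimal sketch (values illustrative):

{noformat}
# connect-distributed.properties (sketch): forwarded to source-task producers
producer.enable.idempotence=true
producer.acks=all
{noformat}

KIP-318 proposes making behavior along these lines the default for Connect source tasks.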



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9543) Consumer offset reset after new segment rolling

2020-04-16 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084605#comment-17084605
 ] 

Jason Gustafson commented on KAFKA-9543:


In KAFKA-9838, I fixed what I thought was a minor race condition with 
truncation. Since there is no truncation involved with a segment roll, I 
thought it was unrelated. After thinking a little more about the edge case it 
uncovered, it seems possible it can explain this issue as well. When we roll a 
segment, if the log end offset is equal to the high watermark, then both will 
be updated to point to the new segment. The problem is that the leader also 
keeps a cache of pointers to offsets which may refer to the older segment. It 
is possible that a leader update following the roll would leave the high 
watermark pointing to the older segment. This was not a problem prior to 2.4 
because we always looked up the position of the high watermark on every fetch.

As an example, let's say we have a single segment which begins at offset 0, 
with log end offset = 10 and high watermark = 5. The sequence of events is as 
follows:

t1: Initial state
log end offset = (offset = 10, segment = 0)
high watermark = (offset = 5, segment = 0)

t2: Log is rolled
log end offset = (offset = 10, segment = 10)
high watermark = (offset = 5, segment = 0)

t3: Leader calls `maybeIncrementHighWatermark` to update high watermark to 10, 
but with a reference to the old segment:
log end offset = (offset = 10, segment = 10)
high watermark = (offset = 10, segment = 0)

I verified with a simple test case that the log can get into this state. Prior 
to the fix in KAFKA-9838, a fetch from the high watermark with the log in this 
state would result in a read of all the data from segment 10. So that opens the 
door to a race condition like the following:

1. Say one record is appended and we update the log end offset:
log end offset = (offset = 11, segment = 10)
high watermark = (offset = 10, segment = 0)

2. Now say that one more record is appended to segment 10 at offset 11, but the 
log end offset is not immediately updated.
3. A fetch at offset 10 returns the two new records at offsets 10 and 11 because 
of the bug above.
4. A second fetch at offset 12 now returns an out-of-range error because the log 
end offset is still 11.
5. Finally, the log end offset is updated to 12.

This is a devilishly tricky scenario to hit in a test case. I was able to do 
it, but only by introducing an artificial delay into the append logic. Still, I 
think this is probably on the right track, since it explains how it is possible 
to hit an out-of-range error with only a segment roll, and also why versions 
older than 2.4 are not affected.
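To make the stale-pointer issue concrete, here is a toy model of the bookkeeping (illustrative only: the LogOffsetMetadata class below is a simplification for this sketch, not the actual broker code):

{code:java}
// Toy model: an offset plus the base offset of the segment it points into.
final class LogOffsetMetadata {
    final long messageOffset;
    final long segmentBaseOffset;

    LogOffsetMetadata(long messageOffset, long segmentBaseOffset) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
    }

    @Override
    public String toString() {
        return "(offset = " + messageOffset + ", segment = " + segmentBaseOffset + ")";
    }
}

public class HighWatermarkRollDemo {
    public static void main(String[] args) {
        // t1: initial state
        LogOffsetMetadata logEndOffset = new LogOffsetMetadata(10, 0);
        LogOffsetMetadata highWatermark = new LogOffsetMetadata(5, 0);

        // t2: segment roll; the log end offset now points into the new segment
        logEndOffset = new LogOffsetMetadata(10, 10);

        // t3: the leader advances the high watermark to the LEO's offset but,
        // per the bug described above, keeps the cached stale segment pointer
        highWatermark = new LogOffsetMetadata(logEndOffset.messageOffset,
                                              highWatermark.segmentBaseOffset);

        System.out.println("log end offset = " + logEndOffset);   // (offset = 10, segment = 10)
        System.out.println("high watermark = " + highWatermark);  // (offset = 10, segment = 0)
        // A fetch that resolves its position through the high watermark's
        // segment pointer now reads from segment 0 and can run past offset 10.
    }
}
{code}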

Unfortunately the patch for KAFKA-9838 did not get into 2.5.0, which was just 
released today. However, I've merged it into the 
[2.4|https://github.com/apache/kafka/commit/e1f18df7f6615109b6cc77b66d9be37b09256a0a]
 and 
[2.5|https://github.com/apache/kafka/commit/3b17fecc9b6af5f896ce2df4b8b6ce23cfd40f17]
 branches. If anyone who is running into this problem is willing to try one of 
these patches and let me know, I'd appreciate it. I will think a little bit 
more about whether the patch fixes all of the problems that can be caused by 
this case where the high watermark points to the wrong segment (I think it 
does, but I'm not 100% sure).

> Consumer offset reset after new segment rolling
> ---
>
> Key: KAFKA-9543
> URL: https://issues.apache.org/jira/browse/KAFKA-9543
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Rafał Boniecki
>Priority: Major
> Attachments: Untitled.png, image-2020-04-06-17-10-32-636.png
>
>
> After upgrading from Kafka 2.1.1 to 2.4.0, I'm experiencing unexpected 
> consumer offset resets.
> Consumer:
> {code:java}
> 2020-02-12T11:12:58.402+01:00 hostname 4a2a39a35a02 
> [2020-02-12T11:12:58,402][INFO 
> ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer 
> clientId=logstash-1, groupId=logstash] Fetch offset 1632750575 is out of 
> range for partition stats-5, resetting offset
> {code}
> Broker:
> {code:java}
> 2020-02-12 11:12:58:400 CET INFO  
> [data-plane-kafka-request-handler-1][kafka.log.Log] [Log partition=stats-5, 
> dir=/kafka4/data] Rolled new log segment at offset 1632750565 in 2 ms.{code}
> All resets are perfectly correlated with the rolling of new segments at the 
> broker: the segment is rolled first, then, a couple of ms later, the reset 
> occurs on the consumer. Attached is a Grafana graph with consumer lag per 
> partition. All sudden spikes in lag are offset resets due to this bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols

2020-04-16 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov reassigned KAFKA-9320:
--

Assignee: (was: Nikolay Izhikov)

> Enable TLSv1.3 by default and disable some of the older protocols
> -
>
> Key: KAFKA-9320
> URL: https://issues.apache.org/jira/browse/KAFKA-9320
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Priority: Major
>  Labels: needs-kip
>
> KAFKA-7251 added support for TLSv1.3. We should include this in the list of 
> protocols that are enabled by default. We should also disable some of the 
> older protocols that are not secure. This change requires a KIP.
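For illustration, a sketch of what the resulting broker defaults could look like when written as explicit settings in server.properties (the exact protocol list is what the KIP has to decide; the property names are the existing ssl.* configs):

{noformat}
ssl.protocol=TLSv1.3
ssl.enabled.protocols=TLSv1.2,TLSv1.3
{noformat}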



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2020-04-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084602#comment-17084602
 ] 

Théo edited comment on KAFKA-7077 at 4/16/20, 9:04 AM:
---

I'm going to take a close look at it!

I didn't find a pull request, so if nobody is working on it, I'm ready to take it.


was (Author: schmidt96u):
i'm going to take a close look on it !

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9230) Change User Customizable Metrics API in StreamsMetrics interface

2020-04-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-9230:
-
Fix Version/s: 2.5.0

> Change User Customizable Metrics API in StreamsMetrics interface
> 
>
> Key: KAFKA-9230
> URL: https://issues.apache.org/jira/browse/KAFKA-9230
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.5.0
>
>
> As proposed in KIP-444, the user-customizable metrics API in the 
> StreamsMetrics interface shall be improved. For more details, see 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9117) Add active-process-ratio Metric

2020-04-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9117.
--
Fix Version/s: (was: 2.6.0)
   Resolution: Duplicate

> Add active-process-ratio Metric 
> 
>
> Key: KAFKA-9117
> URL: https://issues.apache.org/jira/browse/KAFKA-9117
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>
> A metric that measures the percentage of time the hosting thread is spending 
> with an active task shall be added. This metric is described in KIP-444.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9590) Add configuration to limit number of partitions

2020-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084728#comment-17084728
 ] 

ASF GitHub Bot commented on KAFKA-9590:
---

gokul2411s commented on pull request #8499: [WIP] KAFKA-9590: Add configuration 
to limit number of partitions
URL: https://github.com/apache/kafka/pull/8499
 
 
   This is a working prototype of the Admin API path for creating topics and 
adding partitions to existing topics. The goal is to motivate the discussion of 
KIP-578 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-578%3A+Add+configuration+to+limit+number+of+partitions),
 so that reviewers have some more concrete details on the approach.
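   For readers following along, a minimal sketch of the two Admin API paths the prototype guards (topic name, counts, and bootstrap address are illustrative):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;

public class PartitionLimitPaths {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Path 1: creating a topic (3 partitions, replication factor 1)
            admin.createTopics(Collections.singletonList(
                    new NewTopic("demo-topic", 3, (short) 1))).all().get();
            // Path 2: adding partitions to an existing topic (3 -> 6)
            admin.createPartitions(Collections.singletonMap(
                    "demo-topic", NewPartitions.increaseTo(6))).all().get();
            // KIP-578 proposes broker-side limits that would reject either
            // request once the configured partition budget is exceeded.
        }
    }
}
{code}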
   
   For now, I have tested manually. Once the KIP is approved, I will write some 
unit and integration tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add configuration to limit number of partitions
> ---
>
> Key: KAFKA-9590
> URL: https://issues.apache.org/jira/browse/KAFKA-9590
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, core
>Reporter: Gokul Ramanan Subramanian
>Assignee: Gokul Ramanan Subramanian
>Priority: Major
>
> Tracks 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-578%3A+Add+configuration+to+limit+number+of+partitions].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions

2020-04-16 Thread Philip Bourke (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084756#comment-17084756
 ] 

Philip Bourke commented on KAFKA-9137:
--

What version is this fixed in? 2.5 was released today, and I don't see this Jira 
in the release notes.

> Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live 
> sessions
> -
>
> Key: KAFKA-9137
> URL: https://issues.apache.org/jira/browse/KAFKA-9137
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Lucas Bradstreet
>Priority: Major
>
> We have recently seen cases where brokers end up in a bad state where fetch 
> session evictions occur at a high rate (> 16 per second) after a roll. This 
> increase in eviction rate included the following pattern in our logs:
>  
> {noformat}
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
> FetchContext for session id 2046264334, epoch 9790: added (), updated (), 
> removed ()
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
> FetchContext for session id 2046264334, epoch 9791: added (), updated (), 
> removed () broker 6: October 31st 2019, 17:52:45.500 Created a new 
> incremental FetchContext for session id 2046264334, epoch 9792: added (), 
> updated (lkc-7nv6o_tenant_soak_topic_144p-67), removed () 
> broker 6: October 31st 2019, 17:52:45.501 Created a new incremental 
> FetchContext for session id 2046264334, epoch 9793: added (), updated 
> (lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, 
> lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, 
> lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), 
> removed () 
> broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 
> 2046264334. 
> broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no 
> such session ID found. 
> broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, 
> leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with 
> (sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.  
> {noformat}
> This pattern appears to be problematic for two reasons. Firstly, the replica 
> fetcher for broker 4 was clearly able to send multiple incremental fetch 
> requests to broker 6, and receive replies, and did so right up to the point 
> where broker 6 evicted its fetch session within milliseconds of multiple 
> fetch requests. The second problem is that replica fetchers are considered 
> privileged for the fetch session cache, and should not be evicted by consumer 
> fetch sessions. This cluster only has 12 brokers and 1000 fetch session cache 
> slots (the default for max.incremental.fetch.session.cache.slots), and it is 
> thus very unlikely that this session should have been evicted by another 
> replica fetcher session.
> This cluster also appears to be causing cycles of fetch session evictions 
> where the cluster never stabilizes into a state where fetch sessions are not 
> evicted. The above logs are the best example I could find of a case where a 
> session clearly should not have been evicted.
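For reference, the cache discussed above is sized by a broker setting; a sketch of raising it in server.properties (the value is illustrative, and this only relieves pressure on the cache rather than fixing the premature eviction described here):

{noformat}
max.incremental.fetch.session.cache.slots=10000
{noformat}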



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9879) How kafka deletes tombstone messages?

2020-04-16 Thread VIkram (Jira)
VIkram created KAFKA-9879:
-

 Summary: How kafka deletes tombstone messages?
 Key: KAFKA-9879
 URL: https://issues.apache.org/jira/browse/KAFKA-9879
 Project: Kafka
  Issue Type: Bug
Reporter: VIkram


I was able to delete records in Kafka using tombstone messages after a few 
attempts. However, the algorithm (or logic) that Kafka uses to delete these 
tombstone messages is still unclear to me.

From my observations, I could figure out that there is some relation between 
the last-modified time of a segment and the deletion of tombstone messages. I 
have looked into this [https://stackoverflow.com/a/48325699/6940279] but it's a 
bit complex to understand.

*Topic details*

 

{{Topic:reddyvel_13   PartitionCount:1ReplicationFactor:3 
Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=1
Topic: reddyvel_13  Partition: 0Leader: 1   Replicas: 1,5,2 Isr: 1,5,2}}

I have set {{cleanup.policy=compact}}, {{segment.bytes=200}}, 
{{delete.retention.ms=1}}

*Timeline of events*
 * First segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}

 

{{[2020-04-02 07:12:09,908] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 16623 (kafka.log.ProducerStateManager)
[2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. 
(kafka.log.Log)}}

Compaction has been triggered immediately on this closed segment

 

{{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior to 
Wed Dec 31 19:00:00 EST 1969)... (kafka.log.LogCleaner)}}
 * Sent a few more messages along with a few tombstones (to delete messages 
present in the first segment), and the second segment was closed at 
{{2020-04-02 07:56:50,405}}

 

{{[2020-04-02 07:56:50,405] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 33868 (kafka.log.ProducerStateManager)
[2020-04-02 07:56:50,406] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 33868 in 2 ms. 
(kafka.log.Log)}}

Compaction has been triggered

 

{{[2020-04-02 07:56:53,180] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:56:50 EDT 2020, discarding tombstones prior to 
Thu Apr 02 07:11:59 EDT 2020)... (kafka.log.LogCleaner)}}

Here, the above log message says {{discarding tombstones prior to Thu Apr 02 
07:11:59 EDT 2020}}. This timestamp is exactly equal to the first segment's 
closing timestamp ({{2020-04-02 07:12:09,908}}) minus the 
{{delete.retention.ms}} (10 seconds) of my topic. I'm not able to figure out 
the link between these.

I want to understand at what time Kafka triggers the deletion of tombstone 
messages. Can someone explain the tombstone deletion algorithm in simpler 
terms, along with the reasoning behind it?
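A sketch reproducing the arithmetic behind the second cleaner line, assuming the rule is: tombstone horizon = last-modified time of the most recently cleaned segment minus {{delete.retention.ms}} (this is a reading of the logs, not the broker code itself):

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TombstoneHorizonDemo {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // Close time (~ last-modified time) of the first, already-cleaned segment
        LocalDateTime firstSegmentClosed = LocalDateTime.parse("2020-04-02 07:12:09", fmt);
        long deleteRetentionMs = 10_000L; // delete.retention.ms on the topic
        LocalDateTime horizon = firstSegmentClosed.minus(Duration.ofMillis(deleteRetentionMs));
        // Prints 2020-04-02 07:11:59, matching "discarding tombstones prior to"
        System.out.println("discarding tombstones prior to " + horizon.format(fmt));
    }
}
{code}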

 

It's not a bug, but I need more information on this. I have posted this in 
other forums like Stack Overflow but did not get any reply. The official Kafka 
documentation doesn't have this information. If this is not the correct 
platform for this, kindly point me to the relevant one.

 

Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9879) How kafka deletes tombstone messages?

2020-04-16 Thread VIkram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

VIkram updated KAFKA-9879:
--
Description: 
I was able to delete records in Kafka using tombstone messages after a few 
attempts. However, the algorithm (or logic) that Kafka uses to delete these 
tombstone messages is still unclear to me.

From my observations, I could figure out that there is some relation between 
the last-modified time of a segment and the deletion of tombstone messages. I 
have looked into this [https://stackoverflow.com/a/48325699/6940279] but it's a 
bit complex to understand.

*Topic details*

 

{{Topic:reddyvel_13 PartitionCount:1 ReplicationFactor:3 
Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=1
 Topic: reddyvel_13 Partition: 0 Leader: 1 Replicas: 1,5,2 Isr: 1,5,2}}

I have set {{cleanup.policy=compact}}, {{segment.bytes=200}}, 
{{delete.retention.ms=1}}

*Timeline of events*
 * First segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}

+*cleaner log*+

{{[2020-04-02 07:12:09,908] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 16623 (kafka.log.ProducerStateManager)
 [2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. 
(kafka.log.Log)}}

Compaction has been triggered immediately on this closed segment

+*cleaner log*+ 

{{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior to 
Wed Dec 31 19:00:00 EST 1969)... (kafka.log.LogCleaner)}}

 
 * Sent a few more messages along with a few tombstones (to delete messages 
present in the first segment), and the second segment was closed at 
{{2020-04-02 07:56:50,405}}

 +*cleaner log*+

{{[2020-04-02 07:56:50,405] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 33868 (kafka.log.ProducerStateManager)
 [2020-04-02 07:56:50,406] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 33868 in 2 ms. 
(kafka.log.Log)}}

Compaction has been triggered

+*cleaner log*+ 

{{[2020-04-02 07:56:53,180] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:56:50 EDT 2020, discarding tombstones prior to 
Thu Apr 02 07:11:59 EDT 2020)... (kafka.log.LogCleaner)}}

 

Here, the above log message says {{discarding tombstones prior to Thu Apr 02 
07:11:59 EDT 2020}}. This timestamp is exactly equal to the first segment's 
closing timestamp ({{2020-04-02 07:12:09,908}}) minus the 
{{delete.retention.ms}} (10 seconds) of my topic. I'm not able to figure out 
the link between these.

I want to understand at what time Kafka triggers the deletion of tombstone 
messages. Can someone explain the tombstone deletion algorithm in simpler 
terms, along with the reasoning behind it?

 

It's not a bug, but I need more information on this. I have posted this in 
other forums like Stack Overflow but did not get any reply. The official Kafka 
documentation doesn't have this information. If this is not the correct 
platform for this, kindly point me to the relevant one.

 

Thanks in advance.

  was:
I was able to delete records in kafka using tombstone messages after few 
attempts. However the algorithm (or logic) that kafka uses to delete these 
tombstone messages is still unclear to me.

>From my observations, I could figure out that there is some relation between 
>last modified time of a segment and deletion of tombstone messages. I have 
>looked into this [https://stackoverflow.com/a/48325699/6940279] but it's a bit 
>complex to understand.

*Topic details*

 

{{Topic:reddyvel_13   PartitionCount:1ReplicationFactor:3 
Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=1
Topic: reddyvel_13  Partition: 0Leader: 1   Replicas: 1,5,2 Isr: 1,5,2}}

I have set {{cleanup.policy=compact}}, {{segment.bytes=200}}, 
{{delete.retention.ms=1}}

*Timeline of events*
 * First segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}

 

{{[2020-04-02 07:12:09,908] INFO [ProducerStateManager partition=reddyvel_13-0] 
Writing producer snapshot at offset 16623 (kafka.log.ProducerStateManager)
[2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, 
dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. 
(kafka.log.Log)}}

Compaction has been triggered immediately on this closed segment

 

{{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 
(cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior to 
Wed Dec 31 19:00:00 EST 1969)... (kafka.l
og.LogCleaner)}}
 * Sent few more messages along with few tombstones (to delete messages present 
in first segment) and Second segment was closed at {{2020-04-02 07:56:50,405}}

 

{{[2020-04-02 07:56:50,405] INFO [ProducerStateManager partition=reddy

[jira] [Created] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2020-04-16 Thread Nicolas Carlot (Jira)
Nicolas Carlot created KAFKA-9880:
-

 Summary: Error while range compacting during bulk loading of FIFO 
compacted RocksDB Store
 Key: KAFKA-9880
 URL: https://issues.apache.org/jira/browse/KAFKA-9880
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.1
Reporter: Nicolas Carlot


When restoring a non-empty RocksDB state store that is customized to use 
FIFO compaction, the following exception is thrown:

 
{code:java}
exception thrown by the KStream process is:
org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
    at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
    at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
    at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
    ... 11 more
{code}
 

 

Compaction is configured through an implementation of RocksDBConfigSetter. The 
exception is gone as soon as I remove:

 
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
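For completeness, a sketch of how options like these are typically applied via {{RocksDBConfigSetter}} (the class name and the maxSize value are illustrative; the setter is registered through the Streams config {{rocksdb.config.setter}}):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

public class FifoCompactionConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(512 * 1024 * 1024L); // illustrative maxSize
        fifoOptions.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifoOptions);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }
}
{code}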
 

 

Bulk loading works fine when the store is non-existent or empty. This occurs 
only when there is a certain minimum amount of data in it. I guess it happens 
when the number of SST layers increases.

I'm currently using a forked version of Kafka 2.4.1, customizing the 
RocksDBStore class with this modification as a workaround:
{code:java}
@Override
@SuppressWarnings("deprecation")
public void toggleDbForBulkLoading() {
  try {
    db.compactRange(columnFamily, true, 1, 0);
  } catch (final RocksDBException e) {
    try {
      if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
        throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
      } else {
        log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
      }
    } catch (RocksDBException e1) {
      throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
    }
  }
}
{code}
I'm not very proud of this workaround, but it suits my use cases well.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2020-04-16 Thread Nicolas Carlot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Carlot updated KAFKA-9880:
--
Description: 
When restoring a non-empty RocksDB state store that is customized to use 
FIFO compaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
 [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
at org.rocksdb.RocksDB.compactRange(Native Method) 
~[kafka-stream-router.jar:?]
at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) 
~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
 ~[kafka-stream-router.jar:?]
... 11 more
{code}
 

 

Compaction is configured through an implementation of RocksDBConfigSetter. The 
exception is gone as soon as I remove:

 
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
 

 

Bulk loading works fine when the store is non-existent or empty. This occurs 
only when there is a certain minimum amount of data in it. I guess it happens 
when the number of SST layers increases.

I'm currently using a forked version of Kafka 2.4.1, customizing the 
RocksDBStore class with this modification as a workaround:
{code:java}
public void toggleDbForBulkLoading() {
  try {
    db.compactRange(columnFamily, true, 1, 0);
  } catch (final RocksDBException e) {
    try {
      if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
        throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
      } else {
        log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
      }
    } catch (RocksDBException e1) {
      throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
    }
  }
}
{code}
 

I'm not very proud of this workaround, but it suits my use cases well.
 

 

  was:
When restoring a non empty RocksDB state store, if it is customized to use 
FIFOCompaction, the following exception is thrown:

 
{code:java}
exception thrown by the KStream process 
is:org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store         at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]         at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]         at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestore

[jira] [Updated] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2020-04-16 Thread Nicolas Carlot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Carlot updated KAFKA-9880:
--
Description: 
 

When restoring a non-empty RocksDB state store that is customized to use 
FIFO compaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
 [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
at org.rocksdb.RocksDB.compactRange(Native Method) 
~[kafka-stream-router.jar:?]
at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) 
~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
 ~[kafka-stream-router.jar:?]
... 11 more
{code}
 

 

Compaction is configured through an implementation of RocksDBConfigSetter. The 
exception is gone as soon as I remove:

 
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
 

 

Bulk loading works fine when the store is non-existent or empty. This occurs 
only when there is a certain minimum amount of data in it. I guess it happens 
when the number of SST layers increases.

I'm currently using a forked version of Kafka 2.4.1, customizing the 
RocksDBStore class with this modification as a workaround:

 

I'm not very proud of this workaround, but it suits my use cases well.

 
{code:java}
public void toggleDbForBulkLoading() {
  try {
    db.compactRange(columnFamily, true, 1, 0);
  } catch (final RocksDBException e) {
    try {
      if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
        throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
      } else {
        log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
      }
    } catch (RocksDBException e1) {
      throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
    }
  }
}
{code}

  was:
When restoring a non empty RocksDB state store, if it is customized to use 
FIFOCompaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)

[jira] [Updated] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2020-04-16 Thread Nicolas Carlot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Carlot updated KAFKA-9880:
--
Description: 
 

When restoring a non-empty RocksDB state store that is customized to use 
FIFO compaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
 [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
at org.rocksdb.RocksDB.compactRange(Native Method) 
~[kafka-stream-router.jar:?]
at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) 
~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
 ~[kafka-stream-router.jar:?]
... 11 more
{code}
 

 

Compaction is configured through an implementation of RocksDBConfigSetter. The 
exception is gone as soon as I remove:
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
 

 

Bulk loading works fine when the store is non-existent or empty. This occurs 
only when there is a certain minimum amount of data in it. I guess it happens 
when the number of SST layers increases.

I'm currently using a forked version of Kafka 2.4.1, customizing the 
RocksDBStore class with this modification as a workaround:

 
{code:java}
public void toggleDbForBulkLoading() {
  try {
db.compactRange(columnFamily, true, 1, 0);
  } catch (final RocksDBException e) {
try {
  if (columnFamily.getDescriptor().getOptions().compactionStyle() != 
CompactionStyle.FIFO) {
throw new ProcessorStateException("Error while range compacting 
during restoring  store " + name, e);
  }
  else {
log.warn("Compaction of store " + name + " for bulk loading failed. 
Will continue without compacted store, which will be slower.", e);
  }
} catch (RocksDBException e1) {
  throw new ProcessorStateException("Error while range compacting 
during restoring  store " + name, e);
}
  }
}
{code}
 

I'm not very proud of this workaround, but it suits my use cases well.

 

 

  was:
 

When restoring a non empty RocksDB state store, if it is customized to use 
FIFOCompaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
 ~[k

[jira] [Updated] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2020-04-16 Thread Nicolas Carlot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Carlot updated KAFKA-9880:
--
Description: 
 

When restoring a non-empty RocksDB state store that is customized to use 
FIFO compaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
 [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
at org.rocksdb.RocksDB.compactRange(Native Method) 
~[kafka-stream-router.jar:?]
at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) 
~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
 ~[kafka-stream-router.jar:?]
... 11 more
{code}
 

 

Compaction is configured through an implementation of RocksDBConfigSetter. The 
exception is gone as soon as I remove:
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
 

 

Bulk loading works fine when the store is non-existent or empty. This occurs 
only when there is a certain minimum amount of data in it. I guess it happens 
when the number of SST layers increases.

I'm currently using a forked version of Kafka 2.4.1, customizing the 
RocksDBStore class with this modification as a workaround:

 
{code:java}
public void toggleDbForBulkLoading() {
  try {
db.compactRange(columnFamily, true, 1, 0);
  } catch (final RocksDBException e) {
try {
  if (columnFamily.getDescriptor().getOptions().compactionStyle() != 
CompactionStyle.FIFO) {
throw new ProcessorStateException("Error while range compacting 
during restoring  store " + name, e);
  }
  else {
log.warn("Compaction of store " + name + " for bulk loading failed. 
Will continue without compacted store, which will be slower.", e);
  }
} catch (RocksDBException e1) {
  throw new ProcessorStateException("Error while range compacting 
during restoring  store " + name, e);
}
  }
}
{code}
 

I'm not very proud of this workaround, but it suits my use cases well.

 

 

  was:
 

When restoring a non empty RocksDB state store, if it is customized to use 
FIFOCompaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.j

[jira] [Updated] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2020-04-16 Thread Nicolas Carlot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Carlot updated KAFKA-9880:
--
Description: 
 

When restoring a non-empty RocksDB state store that is customized to use 
FIFO compaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
 [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
at org.rocksdb.RocksDB.compactRange(Native Method) 
~[kafka-stream-router.jar:?]
at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) 
~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
 ~[kafka-stream-router.jar:?]
... 11 more
{code}
 

 

Compaction is configured through an implementation of RocksDBConfigSetter. The 
exception is gone as soon as I remove:
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
 

 

Bulk loading works fine when the store is non-existent or empty. This occurs 
only when there is a certain minimum amount of data in it. I guess it happens 
when the number of SST layers increases.

I'm currently using a forked version of Kafka 2.4.1, customizing the 
RocksDBStore class with this modification as a workaround:

 
{code:java}
public void toggleDbForBulkLoading() {
  try {
db.compactRange(columnFamily, true, 1, 0);
  } catch (final RocksDBException e) {
try {
  if (columnFamily.getDescriptor().getOptions().compactionStyle() != 
CompactionStyle.FIFO) {
throw new ProcessorStateException("Error while range compacting 
during restoring  store " + name, e);
  }
  else {
log.warn("Compaction of store " + name + " for bulk loading failed. 
Will continue without compacted store, which will be slower.", e);
  }
} catch (RocksDBException e1) {
  throw new ProcessorStateException("Error while range compacting 
during restoring  store " + name, e);
}
  }
}
{code}
 

I'm not very proud of this workaround, but it suits my use cases well.

 

 

  was:
 

When restoring a non empty RocksDB state store, if it is customized to use 
FIFOCompaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
compacting during restoring  store merge_store
at 
org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
 ~[kafka-stream-router.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
 ~[kafk

[jira] [Commented] (KAFKA-9854) Re-authenticating causes mismatched parse of response

2020-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084822#comment-17084822
 ] 

ASF GitHub Bot commented on KAFKA-9854:
---

rajinisivaram commented on pull request #8471: KAFKA-9854 Re-authenticating 
causes mismatched parse of response
URL: https://github.com/apache/kafka/pull/8471
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Re-authenticating causes mismatched parse of response
> -
>
> Key: KAFKA-9854
> URL: https://issues.apache.org/jira/browse/KAFKA-9854
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Critical
>
> The schema of LIST_OFFSETS consists of
>  # throttle_time_ms:INT32 and
>  # responses:ARRAY
>  
>  If throttle_time_ms is zero and the size of responses is small enough, its 
> binary is compatible with the schema of SASL_HANDSHAKE, composed of
>  # error_code:INT16 and
>  # mechanisms:ARRAY(STRING)
>  
>  Hence, there is no Schema error when SASL_HANDSHAKE tries to parse the 
> response of LIST_OFFSETS, but the correlation id check throws 
> IllegalStateException due to the mismatch. The IllegalStateException is NOT 
> caught and the mismatched response is not sent back to the Selector, so a 
> cascading error occurs: all following responses are parsed with the 
> incorrect Schema.
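To see concretely why the bytes line up, a simplified sketch (it ignores the surrounding response envelope and only illustrates how a zero INT32 throttle field also parses cleanly as an INT16 error code plus part of an array length):

{code:java}
import java.nio.ByteBuffer;

public class SchemaAliasDemo {
    public static void main(String[] args) {
        // LIST_OFFSETS-style body: throttle_time_ms = 0 (INT32),
        // followed by an empty responses array (INT32 count = 0).
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(0); // throttle_time_ms
        buf.putInt(0); // responses array size
        buf.flip();

        // The same bytes read with a SASL_HANDSHAKE-style schema:
        short errorCode = buf.getShort(); // 0x0000 -> error_code = 0 (no error)
        int mechanisms = buf.getInt();    // next four zero bytes -> 0 mechanisms
        System.out.printf("error_code=%d, mechanisms=%d, leftover=%d bytes%n",
                errorCode, mechanisms, buf.remaining());
        // No schema error is raised, so only the correlation id check can
        // notice that the wrong response was parsed.
    }
}
{code}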



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9854) Re-authenticating causes mismatched parse of response

2020-04-16 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-9854.
---
Fix Version/s: 2.5.1
   2.6.0
 Reviewer: Rajini Sivaram
   Resolution: Fixed

> Re-authenticating causes mismatched parse of response
> -
>
> Key: KAFKA-9854
> URL: https://issues.apache.org/jira/browse/KAFKA-9854
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Critical
> Fix For: 2.6.0, 2.5.1
>
>
> The schema of LIST_OFFSETS consists of
>  # throttle_time_ms:INT32 and
>  # responses:ARRAY
>  
>  If throttle_time_ms is zero and the size of responses is small enough, its 
> binary is compatible with the schema of SASL_HANDSHAKE, composed of
>  # error_code:INT16 and
>  # mechanisms:ARRAY(STRING)
>  
>  Hence, there is no Schema error when SASL_HANDSHAKE tries to parse the 
> response of LIST_OFFSETS, but the correlation id check throws 
> IllegalStateException due to the mismatch. The IllegalStateException is NOT 
> caught and the mismatched response is not sent back to the Selector, so a 
> cascading error occurs: all following responses are parsed with the 
> incorrect Schema.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2020-04-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084833#comment-17084833
 ] 

Théo commented on KAFKA-7077:
-

I just submitted a PR with those small changes, and I'm waiting for any new 
suggestions to be added:

https://github.com/apache/kafka/pull/8500

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2020-04-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084833#comment-17084833
 ] 

Théo edited comment on KAFKA-7077 at 4/16/20, 12:48 PM:


I just submitted a [Pull Request|https://github.com/apache/kafka/pull/8500] 
with those small changes, and I'm waiting for any new suggestions to be added.

 


was (Author: schmidt96u):
I just submit a PR with those little changes and i'm waiting to any new 
proposition to bed add 

https://github.com/apache/kafka/pull/8500

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2020-04-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084833#comment-17084833
 ] 

Théo edited comment on KAFKA-7077 at 4/16/20, 12:48 PM:


I just submitted a Pull Request([https://github.com/apache/kafka/pull/8500]) with 
those small changes and I'm waiting for any new propositions to be added

 


was (Author: schmidt96u):
I just submitted a [Pull Request]([https://github.com/apache/kafka/pull/8500]) 
with those small changes and I'm waiting for any new propositions to be added

 

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9881) Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of a Integration Test

2020-04-16 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-9881:


 Summary: Verify whether RocksDBMetrics Get Measurements from 
RocksDB in a Unit Test instead of a Integration Test
 Key: KAFKA-9881
 URL: https://issues.apache.org/jira/browse/KAFKA-9881
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.5.0
Reporter: Bruno Cadonna


The integration test {{RocksDBMetricsIntegrationTest}} takes pretty long to 
complete. The main part of the runtime is spent in the two tests that verify 
whether the RocksDB metrics get actual measurements from RocksDB. Those tests 
need to wait for the thread that collects the measurements of the RocksDB 
metrics to trigger the first recordings of the metrics. These tests do not need 
to run as integration tests, and thus they shall be converted into unit tests 
to save runtime.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9881) Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of an Integration Test

2020-04-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-9881:
-
Summary: Verify whether RocksDBMetrics Get Measurements from RocksDB in a 
Unit Test instead of an Integration Test  (was: Verify whether RocksDBMetrics 
Get Measurements from RocksDB in a Unit Test instead of a Integration Test)

> Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test 
> instead of an Integration Test
> --
>
> Key: KAFKA-9881
> URL: https://issues.apache.org/jira/browse/KAFKA-9881
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Major
>
> The integration test {{RocksDBMetricsIntegrationTest}} takes pretty long to 
> complete. The main part of the runtime is spent in the two tests that verify 
> whether the RocksDB metrics get actual measurements from RocksDB. Those tests 
> need to wait for the thread that collects the measurements of the RocksDB 
> metrics to trigger the first recordings of the metrics. These tests do not 
> need to run as integration tests, and thus they shall be converted into unit 
> tests to save runtime.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9881) Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of an Integration Test

2020-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084881#comment-17084881
 ] 

ASF GitHub Bot commented on KAFKA-9881:
---

cadonna commented on pull request #8501: KAFKA-9881: Convert integration test 
to verify measurements from RocksDB to unit test
URL: https://github.com/apache/kafka/pull/8501
 
 
   The integration test RocksDBMetricsIntegrationTest takes pretty long to 
complete.
   Most of the runtime is spent in the two tests that verify whether the RocksDB
   metrics get actual measurements from RocksDB. Those tests need to wait for 
the thread
   that collects the measurements of the RocksDB metrics to trigger the first 
recordings
   of the metrics.
   
   This PR adds a unit test that verifies whether the Kafka Streams metrics get 
the
   measurements from RocksDB and removes the two integration tests that 
verified it
   before. The verification of the creation and scheduling of the RocksDB 
metrics
   recording trigger thread is already contained in KafkaStreamsTest and 
consequently
   it is not part of this PR.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test 
> instead of an Integration Test
> --
>
> Key: KAFKA-9881
> URL: https://issues.apache.org/jira/browse/KAFKA-9881
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Major
>
> The integration test {{RocksDBMetricsIntegrationTest}} takes pretty long to 
> complete. The main part of the runtime is spent in the two tests that verify 
> whether the RocksDB metrics get actual measurements from RocksDB. Those tests 
> need to wait for the thread that collects the measurements of the RocksDB 
> metrics to trigger the first recordings of the metrics. These tests do not 
> need to run as integration tests, and thus they shall be converted into unit 
> tests to save runtime.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-04-16 Thread zhongyushuo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084985#comment-17084985
 ] 

zhongyushuo commented on KAFKA-7500:


[~ryannedolan] I got your point. Thanks for your reply.

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions

2020-04-16 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9860:
---
Summary: Transactional Producer could pre-add partitions  (was: 
Transactional Producer could add partitions by batch at the end)

> Transactional Producer could pre-add partitions
> ---
>
> Key: KAFKA-9860
> URL: https://issues.apache.org/jira/browse/KAFKA-9860
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> As of today, the Producer transaction manager keeps track of the partitions 
> involved in the current transaction. Each time it sees a new partition, it 
> will try to send a request to add all the involved partitions to the broker, 
> which results in multiple requests. If we could batch the work by the end of 
> the transaction, we would save unnecessary round trips.
> The proposed solution is to bump the EndTxn request with a field of the 
> partitions being added.
> We still need to discuss the edge cases and correctness, of course.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions

2020-04-16 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9860:
---
Description: 
As of today, the Producer transaction manager keeps track of the partitions 
involved in the current transaction. Each time it sees a new partition, it will 
try to send a request to add all the involved partitions to the broker, which 
results in multiple requests. If we could batch the work at the beginning of 
the transaction, we would save unnecessary round trips.

The idea is that the set of output partitions for a Producer is usually 
constant over time, so we could leverage the last transaction's affected 
partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn request 
with a field listing the partitions actually added. The transaction coordinator 
will only send markers to the partitions included in the EndTxn. If the first 
batch is not a superset of the affected partitions as we are producing data, we 
would still need a second AddPartition call. 

  was:
As of today, the Producer transaction manager keeps track of the partitions 
involved in the current transaction. Each time it sees a new partition, it will 
try to send a request to add all the involved partitions to the broker, which 
results in multiple requests. If we could batch the work by the end of the 
transaction, we would save unnecessary round trips.

The proposed solution is to bump the EndTxn request with a field of the 
partitions being added.

We still need to discuss the edge cases and correctness, of course.


> Transactional Producer could pre-add partitions
> ---
>
> Key: KAFKA-9860
> URL: https://issues.apache.org/jira/browse/KAFKA-9860
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> As of today, the Producer transaction manager keeps track of the partitions 
> involved in the current transaction. Each time it sees a new partition, it 
> will try to send a request to add all the involved partitions to the broker, 
> which results in multiple requests. If we could batch the work at the 
> beginning of the transaction, we would save unnecessary round trips.
> The idea is that the set of output partitions for a Producer is usually 
> constant over time, so we could leverage the last transaction's affected 
> partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn 
> request with a field listing the partitions actually added. The transaction 
> coordinator will only send markers to the partitions included in the EndTxn. 
> If the first batch is not a superset of the affected partitions as we are 
> producing data, we would still need a second AddPartition call. 
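
To make those round trips concrete, here is a minimal transactional-producer sketch (the broker address, topic names, keys and values are hypothetical); the comments mark where today's per-partition AddPartitionsToTxn requests happen:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PreAddPartitionsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("transactional.id", "demo-txn");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Today, the first send touching each new topic-partition triggers
            // its own AddPartitionsToTxn round trip to the txn coordinator.
            producer.send(new ProducerRecord<>("topic-a", "k", "v"));
            producer.send(new ProducerRecord<>("topic-b", "k", "v"));
            // The proposal: pre-add the previous transaction's partitions in
            // one batch and report the actually-used partitions on EndTxn.
            producer.commitTransaction();
        }
    }
}
{code}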



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions

2020-04-16 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9860:
---
Labels: need-kip  (was: )

> Transactional Producer could pre-add partitions
> ---
>
> Key: KAFKA-9860
> URL: https://issues.apache.org/jira/browse/KAFKA-9860
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> As of today, the Producer transaction manager keeps track of the partitions 
> involved in the current transaction. Each time it sees a new partition, it 
> will try to send a request to add all the involved partitions to the broker, 
> which results in multiple requests. If we could batch the work at the 
> beginning of the transaction, we would save unnecessary round trips.
> The idea is that the set of output partitions for a Producer is usually 
> constant over time, so we could leverage the last transaction's affected 
> partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn 
> request with a field listing the partitions actually added. The transaction 
> coordinator will only send markers to the partitions included in the EndTxn. 
> If the first batch is not a superset of the affected partitions as we are 
> producing data, we would still need a second AddPartition call. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions

2020-04-16 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9860:
---
Component/s: producer 

> Transactional Producer could pre-add partitions
> ---
>
> Key: KAFKA-9860
> URL: https://issues.apache.org/jira/browse/KAFKA-9860
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> As of today, the Producer transaction manager keeps track of the partitions 
> involved in the current transaction. Each time it sees a new partition, it 
> will try to send a request to add all the involved partitions to the broker, 
> which results in multiple requests. If we could batch the work at the 
> beginning of the transaction, we would save unnecessary round trips.
> The idea is that the set of output partitions for a Producer is usually 
> constant over time, so we could leverage the last transaction's affected 
> partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn 
> request with a field listing the partitions actually added. The transaction 
> coordinator will only send markers to the partitions included in the EndTxn. 
> If the first batch is not a superset of the affected partitions as we are 
> producing data, we would still need a second AddPartition call. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9882) Add Block getAssignments()

2020-04-16 Thread Jesse Anderson (Jira)
Jesse Anderson created KAFKA-9882:
-

 Summary: Add Block getAssignments()
 Key: KAFKA-9882
 URL: https://issues.apache.org/jira/browse/KAFKA-9882
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Affects Versions: 2.5.0
Reporter: Jesse Anderson


In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with 
poll(Duration). The poll(Duration) method does not block waiting for consumer 
assignments.

Now, there isn't a blocking method that can get the consumer assignments.

A new KafkaConsumer method needs to be added that blocks until consumer 
assignments have been received.

The current workaround is to poll for a short amount of time in a while loop 
and check the size of assignment(). This isn't a great way of verifying the 
consumer assignment.
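
For reference, a minimal sketch of that polling workaround (the broker address, group id and topic are hypothetical):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AwaitAssignmentSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            // The workaround described above: short polls in a loop until the
            // group rebalance completes and assignment() becomes non-empty.
            while (consumer.assignment().isEmpty()) {
                consumer.poll(Duration.ofMillis(100));
            }
            System.out.println("Assigned: " + consumer.assignment());
        }
    }
}
{code}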



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9796) Broker shutdown could be stuck forever under certain conditions

2020-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085058#comment-17085058
 ] 

ASF GitHub Bot commented on KAFKA-9796:
---

rajinisivaram commented on pull request #8448: KAFKA-9796; Broker shutdown 
could be stuck forever under certain conditions
URL: https://github.com/apache/kafka/pull/8448
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Broker shutdown could be stuck forever under certain conditions
> ---
>
> Key: KAFKA-9796
> URL: https://issues.apache.org/jira/browse/KAFKA-9796
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> During broker initialisation, the Acceptor threads are started early so that 
> the bound port is known, while starting the processors is delayed to the end 
> of the initialisation sequence. We have found that the shutdown of a broker 
> can be stuck forever under the following conditions:
>  - the shutdown procedure is started before the processors are started;
>  - the `newConnections` queues of the processors are full; and
>  - an extra new connection has been accepted but can't be queued up in a 
> processor.
> For instance, this could happen if a `NodeExistsException` is raised when the 
> broker tries to register itself in ZK.
> When the above conditions happen, shutting down triggers the shutdown of the 
> acceptor threads and waits until they have stopped (first thread dump below). 
> If an acceptor has a pending connection which can't be queued up in a 
> processor, it ends up waiting until space is made in the processor's 
> newConnections queue to accept the new connection (second thread dump below). 
> As the processors are not started, the new connection queues are not drained, 
> so the acceptor thread is never released.
> *Shutdown wait on acceptor to shutdown*
> {noformat}
> "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s 
> tid=0x7f625001c800 nid=0x272 waiting on condition  [0x7f6257ca4000]
>java.lang.Thread.State: WAITING (parking)
>   at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>   - parking to wait for  <0x000689a61800> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
>   at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
>   at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
>   at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
>   at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
>   at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
>   at 
> kafka.network.SocketServer$$Lambda$604/0x000840540840.apply(Unknown 
> Source)
>   at scala.collection.Iterator.foreach(Iterator.scala:941)
>   at scala.collection.Iterator.foreach$(Iterator.scala:941)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>   at 
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
>   at 
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
>   - locked <0x000689a61ac0> (a kafka.network.SocketServer)
>   at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
>   at 
> kafka.server.KafkaServer$$Lambda$602/0x00084052b040.apply$mcV$sp(Unknown 
> Source)
>   at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
>   at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>   at kafka.Kafka$.main(Kafka.scala:82)
>   at kafka.Kafka.main(Kafka.scala)
> {noformat}
> *Acceptor waits on processor to accept the new connection*
> {noformat}
> "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 
> prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s ti

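The shape of the deadlock described above can be reproduced with a toy model (a simplified illustration in plain java.util.concurrent, not Kafka's actual SocketServer code): an acceptor blocks putting into a bounded queue that is never drained because its processor was never started.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AcceptorDeadlockSketch {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a processor's bounded newConnections queue, already full.
        BlockingQueue<Integer> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.put(0);

        CountDownLatch acceptorStopped = new CountDownLatch(1);
        Thread acceptor = new Thread(() -> {
            try {
                // One more accepted "connection": put() blocks forever because
                // the processor that would drain the queue was never started.
                newConnections.put(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            acceptorStopped.countDown();
        }, "acceptor");
        acceptor.start();

        // The shutdown path waits for the acceptor, which never finishes.
        boolean stopped = acceptorStopped.await(2, TimeUnit.SECONDS);
        System.out.println("acceptor stopped: " + stopped); // false -> stuck
        acceptor.interrupt(); // release the demo thread so the JVM can exit
    }
}
{code}
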
[jira] [Resolved] (KAFKA-9796) Broker shutdown could be stuck forever under certain conditions

2020-04-16 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-9796.
---
Fix Version/s: 2.6.0
 Reviewer: Rajini Sivaram
   Resolution: Fixed

> Broker shutdown could be stuck forever under certain conditions
> ---
>
> Key: KAFKA-9796
> URL: https://issues.apache.org/jira/browse/KAFKA-9796
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.6.0
>
>
> During broker initialisation, the Acceptor threads are started early so that 
> the bound port is known, while starting the processors is delayed to the end 
> of the initialisation sequence. We have found that the shutdown of a broker 
> can be stuck forever under the following conditions:
>  - the shutdown procedure is started before the processors are started;
>  - the `newConnections` queues of the processors are full; and
>  - an extra new connection has been accepted but can't be queued up in a 
> processor.
> For instance, this could happen if a `NodeExistsException` is raised when the 
> broker tries to register itself in ZK.
> When the above conditions happen, shutting down triggers the shutdown of the 
> acceptor threads and waits until they have stopped (first thread dump below). 
> If an acceptor has a pending connection which can't be queued up in a 
> processor, it ends up waiting until space is made in the processor's 
> newConnections queue to accept the new connection (second thread dump below). 
> As the processors are not started, the new connection queues are not drained, 
> so the acceptor thread is never released.
> *Shutdown wait on acceptor to shutdown*
> {noformat}
> "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s 
> tid=0x7f625001c800 nid=0x272 waiting on condition  [0x7f6257ca4000]
>java.lang.Thread.State: WAITING (parking)
>   at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>   - parking to wait for  <0x000689a61800> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
>   at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
>   at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
>   at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
>   at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
>   at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
>   at 
> kafka.network.SocketServer$$Lambda$604/0x000840540840.apply(Unknown 
> Source)
>   at scala.collection.Iterator.foreach(Iterator.scala:941)
>   at scala.collection.Iterator.foreach$(Iterator.scala:941)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>   at 
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
>   at 
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
>   - locked <0x000689a61ac0> (a kafka.network.SocketServer)
>   at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
>   at 
> kafka.server.KafkaServer$$Lambda$602/0x00084052b040.apply$mcV$sp(Unknown 
> Source)
>   at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
>   at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>   at kafka.Kafka$.main(Kafka.scala:82)
>   at kafka.Kafka.main(Kafka.scala)
> {noformat}
> *Acceptor waits on processor to accept the new connection*
> {noformat}
> "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 
> prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x7f62523b5000 
> nid=0x2ca waiting on condition  [0x7f615713]
>java.lang.Thread.State: WAITING (parking)
>   at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>   - parking to wait for  <0x000689a7cad8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>   at 
> java

[jira] [Assigned] (KAFKA-9860) Transactional Producer could pre-add partitions

2020-04-16 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9860:
--

Assignee: (was: Boyang Chen)

> Transactional Producer could pre-add partitions
> ---
>
> Key: KAFKA-9860
> URL: https://issues.apache.org/jira/browse/KAFKA-9860
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> As of today, the Producer transaction manager keeps track of the partitions 
> involved in the current transaction. Each time it sees a new partition, it 
> will try to send a request to add all the involved partitions to the broker, 
> which results in multiple requests. If we could batch the work at the 
> beginning of the transaction, we would save unnecessary round trips.
> The idea is that the set of output partitions for a Producer is usually 
> constant over time, so we could leverage the last transaction's affected 
> partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn 
> request with a field listing the partitions actually added. The transaction 
> coordinator will only send markers to the partitions included in the EndTxn. 
> If the first batch is not a superset of the affected partitions as we are 
> producing data, we would still need a second AddPartition call. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085121#comment-17085121
 ] 

John Roesler commented on KAFKA-9224:
-

Hi [~bchen225242] , this ticket has just come to my attention again. In the 
description, it says that some writes to the store may not be guaranteed to 
happen after the transaction commits, but it's not clear to me how that can be 
the case. Since this is the whole basis for this feature request, can you 
elaborate, please?

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the write to state store is not guaranteed to happen 
> after the ongoing transaction is finished. This means interactive query could 
> see uncommitted data within state store which is not ideal for users relying 
> on state stores for strong consistency. Ideally, we should have an option to 
> include state store commit as part of ongoing transaction, however an 
> immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085121#comment-17085121
 ] 

John Roesler edited comment on KAFKA-9224 at 4/16/20, 5:51 PM:
---

Hi [~bchen225242] , this ticket has just come to my attention again. In the 
description, it says that some writes to the store may not be guaranteed to 
happen after the transaction commits, but it's not clear to me how that can be 
the case. For example, Streams deterministically produces state updates given a 
fixed sequence of input records. If a transaction gets aborted because the 
instance loses connection to the broker or something, it's true that the local 
state store would get rolled back, but the new owner (either another instance 
or the same instance after recovery) should recompute exactly the same state.

Since this is the whole basis for this feature request, can you elaborate, 
please?


was (Author: vvcephei):
Hi [~bchen225242] , this ticket has just come to my attention again. In the 
description, it says that some writes to the store may not be guaranteed to 
happen after the transaction commits, but it's not clear to me how that can be 
the case. Since this is the whole basis for this feature request, can you 
elaborate, please?

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the write to state store is not guaranteed to happen 
> after the ongoing transaction is finished. This means interactive query could 
> see uncommitted data within state store which is not ideal for users relying 
> on state stores for strong consistency. Ideally, we should have an option to 
> include state store commit as part of ongoing transaction, however an 
> immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8856) Add Streams Config for Backward-compatible Metrics

2020-04-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8856:
---
Description: 
With KIP-444 the tag names and names of streams metrics change. To give users 
a grace period for adapting their corresponding monitoring / alerting 
eco-systems, a config shall be added that specifies which version of the 
metrics names will be exposed.

The definition of the new config is:
 name: built.in.metrics.version
 type: Enum
 values:

{"0.10.0", "0.10.1", ... "2.3", "2.4"}

default: "2.4"

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams]

  was:
With KIP-444 the tag names and names of streams metrics change. To give users 
a grace period for adapting their corresponding monitoring / alerting 
eco-systems, a config shall be added that specifies which version of the 
metrics names will be exposed.

The definition of the new config is:
name: built.in.metrics.version
type: Enum
values: {"0.10.0", "0.10.1", ... "2.3", "2.4"}
default: "2.4" 


> Add Streams Config for Backward-compatible Metrics
> --
>
> Key: KAFKA-8856
> URL: https://issues.apache.org/jira/browse/KAFKA-8856
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: kip
> Fix For: 2.4.0
>
>
> With KIP-444 the tag names and names of streams metrics change. To give 
> users a grace period for adapting their corresponding monitoring / 
> alerting eco-systems, a config shall be added that specifies which version of 
> the metrics names will be exposed.
> The definition of the new config is:
>  name: built.in.metrics.version
>  type: Enum
>  values:
> {"0.10.0", "0.10.1", ... "2.3", "2.4"}
> default: "2.4"
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams]
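
For illustration, a user keeping the old metric names during the grace period would set the config roughly like this (a sketch based on the definition above; the application id and bootstrap servers are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MetricsVersionConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keep exposing pre-KIP-444 metric names while dashboards are
        // migrated; the default "2.4" exposes the new names.
        props.put("built.in.metrics.version", "0.10.0");
        // A KafkaStreams instance built with these props would report the
        // old-style metrics until this override is removed.
        System.out.println(props);
    }
}
{code}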



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8856) Add Streams Config for Backward-compatible Metrics

2020-04-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8856:
---
Labels: kip  (was: )

> Add Streams Config for Backward-compatible Metrics
> --
>
> Key: KAFKA-8856
> URL: https://issues.apache.org/jira/browse/KAFKA-8856
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: kip
> Fix For: 2.4.0
>
>
> With KIP-444 the tag names and names of streams metrics change. To give 
> users a grace period for adapting their corresponding monitoring / 
> alerting eco-systems, a config shall be added that specifies which version of 
> the metrics names will be exposed.
> The definition of the new config is:
> name: built.in.metrics.version
> type: Enum
> values: {"0.10.0", "0.10.1", ... "2.3", "2.4"}
> default: "2.4" 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085141#comment-17085141
 ] 

Guozhang Wang commented on KAFKA-9224:
--

Since it is considered to be targeting the same issue as KAFKA-8870, I'd guess it 
is a typo in the description? It probably should say "some writes to the store 
should not be reflected until the transaction commits"?

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the write to state store is not guaranteed to happen 
> after the ongoing transaction is finished. This means interactive query could 
> see uncommitted data within state store which is not ideal for users relying 
> on state stores for strong consistency. Ideally, we should have an option to 
> include state store commit as part of ongoing transaction, however an 
> immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9224:
---
Description: Currently under EOS, an uncommitted write could be reflected in 
the state store before the ongoing transaction is finished. This means an 
interactive query could see uncommitted data within the state store, which is 
not ideal for users relying on state stores for strong consistency. Ideally, 
we should have an option to include the state store commit as part of the 
ongoing transaction; however, an immediate step towards a better reasoned 
system is `write after transaction commit`, which means we always buffer data 
within the stream cache for EOS until the ongoing transaction is committed.  (was: 
Currently under EOS, the write to state store is not guaranteed to happen after 
the ongoing transaction is finished. This means interactive query could see 
uncommitted data within state store which is not ideal for users relying on 
state stores for strong consistency. Ideally, we should have an option to 
include state store commit as part of ongoing transaction, however an immediate 
step towards a better reasoned system is to `write after transaction commit`, 
which means we always buffer data within stream cache for EOS until the ongoing 
transaction is committed.)

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, an uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means an interactive 
> query could see uncommitted data within the state store, which is not ideal 
> for users relying on state stores for strong consistency. Ideally, we should 
> have an option to include the state store commit as part of the ongoing 
> transaction; however, an immediate step towards a better reasoned system is 
> `write after transaction commit`, which means we always buffer data within 
> the stream cache for EOS until the ongoing transaction is committed.
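
A minimal sketch of the `write after transaction commit` idea (a hypothetical wrapper class, not Streams' actual cache implementation): writes are staged in a buffer and only flushed into the queryable store once the transaction has committed.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: puts are buffered and only become visible to
// reads (e.g. interactive queries) after the Kafka transaction commits.
public class TxnBufferedStore<K, V> {
    private final Map<K, V> buffer = new HashMap<>();
    private final Map<K, V> committedStore; // stand-in for the real state store

    public TxnBufferedStore(Map<K, V> committedStore) {
        this.committedStore = committedStore;
    }

    public void put(K key, V value) {
        buffer.put(key, value); // staged, not yet queryable
    }

    public V get(K key) {
        return committedStore.get(key); // queries only ever see committed data
    }

    public void onTransactionCommitted() {
        committedStore.putAll(buffer); // flush once the txn commit succeeded
        buffer.clear();
    }

    public void onTransactionAborted() {
        buffer.clear(); // uncommitted writes are simply dropped
    }
}
{code}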



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085163#comment-17085163
 ] 

Boyang Chen commented on KAFKA-9224:


[~vvcephei] [~guozhang] Yeah, it's a typo, and I just fixed it.

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, an uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means an interactive 
> query could see uncommitted data within the state store, which is not ideal 
> for users relying on state stores for strong consistency. Ideally, we should 
> have an option to include the state store commit as part of the ongoing 
> transaction; however, an immediate step towards a better reasoned system is 
> `write after transaction commit`, which means we always buffer data within 
> the stream cache for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9860) Transactional Producer could pre-add partitions

2020-04-16 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085166#comment-17085166
 ] 

Guozhang Wang commented on KAFKA-9860:
--

I think this is a good idea. We should probably list all the pros and cons in 
more detail in the description above. For example, in this model, if the txn 
coordinator times out a txn without seeing the EndTxn request, then it has to 
send a marker to all the recorded partitions, which may be a superset of the 
partitions that actually had data sent to them.

> Transactional Producer could pre-add partitions
> ---
>
> Key: KAFKA-9860
> URL: https://issues.apache.org/jira/browse/KAFKA-9860
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> As of today, the Producer transaction manager keeps track of the partitions 
> involved in the current transaction. Each time it sees a new partition, it 
> will try to send a request to add all the involved partitions to the broker, 
> which results in multiple requests. If we could batch the work at the 
> beginning of the transaction, we would save unnecessary round trips.
> The idea is that the set of output partitions for a Producer is usually 
> constant over time, so we could leverage the last transaction's affected 
> partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn 
> request with a field listing the partitions actually added. The transaction 
> coordinator will only send markers to the partitions included in the EndTxn. 
> If the first batch is not a superset of the affected partitions as we are 
> producing data, we would still need a second AddPartition call. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9878) Block EndTxn call until the txn markers are committed

2020-04-16 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085171#comment-17085171
 ] 

Guozhang Wang commented on KAFKA-9878:
--

The original rationale is that if there is some time gap between committing the 
current txn and the producer sending the next record for the new txn, then this 
time gap can be "pipelined". So theoretically I see both pros and cons of this 
proposal here. In practice, it is likely that the time gap we were hoping to 
pipeline is small. Combined with KAFKA-9860, I think maybe we can instead make 
the beginTxn call a blocking call which sends the batched addPartitions request 
and which could be blocked until the previous transaction completes, so that 
the time between the previous commitTxn call and the next beginTxn call can 
still be overlapped.



> Block EndTxn call until the txn markers are committed
> -
>
> Key: KAFKA-9878
> URL: https://issues.apache.org/jira/browse/KAFKA-9878
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> Currently the EndTxn call from the Producer returns immediately once the 
> control record is written to the txn coordinator log. The ongoing transaction 
> then goes into a pending state while it waits for all txn markers to be 
> propagated. In the meantime, the producer client will start another new 
> transaction but be rejected repeatedly until the pending state gets resolved, 
> which means unnecessary round trips and more burden on the broker to handle 
> the repetitive requests.
> To avoid this situation, we should make the Producer client wait for txn 
> marker completion instead. This will yield better performance overall, as no 
> more back-off will be triggered for a subsequent transaction to begin.
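
To make the round-trip cost concrete, here is a sketch of the back-to-back transaction loop affected (the topic, keys and producer configuration are hypothetical):

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackToBackTxnSketch {
    // 'producer' is assumed to be configured with a transactional.id.
    static void run(KafkaProducer<String, String> producer) {
        producer.initTransactions();
        for (int i = 0; i < 100; i++) {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "k" + i, "v" + i));
            // commitTransaction() returns once the control record reaches the
            // coordinator log; txn markers may still be propagating. The next
            // iteration's AddPartitionsToTxn can then be rejected with
            // CONCURRENT_TRANSACTIONS and retried with back-off: the
            // repetitive requests this ticket proposes to eliminate.
            producer.commitTransaction();
        }
    }
}
{code}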



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions

2020-04-16 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9860:
---
Description: 
As of today, the Producer transaction manager keeps track of the partitions 
involved in the current transaction. Each time it sees a new partition, it will 
try to send a request to add all the involved partitions to the broker, which 
results in multiple requests. If we could batch the work at the beginning of 
the transaction, we would save unnecessary round trips.

The idea is that the set of output partitions for a Producer is usually 
constant over time, so we could leverage the last transaction's affected 
partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn request 
with a field listing the partitions actually added. The transaction coordinator 
will only send markers to the partitions included in the EndTxn. If the first 
batch is not a superset of the affected partitions as we are producing data, we 
would still need a second AddPartition call. 

The con of this approach is that unnecessary marker requests will be sent out 
when an ongoing transaction never gets the EndTxn call and has to be expired by 
the broker. The impact should be minimal, as the affected scope is always 
defined by the previous transaction.

  was:
As of today, the Producer transaction manager keeps track of the partitions 
involved in the current transaction. Each time it sees a new partition, it will 
try to send a request to add all the involved partitions to the broker, which 
results in multiple requests. If we could batch the work at the beginning of 
the transaction, we would save unnecessary round trips.

The idea is that the set of output partitions for a Producer is usually 
constant over time, so we could leverage the last transaction's affected 
partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn request 
with a field listing the partitions actually added. The transaction coordinator 
will only send markers to the partitions included in the EndTxn. If the first 
batch is not a superset of the affected partitions as we are producing data, we 
would still need a second AddPartition call. 


> Transactional Producer could pre-add partitions
> ---
>
> Key: KAFKA-9860
> URL: https://issues.apache.org/jira/browse/KAFKA-9860
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> As of today, the Producer transaction manager keeps track of the partitions 
> involved in the current transaction. Each time it sees a new partition, it 
> will try to send a request to add all the involved partitions to the broker, 
> which results in multiple requests. If we could batch the work at the 
> beginning of the transaction, we would save unnecessary round trips.
> The idea is that the set of output partitions for a Producer is usually 
> constant over time, so we could leverage the last transaction's affected 
> partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn 
> request with a field listing the partitions actually added. The transaction 
> coordinator will only send markers to the partitions included in the EndTxn. 
> If the first batch is not a superset of the affected partitions as we are 
> producing data, we would still need a second AddPartition call. 
> The con of this approach is that unnecessary marker requests will be sent 
> out when an ongoing transaction never gets the EndTxn call and has to be 
> expired by the broker. The impact should be minimal, as the affected scope 
> is always defined by the previous transaction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state

2020-04-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-9066:


Assignee: Chris Egerton

> Kafka Connect JMX : source & sink task metrics missing for tasks in failed 
> state
> 
>
> Key: KAFKA-9066
> URL: https://issues.apache.org/jira/browse/KAFKA-9066
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Mikołaj Stefaniak
>Assignee: Chris Egerton
>Priority: Major
>
> h2. Overview
> Kafka Connect exposes various metrics via JMX. Those metrics can be exported, 
> e.g. by the _Prometheus JMX Exporter_, for further processing.
> One of the crucial attributes is the connector's *task status.*
> According to the official Kafka docs, the status is available as the +status+ 
> attribute of the following MBean:
> {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"status
>  - The status of the connector task. One of 'unassigned', 'running', 
> 'paused', 'failed', or 'destroyed'.
> {quote}
> h2. Issue
> Generally, +connector-task-metrics+ are exposed properly for tasks in 
> +running+ status but not exposed at all if the task is +failed+.
> A failed task *appears* properly with failed status when queried via the *REST API*:
>  
> {code:java}
> $ curl -X GET -u 'user:pass' 
> http://kafka-connect.mydomain.com/connectors/customerconnector/status
> {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException:
>  Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"}
> $ {code}
>  
> A failed task *doesn't appear* as a bean with the +connector-task-metrics+ 
> type when queried via *JMX*:
>  
> {code:java}
> $ echo "beans -d kafka.connect" | java -jar 
> target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep 
> connector=customerconnector
> kafka.connect:connector=customerconnector,task=0,type=task-error-metrics
> kafka.connect:connector=customerconnector,type=connector-metrics
> $
> {code}
> h2. Expected result
> It is expected that a bean with the +connector-task-metrics+ type will also 
> appear for tasks that have failed.
> Below is an example of how beans are properly registered for tasks in the 
> Running state:
>  
> {code:java}
> $ echo "get -b 
> kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics
>  status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-ube r.jar -l 
> localhost:8081 -n -v silent
> status = running;
> $
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state

2020-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085237#comment-17085237
 ] 

ASF GitHub Bot commented on KAFKA-9066:
---

C0urante commented on pull request #8502: KAFKA-9066: Retain metrics for failed 
tasks
URL: https://github.com/apache/kafka/pull/8502
 
 
   Right now, sink and source task JMX metrics are dropped as soon as the task 
fails. The changes here cause these metrics to be retained even if the task 
fails, and instead only be removed when the task has been shut down or 
abandoned during shut down by the worker.
   
   Existing unit tests are modified to account for this tweak in the worker 
logic.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect JMX : source & sink task metrics missing for tasks in failed 
> state
> 
>
> Key: KAFKA-9066
> URL: https://issues.apache.org/jira/browse/KAFKA-9066
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Mikołaj Stefaniak
>Assignee: Chris Egerton
>Priority: Major
>
> h2. Overview
> Kafka Connect exposes various metrics via JMX. Those metrics can be exported, 
> e.g. by the _Prometheus JMX Exporter_, for further processing.
> One of the crucial attributes is the connector's *task status.*
> According to the official Kafka docs, the status is available as the +status+ 
> attribute of the following MBean:
> {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"status
>  - The status of the connector task. One of 'unassigned', 'running', 
> 'paused', 'failed', or 'destroyed'.
> {quote}
> h2. Issue
> Generally, +connector-task-metrics+ are exposed properly for tasks in 
> +running+ status but not exposed at all if the task is +failed+.
> A failed task *appears* properly with failed status when queried via the *REST API*:
>  
> {code:java}
> $ curl -X GET -u 'user:pass' 
> http://kafka-connect.mydomain.com/connectors/customerconnector/status
> {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException:
>  Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"}
> $ {code}
>  
> A failed task *doesn't appear* as a bean with the +connector-task-metrics+ 
> type when queried via *JMX*:
>  
> {code:java}
> $ echo "beans -d kafka.connect" | java -jar 
> target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep 
> connector=customerconnector
> kafka.connect:connector=customerconnector,task=0,type=task-error-metrics
> kafka.connect:connector=customerconnector,type=connector-metrics
> $
> {code}
> h2. Expected result
> It is expected that a bean with the +connector-task-metrics+ type will also 
> appear for tasks that have failed.
> Below is an example of how beans are properly registered for tasks in the 
> Running state:
>  
> {code:java}
> $ echo "get -b 
> kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics
>  status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-ube r.jar -l 
> localhost:8081 -n -v silent
> status = running;
> $
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9867) Log cleaner throws InvalidOffsetException for some partitions

2020-04-16 Thread Bradley Peterson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085254#comment-17085254
 ] 

Bradley Peterson commented on KAFKA-9867:
-

After upgrading to 2.4.1, I see the same error, logged slightly differently.

{noformat}
[2020-04-16 20:53:14,748] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log Log(dir=/var/lib/kafka/x-10, 
topic=x, partition=10, highWatermark=1191729811, 
lastStableOffset=1191729811, logStartOffset=0, logEndOffset=1191729811). 
Marking its partition (x-10) as uncleanable (kafka.log.LogCleaner)
kafka.log.LogCleaningException: Attempt to append an offset (279327173) to 
position 4077 no larger than the last offset appended (279327173) to 
/var/lib/kafka/x-10/000278373839.index.cleaned.
at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:349)
at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:325)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:314)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
Caused by: org.apache.kafka.common.errors.InvalidOffsetException: Attempt to 
append an offset (279327173) to position 4077 no larger than the last offset 
appended (279327173) to 
/var/lib/kafka/x-10/000278373839.index.cleaned.
{noformat}

> Log cleaner throws InvalidOffsetException for some partitions
> -
>
> Key: KAFKA-9867
> URL: https://issues.apache.org/jira/browse/KAFKA-9867
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.3.0
> Environment: AWS EC2 with data on EBS volumes.
>Reporter: Bradley Peterson
>Priority: Major
>
> About half the partitions for one topic are marked "uncleanable". This is 
> consistent across broker replicas – if a partition is uncleanable on one 
> broker, its replicas are also uncleanable.
> The log-cleaner log seems to suggest out of order offsets. We've seen corrupt 
> indexes before, so I removed the indexes from the affected segments and let 
> Kafka rebuild them, but it hit the same error.
> I don't know when the error first occurred because we've restarted the 
> brokers and rotated logs, but it is possible the brokers crashed at some 
> point.
> How can I repair these partitions?
> {noformat}
> [2020-04-09 00:14:16,979] INFO Cleaner 0: Cleaning segment 223293473 in log 
> x-9 (largest timestamp Wed Nov 13 20:35:57 UTC 2019
> ) into 223293473, retaining deletes. (kafka.log.LogCleaner)
> [2020-04-09 00:14:29,486] INFO Cleaner 0: Cleaning segment 226762315 in log 
> x-9 (largest timestamp Wed Nov 06 18:17:10 UTC 2019
> ) into 223293473, retaining deletes. (kafka.log.LogCleaner)
> [2020-04-09 00:14:29,502] WARN [kafka-log-cleaner-thread-0]: Unexpected 
> exception thrown when cleaning log Log(/var/lib/kafka/x
> -9). Marking its partition (x-9) as uncleanable (kafka.log.LogCleaner)
> org.apache.kafka.common.errors.InvalidOffsetException: Attempt to append an 
> offset (226765178) to position 941 no larger than the last offset appended 
> (228774155) to /var/lib/kafka/x-9/000223293473.index.cleaned.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085270#comment-17085270
 ] 

John Roesler commented on KAFKA-9224:
-

Thanks, all, I've re-read this issue, and also read KAFKA-8870, which provided 
some of the clarity I was seeking. I'm still not sure I understand what EOS 
transactions have to do with IQ results.

In KAFKA-8870, [~vinoth] gave the specific scenario, which I'll copy/paste here:
{quote}Value for key K can go from *V0 → V1 → V2* on active instance A, IQ 
reads V1, instance A fails and any failure/rebalancing will leave the standby 
instance B rewinding offsets and reprocessing, during which time IQ can again 
see V0 or V1 or any number of previous values for the same key.
{quote}
The concern is that two queries, both against the active replica, could see 
non-monotonic system states: V2 was queried from a non-committed transaction, 
then the node failed, and the subsequent active node recovered only up to V1 
and serves _that_ as the most recent result. In that scenario, the new node 
would of course proceed to process the input and subsequently re-create system 
state V2.

So the problem isn't specifically that we saw state V2 the first time, but that 
we saw V2 followed by V1. This problem isn't specific to EOS, so even if we 
solve it in the EOS case by querying only state corresponding to committed 
transactions, it's at best a band-aid.
 # It still allows IQ to serve non-monotonic system states under at-least-once 
semantics, although we could equivalently solve it by querying only after the 
changelog write is acked.
 # And it still allows IQ to serve non-monotonic system states if you query the 
active and then a standby replica.

I'm not saying that we shouldn't solve the non-monotonic-query-result problem. 
I'm just asking whether querying only committed transactions is the solution. 
Thanks!

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means interactive 
> query could see uncommitted data within state store which is not ideal for 
> users relying on state stores for strong consistency. Ideally, we should have 
> an option to include state store commit as part of ongoing transaction, 
> however an immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085270#comment-17085270
 ] 

John Roesler edited comment on KAFKA-9224 at 4/16/20, 9:32 PM:
---

Thanks, all, I've re-read this issue, and also read KAFKA-8870, which provided 
some of the clarity I was seeking. I'm still not sure I understand what EOS 
transactions have to do with IQ results.

In KAFKA-8870, [~vinoth] gave the specific scenario, which I'll copy/paste here:
{quote}Value for key K can go from *V0 → V1 → V2* on active instance A, IQ 
reads V1, instance A fails and any failure/rebalancing will leave the standby 
instance B rewinding offsets and reprocessing, during which time IQ can again 
see V0 or V1 or any number of previous values for the same key.
{quote}
The concern is that two queries, both against the active replica, could see 
non-monotonic system states: V2 was queried from a non-committed transaction, 
then the node failed, and the subsequent active node recovered only up to V1 
and serves _that_ as the most recent result. In that scenario, the new node 
would of course proceed to process the input and subsequently re-create system 
state V2.

So the problem isn't specifically that we saw state V2 the first time, but that 
we saw V2 followed by V1. This problem isn't specific to EOS, so even if we 
solve it in the EOS case by querying only state corresponding to committed 
transactions, it's at best a band-aid.
 # It still allows IQ to serve non-monotonic system states under at-least-once 
semantics, although we could equivalently solve it by querying only after a 
successful commit (which involves the changelog getting acked and the incoming 
offsets getting committed).
 # And it still allows IQ to serve non-monotonic system states if you query the 
active and then a standby replica.

I'm not saying that we shouldn't solve the non-monotonic-query-result problem. 
I'm just asking whether querying only committed transactions is the solution. 
Thanks!


was (Author: vvcephei):
Thanks, all, I've re-read this issue, and also read KAFKA-8870, which provided 
some of the clarity I was seeking. I'm still not sure I understand what EOS 
transactions have to do with IQ results.

In KAFKA-8870, [~vinoth] gave the specific scenario, which I'll copy/paste here:
{quote}Value for key K can go from *V0 → V1 → V2* on active instance A, IQ 
reads V1, instance A fails and any failure/rebalancing will leave the standby 
instance B rewinding offsets and reprocessing, during which time IQ can again 
see V0 or V1 or any number of previous values for the same key.
{quote}
The concern is that two queries, both against the active replica, could see 
non-monotonic system states: V2 was queried from a non-committed transaction, 
then the node failed, and the subsequent active node recovered only up to V1 
and serves _that_ as the most recent result. In that scenario, the new node 
would of course proceed to process the input and subsequently re-create system 
state V2.

So the problem isn't specifically that we saw state V2 the first time, but that 
we saw V2 followed by V1. This problem isn't specific to EOS, so even if we 
solve it in the EOS case by querying only state corresponding to committed 
transactions, it's at best a band-aid.
 # It still allows IQ to serve non-monotonic system states under at-least-once 
semantics, although we could equivalently solve it by querying only after the 
changelog write is acked.
 # And it still allows IQ to serve non-monotonic system states if you query the 
active and then a standby replica.

I'm not saying that we shouldn't solve the non-monotonic-query-result problem. 
I'm just asking whether querying only committed transactions is the solution. 
Thanks!

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means interactive 
> query could see uncommitted data within state store which is not ideal for 
> users relying on state stores for strong consistency. Ideally, we should have 
> an option to include state store commit as part of ongoing transaction, 
> however an immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9753) Add task-level active-process-ratio to Streams metrics

2020-04-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9753:
---
Description: 
This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5).

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams]

  was:This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5).


> Add task-level active-process-ratio to Streams metrics
> --
>
> Key: KAFKA-9753
> URL: https://issues.apache.org/jira/browse/KAFKA-9753
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5).
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9230) Change User Customizable Metrics API in StreamsMetrics interface

2020-04-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9230:
---
Labels: kip  (was: )

> Change User Customizable Metrics API in StreamsMetrics interface
> 
>
> Key: KAFKA-9230
> URL: https://issues.apache.org/jira/browse/KAFKA-9230
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: kip
> Fix For: 2.5.0
>
>
> As proposed in KIP-444, the user-customizable metrics API in the 
> StreamsMetrics interface shall be improved. For more details, see 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9753) Add task-level active-process-ratio to Streams metrics

2020-04-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9753:
---
Labels: kip  (was: )

> Add task-level active-process-ratio to Streams metrics
> --
>
> Key: KAFKA-9753
> URL: https://issues.apache.org/jira/browse/KAFKA-9753
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.6.0
>
>
> This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5).
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9883) Connect request to restart task can result in IllegalArgumentError: "uriTemplate" parameter is null

2020-04-16 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-9883:


 Summary: Connect request to restart task can result in 
IllegalArgumentError: "uriTemplate" parameter is null
 Key: KAFKA-9883
 URL: https://issues.apache.org/jira/browse/KAFKA-9883
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.4.0
Reporter: Randall Hauch


When attempting to restart a connector, the following is logged by Connect:

{code:java}
ERROR Uncaught exception in REST call to 
/connectors/my-connector/tasks/0/restart 
(org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
java.lang.IllegalArgumentException: "uriTemplate" parameter is null.
at 
org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:189)
at 
org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:72)
at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:96)
at 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:263)
at 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:298)
at 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.restartTask(ConnectorsResource.java:218)
{code}
Resubmitting the restart REST request will usually resolve the problem.
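
As a stopgap, a client can simply retry the call. Below is a minimal retry 
sketch (not part of Connect; the worker address localhost:8083 and the retry 
policy are assumptions, while the endpoint path comes from the stack trace 
above):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestartTaskWithRetry {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Endpoint path from the stack trace above; localhost:8083 is an assumed worker address.
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8083/connectors/my-connector/tasks/0/restart"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        for (int attempt = 1; attempt <= 3; attempt++) {
            HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() < 500) {
                System.out.println("Attempt " + attempt + ": HTTP " + response.statusCode());
                break; // success or a client error; retrying won't help
            }
            Thread.sleep(1000L * attempt); // simple linear backoff before the next attempt
        }
    }
}
{code}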

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-16 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085311#comment-17085311
 ] 

Guozhang Wang commented on KAFKA-9224:
--

My understanding of this JIRA is more along the following lines:

Value for key K can go from V0 -> V1 -> V2 and we do not have any standbys. IQ 
reads V1, which is not committed yet; then the task crashes, and upon restoring 
it would only resume to V0, so when it is queried again V0 would be returned. 
The key here is that with EOS the restoration would complete (and IQ can be 
served) once V0 is restored, not requiring restoration up to V1.
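
To make that concrete, here is a toy illustration (plain Java, not Streams 
internals; all names are hypothetical) of why restoration rewinds to the last 
committed value:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class EosRestoreToy {
    // Hypothetical changelog entry; 'committed' marks whether its transaction committed.
    record Entry(String key, String value, boolean committed) {}

    public static void main(String[] args) {
        Entry[] changelog = {
            new Entry("K", "V0", true),   // committed write
            new Entry("K", "V1", false)   // written, but the transaction died with the crash
        };

        // Before the crash, IQ against the live store could observe V1.
        // After the crash, restoration keeps only committed entries:
        Map<String, String> restored = new HashMap<>();
        for (Entry e : changelog) {
            if (e.committed()) {
                restored.put(e.key(), e.value());
            }
        }
        System.out.println(restored.get("K")); // prints V0, not V1
    }
}
{code}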

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means interactive 
> query could see uncommitted data within state store which is not ideal for 
> users relying on state stores for strong consistency. Ideally, we should have 
> an option to include state store commit as part of ongoing transaction, 
> however an immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9883) Connect request to restart task can result in IllegalArgumentError: "uriTemplate" parameter is null

2020-04-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9883:
-
Priority: Minor  (was: Major)

> Connect request to restart task can result in IllegalArgumentError: 
> "uriTemplate" parameter is null
> ---
>
> Key: KAFKA-9883
> URL: https://issues.apache.org/jira/browse/KAFKA-9883
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Randall Hauch
>Priority: Minor
>
> When attempting to restart a connector, the following is logged by Connect:
>  
> {code:java}
> ERROR Uncaught exception in REST call to 
> /connectors/my-connector/tasks/0/restart 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
> java.lang.IllegalArgumentException: "uriTemplate" parameter is null.
> at 
> org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:189)
> at 
> org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:72)
> at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:96)
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:263)
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:298)
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.restartTask(ConnectorsResource.java:218)
> {code}
> Resubmitting the restart REST request will usually resolve the problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology

2020-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085398#comment-17085398
 ] 

ASF GitHub Bot commented on KAFKA-9298:
---

bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error 
in joins
URL: https://github.com/apache/kafka/pull/8504
 
 
   When performing a join with a stream that needs repartitioning, Kafka 
Streams automatically creates a repartition topic.  If the user does not use 
`StreamJoined` to name the repartition topic, Kafka Streams uses the generated 
name of the KStream instance for the repartition topic name.
   
   If the KStream instance requiring the repartition participates in another 
join, the second repartition topic is created using the same operator name. 
This name reuse is what causes the `InvalidTopologyException`.  The error 
occurs because the `InternalTopologyBuilder` has already registered that 
repartition source name.
   
   For example, this topology will cause an error because Kafka Streams will 
attempt to create two repartition topics (which is correct behavior) but using 
the _**same name**_ each time which causes the error.
   ``` java
   KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
   newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
   newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-two");
   ```
   However this topology, which is the same except the user has provided 
repartition topic names, is fine.  Note the use of `StreamJoined.withName` here
   ```java
   KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
   final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
   newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
   newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
   ```
   This bug has been present for some time; I tested this out on `2.0`, before 
we added the optimization layer.
   
   Ideally, the fix would be to generate a fresh repartition topic name each 
time to avoid such issues.  But IMHO that ship has already sailed, because 
introducing new name generation would cause compatibility issues for existing 
topologies.  So generating new names is out, at least for now.
   
   The proposed fix is:
   
   1. For KStream objects needing repartitioning _**and using generated 
names**_, reuse the repartition topic node in any additional joins.
   2. For KStream instances needing repartitioning _**using user-provided 
names**_, always create a new repartition topic node for each join, as each one 
will have a unique name.
   
   
   
   I've added tests confirming the expected behavior.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reuse of a mapped stream causes an Invalid Topology
> ---
>
> Key: KAFKA-9298
> URL: https://issues.apache.org/jira/browse/KAFKA-9298
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Bill Bejeck
>Priority: Minor
>  Labels: join, streams
>
> Can be found with in the KStreamKStreamJoinTest.java
> {code:java}
> @Test
> public void optimizerIsEager() {
>   final StreamsBuilder builder = new StreamsBuilder();
>   final KStream<String, String> stream1 = builder.stream("topic", 
> Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> stream2 = builder.stream("topic2", 
> Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> stream3 = builder.stream("topic3", 
> Consumed.with(Serdes.String(), Serdes.String()));
>   final KStream<String, String> newStream = stream1.map((k, v) -> new 
> KeyValue<>(v, k));
>   newStream.join(stream2, (value1, value2) -> value1 + value2, 
> JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), 
> Serdes.String(), Serdes.String()));
>   newStream.join(stream3, (value1, value2) -> value1 + value2, 
> JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), 
> Serdes.String(), Serdes.String()));
>   System.err.println(builder.build().describe().toString());
> }
> {code}

[jira] [Commented] (KAFKA-9863) update the deprecated --zookeeper option in the documentation into --bootstrap-server

2020-04-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085487#comment-17085487
 ] 

Luke Chen commented on KAFKA-9863:
--

PR: https://github.com/apache/kafka/pull/8482

> update the deprecated --zookeeper option in the documentation into 
> --bootstrap-server
> -
>
> Key: KAFKA-9863
> URL: https://issues.apache.org/jira/browse/KAFKA-9863
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, documentation
>Affects Versions: 2.4.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Since v2.2.0, the --zookeeper option has been deprecated, because Kafka can 
> connect directly to brokers with --bootstrap-server (KIP-377). But many 
> example commands in the official documentation still use --zookeeper 
> instead of --bootstrap-server. If you follow those commands, you'll get 
> this warning, which is not good.
> {code:java}
> Warning: --zookeeper is deprecated and will be removed in a future version of 
> Kafka.
> Use --bootstrap-server instead to specify a broker to connect to.{code}
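> For example, listing topics with the deprecated flag versus its replacement 
> (the addresses below are placeholders):
> {noformat}
> # deprecated
> bin/kafka-topics.sh --zookeeper localhost:2181 --list
> # preferred since 2.2.0
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
> {noformat}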



--
This message was sent by Atlassian Jira
(v8.3.4#803005)