[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084602#comment-17084602 ] Théo commented on KAFKA-7077: - I'm going to take a close look at it! > KIP-318: Make Kafka Connect Source idempotent > - > > Key: KAFKA-7077 > URL: https://issues.apache.org/jira/browse/KAFKA-7077 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Stephane Maarek >Assignee: Stephane Maarek >Priority: Major > > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9543) Consumer offset reset after new segment rolling
[ https://issues.apache.org/jira/browse/KAFKA-9543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084605#comment-17084605 ] Jason Gustafson commented on KAFKA-9543: In KAFKA-9838, I fixed what I thought was a minor race condition with truncation. Since there is no truncation involved with a segment roll, I thought it was unrelated. After thinking a little more about the edge case it uncovered, it seems possible that it explains this issue as well. When we roll a segment, if the log end offset is equal to the high watermark, then both will be updated to point to the new segment. The problem is that the leader also keeps a cache of pointers to offsets which may refer to the older segment. It is possible that a leader update following the roll would leave the high watermark pointing to the older segment. This was not a problem prior to 2.4 because we always looked up the position of the high watermark on every fetch. As an example, let's say we have a single segment which begins at offset 0, with log end offset = 10 and high watermark = 5. The sequence of events is like this:
t1: Initial state
log end offset = (offset = 10, segment = 0)
high watermark = (offset = 5, segment = 0)
t2: Log is rolled
log end offset = (offset = 10, segment = 10)
high watermark = (offset = 5, segment = 0)
t3: Leader calls `maybeIncrementHighWatermark` to update the high watermark to 10, but with a reference to the old segment:
log end offset = (offset = 10, segment = 10)
high watermark = (offset = 10, segment = 0)
I verified with a simple test case that the log can get into this state. Prior to the fix in KAFKA-9838, a fetch from the high watermark with the log in this state would result in a read of all the data from segment 10. So that opens the door to a race condition like the following:
1. Say one record is appended and we update the log end offset:
log end offset = (offset = 11, segment = 10)
high watermark = (offset = 10, segment = 0)
2. Now say that one more record is appended to segment 10 at offset 11, but the log end offset is not immediately updated.
3. A fetch at offset 10 returns the two new records at offsets 10 and 11 because of the bug above.
4. A second fetch at offset 12 now returns an out of range error because the log end offset is still 11.
5. Finally the log end offset is updated to 12.
This is a devilishly tricky scenario to hit in a test case. I was able to do it, but only by introducing an artificial delay into the append logic. Still, I think this is probably on the right track, since it explains how it is possible to hit an out of range error with only a segment roll, and also why versions older than 2.4 are not affected. Unfortunately the patch for KAFKA-9838 did not get into 2.5.0, which was just released today. However, I've merged it into the [2.4|https://github.com/apache/kafka/commit/e1f18df7f6615109b6cc77b66d9be37b09256a0a] and [2.5|https://github.com/apache/kafka/commit/3b17fecc9b6af5f896ce2df4b8b6ce23cfd40f17] branches. If anyone who is running into this problem is willing to try one of these patches and let me know, I'd appreciate it. I will think a little bit more about whether the patch fixes all the problems that can be caused by this case where the high watermark points to the wrong segment (I think it does, but I'm not 100% sure).
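To make the t1 to t3 transitions above concrete, here is a minimal sketch of the stale-pointer state machine. The class and field names are invented stand-ins for illustration, not Kafka's actual internals:

{code:java}
// An offset pointer that also remembers which segment it refers to.
class OffsetMetadata {
    final long offset;
    final long segmentBaseOffset;

    OffsetMetadata(long offset, long segmentBaseOffset) {
        this.offset = offset;
        this.segmentBaseOffset = segmentBaseOffset;
    }

    @Override
    public String toString() {
        return "(offset = " + offset + ", segment = " + segmentBaseOffset + ")";
    }
}

public class StaleHighWatermarkSketch {
    static OffsetMetadata logEndOffset = new OffsetMetadata(10, 0); // t1
    static OffsetMetadata highWatermark = new OffsetMetadata(5, 0); // t1

    public static void main(String[] args) {
        // A pointer the leader cached before the roll; it still names segment 0.
        OffsetMetadata cached = new OffsetMetadata(10, 0);

        // t2: rolling moves the log end offset to the new segment...
        logEndOffset = new OffsetMetadata(logEndOffset.offset, 10);

        // t3: ...but the high watermark is advanced from the stale cached pointer.
        if (cached.offset > highWatermark.offset) {
            highWatermark = cached;
        }

        System.out.println("log end offset = " + logEndOffset);  // (offset = 10, segment = 10)
        System.out.println("high watermark = " + highWatermark); // (offset = 10, segment = 0)
    }
}
{code}

A fetch that trusts the high watermark's segment pointer can then read everything in segment 10, including records above the real high watermark, which is what step 3 of the race above relies on.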
> Consumer offset reset after new segment rolling > --- > > Key: KAFKA-9543 > URL: https://issues.apache.org/jira/browse/KAFKA-9543 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Rafał Boniecki >Priority: Major > Attachments: Untitled.png, image-2020-04-06-17-10-32-636.png > > > After upgrading from Kafka 2.1.1 to 2.4.0, I'm experiencing unexpected consumer offset resets. > Consumer: > {code:java} > 2020-02-12T11:12:58.402+01:00 hostname 4a2a39a35a02 [2020-02-12T11:12:58,402][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Fetch offset 1632750575 is out of range for partition stats-5, resetting offset > {code} > Broker: > {code:java} > 2020-02-12 11:12:58:400 CET INFO [data-plane-kafka-request-handler-1][kafka.log.Log] [Log partition=stats-5, dir=/kafka4/data] Rolled new log segment at offset 1632750565 in 2 ms.{code} > All resets are perfectly correlated to rolling new segments at the broker - the segment is rolled first, then, a couple of ms later, the reset on the consumer occurs. Attached is a Grafana graph with consumer lag per partition. All sudden spikes in lag are offset resets due to this bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols
[ https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikolay Izhikov reassigned KAFKA-9320: -- Assignee: (was: Nikolay Izhikov) > Enable TLSv1.3 by default and disable some of the older protocols > - > > Key: KAFKA-9320 > URL: https://issues.apache.org/jira/browse/KAFKA-9320 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Priority: Major > Labels: needs-kip > > KAFKA-7251 added support for TLSv1.3. We should include this in the list of > protocols that are enabled by default. We should also disable some of the > older protocols that are not secure. This change requires a KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
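Until such a KIP lands, TLSv1.3 can already be enabled explicitly on brokers and clients running a JVM that supports it (e.g. Java 11), via the existing SSL setting shown below. What the KIP would change is the default value of this list; the exact defaults are still to be decided:

{code}
# Existing Kafka config, shown here with an explicit opt-in to TLSv1.3.
ssl.enabled.protocols=TLSv1.2,TLSv1.3
{code}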
[jira] [Comment Edited] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084602#comment-17084602 ] Théo edited comment on KAFKA-7077 at 4/16/20, 9:04 AM: --- I'm going to take a close look at it! I didn't find a pull request, so if nobody is on it, I'm ready to take it. was (Author: schmidt96u): I'm going to take a close look at it! > KIP-318: Make Kafka Connect Source idempotent > - > > Key: KAFKA-7077 > URL: https://issues.apache.org/jira/browse/KAFKA-7077 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Stephane Maarek >Assignee: Stephane Maarek >Priority: Major > > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9230) Change User Customizable Metrics API in StreamsMetrics interface
[ https://issues.apache.org/jira/browse/KAFKA-9230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-9230: - Fix Version/s: 2.5.0 > Change User Customizable Metrics API in StreamsMetrics interface > > > Key: KAFKA-9230 > URL: https://issues.apache.org/jira/browse/KAFKA-9230 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.5.0 > > > As proposed in KIP-444, the user-customizable metrics API in the > StreamsMetrics interface shall be improved. For more details, see > https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9117) Add active-process-ratio Metric
[ https://issues.apache.org/jira/browse/KAFKA-9117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-9117. -- Fix Version/s: (was: 2.6.0) Resolution: Duplicate > Add active-process-ratio Metric > > > Key: KAFKA-9117 > URL: https://issues.apache.org/jira/browse/KAFKA-9117 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > > A metric that measures the percentage of time the hosting thread is spending > with an active task shall be added. This metric is described in KIP-444. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9590) Add configuration to limit number of partitions
[ https://issues.apache.org/jira/browse/KAFKA-9590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084728#comment-17084728 ] ASF GitHub Bot commented on KAFKA-9590: --- gokul2411s commented on pull request #8499: [WIP] KAFKA-9590: Add configuration to limit number of partitions URL: https://github.com/apache/kafka/pull/8499 This is a prototype (which works) for the Admin API path for creating topics and adding partitions to existing topics. The goal is to motivate the discussion of KIP-578 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-578%3A+Add+configuration+to+limit+number+of+partitions), so that reviewers have some more concrete details on the approach. For now, I have tested it manually. Once the KIP is approved, I will write some unit and integration tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add configuration to limit number of partitions > --- > > Key: KAFKA-9590 > URL: https://issues.apache.org/jira/browse/KAFKA-9590 > Project: Kafka > Issue Type: Improvement > Components: admin, core >Reporter: Gokul Ramanan Subramanian >Assignee: Gokul Ramanan Subramanian >Priority: Major > > Tracks > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-578%3A+Add+configuration+to+limit+number+of+partitions]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
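For readers skimming the KIP, the core of such a limit is an admission check at topic-creation / partition-addition time. The following is a hypothetical sketch of that idea; the names are invented for illustration and are not taken from the PR:

{code:java}
// Hypothetical sketch of the kind of guard KIP-578 proposes.
public class PartitionLimitCheck {
    static void validate(int currentTotal, int requested, int maxPartitionsLimit) {
        if (maxPartitionsLimit > 0 && currentTotal + requested > maxPartitionsLimit) {
            throw new IllegalStateException(String.format(
                "Adding %d partitions would exceed the configured limit of %d (current total: %d)",
                requested, maxPartitionsLimit, currentTotal));
        }
    }

    public static void main(String[] args) {
        validate(900, 50, 1000); // fine: 950 <= 1000
        validate(980, 50, 1000); // throws: 1030 > 1000
    }
}
{code}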
[jira] [Commented] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions
[ https://issues.apache.org/jira/browse/KAFKA-9137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084756#comment-17084756 ] Philip Bourke commented on KAFKA-9137: -- What version is this fixed in? 2.5 was released today, and I don't see this Jira in the release notes. > Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions > - > > Key: KAFKA-9137 > URL: https://issues.apache.org/jira/browse/KAFKA-9137 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Lucas Bradstreet >Priority: Major > > We have recently seen cases where brokers end up in a bad state where fetch session evictions occur at a high rate (> 16 per second) after a roll. This increase in eviction rate included the following pattern in our logs:
> {noformat}
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9790: added (), updated (), removed ()
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9791: added (), updated (), removed ()
> broker 6: October 31st 2019, 17:52:45.500 Created a new incremental FetchContext for session id 2046264334, epoch 9792: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-67), removed ()
> broker 6: October 31st 2019, 17:52:45.501 Created a new incremental FetchContext for session id 2046264334, epoch 9793: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), removed ()
> broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 2046264334.
> broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no such session ID found.
> broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with (sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.
> {noformat}
> This pattern appears to be problematic for two reasons. Firstly, the replica fetcher for broker 4 was clearly able to send multiple incremental fetch requests to broker 6, and receive replies, and did so right up to the point where broker 6 evicted its fetch session within milliseconds of multiple fetch requests. The second problem is that replica fetchers are considered privileged for the fetch session cache, and should not be evicted by consumer fetch sessions. This cluster only has 12 brokers and 1000 fetch session cache slots (the default for max.incremental.fetch.session.cache.slots), and it is thus very unlikely that this session should have been evicted by another replica fetcher session. > This cluster also appears to be caught in cycles of fetch session evictions where the cluster never stabilizes into a state where fetch sessions are not evicted. The above logs are the best example I could find of a case where a session clearly should not have been evicted. -- This message was sent by Atlassian Jira (v8.3.4#803005)
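As background for readers, the eviction contract described in the report (consumer sessions must not displace privileged replica-fetcher sessions) can be summarized in a small sketch. This is an invented illustration of that contract only, not Kafka's actual FetchSessionCache logic, which also weighs session age and size:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: a bounded session cache where a privileged (replica fetcher)
// session may displace the oldest entry, but an unprivileged (consumer)
// session may only displace another consumer session.
class FetchSessionCacheSketch {
    private final int maxSlots;
    // Insertion order approximates session age (oldest first).
    private final LinkedHashMap<Integer, Boolean> privilegedBySessionId = new LinkedHashMap<>();

    FetchSessionCacheSketch(int maxSlots) {
        this.maxSlots = maxSlots;
    }

    synchronized boolean tryAdd(int sessionId, boolean privileged) {
        if (privilegedBySessionId.size() >= maxSlots) {
            Integer victim = null;
            for (Map.Entry<Integer, Boolean> e : privilegedBySessionId.entrySet()) {
                if (privileged || !e.getValue()) { // consumers may only evict consumers
                    victim = e.getKey();
                    break;
                }
            }
            if (victim == null) {
                return false; // nothing this session is allowed to displace
            }
            privilegedBySessionId.remove(victim);
        }
        privilegedBySessionId.put(sessionId, privileged);
        return true;
    }
}
{code}

Under that contract, with 1000 slots and only a dozen brokers' worth of replica fetchers, a live follower session like 2046264334 should essentially never be the eviction victim, which is what makes the logs above surprising.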
[jira] [Created] (KAFKA-9879) How kafka deletes tombstone messages?
VIkram created KAFKA-9879: - Summary: How kafka deletes tombstone messages? Key: KAFKA-9879 URL: https://issues.apache.org/jira/browse/KAFKA-9879 Project: Kafka Issue Type: Bug Reporter: VIkram
I was able to delete records in Kafka using tombstone messages after a few attempts. However, the algorithm (or logic) that Kafka uses to delete these tombstone messages is still unclear to me. From my observations, I could figure out that there is some relation between the last modified time of a segment and the deletion of tombstone messages. I have looked into this [https://stackoverflow.com/a/48325699/6940279] but it's a bit complex to understand.
*Topic details*
{{Topic:reddyvel_13 PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=compact,segment.bytes=200,delete.retention.ms=1 Topic: reddyvel_13 Partition: 0 Leader: 1 Replicas: 1,5,2 Isr: 1,5,2}}
I have set {{cleanup.policy=compact}}, {{segment.bytes=200}}, {{delete.retention.ms=1}}
*Timeline of events*
* First segment (baseOffset = 0) was closed at {{2020-04-02 07:12:09,908}}
{{[2020-04-02 07:12:09,908] INFO [ProducerStateManager partition=reddyvel_13-0] Writing producer snapshot at offset 16623 (kafka.log.ProducerStateManager) [2020-04-02 07:12:09,908] INFO [Log partition=reddyvel_13-0, dir=/local/kafka/data] Rolled new log segment at offset 16623 in 1 ms. (kafka.log.Log)}}
Compaction was triggered immediately on this closed segment:
{{[2020-04-02 07:12:22,989] INFO Cleaner 0: Cleaning log reddyvel_13-0 (cleaning prior to Thu Apr 02 07:12:09 EDT 2020, discarding tombstones prior to Wed Dec 31 19:00:00 EST 1969)... (kafka.log.LogCleaner)}}
* Sent a few more messages along with a few tombstones (to delete messages present in the first segment), and the second segment was closed at {{2020-04-02 07:56:50,405}}
{{[2020-04-02 07:56:50,405] INFO [ProducerStateManager partition=reddyvel_13-0] Writing producer snapshot at offset 33868 (kafka.log.ProducerStateManager) [2020-04-02 07:56:50,406] INFO [Log partition=reddyvel_13-0, dir=/local/kafka/data] Rolled new log segment at offset 33868 in 2 ms. (kafka.log.Log)}}
Compaction was triggered:
{{[2020-04-02 07:56:53,180] INFO Cleaner 0: Cleaning log reddyvel_13-0 (cleaning prior to Thu Apr 02 07:56:50 EDT 2020, discarding tombstones prior to Thu Apr 02 07:11:59 EDT 2020)... (kafka.log.LogCleaner)}}
Here, the above log message says {{discarding tombstones prior to Thu Apr 02 07:11:59 EDT 2020}}. This timestamp is exactly equal to the first segment's closing timestamp ({{2020-04-02 07:12:09,908}}) minus the {{delete.retention.ms}} (10 seconds) of my topic. I'm not able to figure out the link between these. I want to understand at what time Kafka triggers the deletion of tombstone messages. Can someone explain the tombstone deletion algorithm in simpler terms and the reasoning behind it? It's not a bug, but I need more information on this. I have posted this in other forums like Stack Overflow but did not get any reply. The official Kafka documentation doesn't have this information. If this is not the correct platform for this, kindly guide me to the relevant platform. Thanks in advance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
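For what it's worth, the subtraction described in the question checks out. A quick sketch, using the timestamps from the quoted log lines (the 10-second figure is the reporter's stated retention, since the delete.retention.ms shown in the config appears truncated):

{code:java}
import java.time.Duration;
import java.time.OffsetDateTime;

public class TombstoneDiscardThreshold {
    public static void main(String[] args) {
        // Close time of the first segment, from the broker log above (EDT = UTC-4).
        OffsetDateTime firstSegmentClosed = OffsetDateTime.parse("2020-04-02T07:12:09.908-04:00");
        Duration deleteRetention = Duration.ofSeconds(10); // observed delete.retention.ms

        // The cleaner's "discarding tombstones prior to" threshold:
        System.out.println(firstSegmentClosed.minus(deleteRetention));
        // Prints 2020-04-02T07:11:59.908-04:00, matching the second cleaner run.
    }
}
{code}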
[jira] [Created] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store
Nicolas Carlot created KAFKA-9880: - Summary: Error while range compacting during bulk loading of FIFO compacted RocksDB Store Key: KAFKA-9880 URL: https://issues.apache.org/jira/browse/KAFKA-9880 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.4.1 Reporter: Nicolas Carlot
When restoring a non-empty RocksDB state store, if it is customized to use FIFO compaction, the following exception is thrown by the KStream process:
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
 at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
 at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
 at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
 at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
 ... 11 more
{code}
Compaction is configured through an implementation of RocksDBConfigSetter. The exception is gone as soon as I remove:
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
Bulk loading works fine when the store is non-existent / empty. This occurs only once there is a minimum amount of data in it. I guess it happens when the number of SST layers is increased.
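For context, the configuration fragment above presumably lives in a setter along these lines. A minimal sketch against the real org.apache.kafka.streams.state.RocksDBConfigSetter interface, with an invented size constant standing in for the reporter's maxSize:

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Minimal sketch of a config setter enabling FIFO compaction for all stores.
public class FifoConfigSetter implements RocksDBConfigSetter {
    private static final long MAX_TABLE_FILES_SIZE = 512 * 1024 * 1024L; // placeholder for maxSize

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(MAX_TABLE_FILES_SIZE);
        fifoOptions.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifoOptions);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }
}
{code}

Such a class is plugged into Streams through the rocksdb.config.setter property (StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG).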
I'm currently using a forked version of Kafka 2.4.1 that customizes the RocksDBStore class with this modification as a workaround:
{code:java}
@Override
@SuppressWarnings("deprecation")
public void toggleDbForBulkLoading() {
    try {
        db.compactRange(columnFamily, true, 1, 0);
    } catch (final RocksDBException e) {
        try {
            if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
                throw new ProcessorStateException("Error while range compacting while restoring store " + name, e);
            } else {
                log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
            }
        } catch (RocksDBException e1) {
            throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
        }
    }
}
{code}
I'm not very proud of this workaround, but it suits my use cases well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9854) Re-authenticating causes mismatched parse of response
[ https://issues.apache.org/jira/browse/KAFKA-9854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084822#comment-17084822 ] ASF GitHub Bot commented on KAFKA-9854: --- rajinisivaram commented on pull request #8471: KAFKA-9854 Re-authenticating causes mismatched parse of response URL: https://github.com/apache/kafka/pull/8471 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Re-authenticating causes mismatched parse of response > - > > Key: KAFKA-9854 > URL: https://issues.apache.org/jira/browse/KAFKA-9854 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Critical > > The schema of LIST_OFFSETS consists of
> # throttle_time_ms:INT32 and
> # responses:ARRAY
>
> If throttle_time_ms is zero and the size of responses is small enough, its binary form is compatible with the schema of SASL_HANDSHAKE, which is composed of
> # error_code:INT16 and
> # mechanisms:ARRAY(STRING)
>
> Hence, there is no Schema error when SASL_HANDSHAKE tries to parse the response of LIST_OFFSETS, but the correlation id check throws IllegalStateException due to the mismatch. The IllegalStateException is NOT caught, and the mismatched response is not sent back to the Selector, so a cascading failure occurs in which all following responses are parsed with the incorrect Schema. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9854) Re-authenticating causes mismatched parse of response
[ https://issues.apache.org/jira/browse/KAFKA-9854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-9854. --- Fix Version/s: 2.5.1 2.6.0 Reviewer: Rajini Sivaram Resolution: Fixed > Re-authenticating causes mismatched parse of response > - > > Key: KAFKA-9854 > URL: https://issues.apache.org/jira/browse/KAFKA-9854 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Critical > Fix For: 2.6.0, 2.5.1 > > > The schema of LIST_OFFSETS consists of
> # throttle_time_ms:INT32 and
> # responses:ARRAY
>
> If throttle_time_ms is zero and the size of responses is small enough, its binary form is compatible with the schema of SASL_HANDSHAKE, which is composed of
> # error_code:INT16 and
> # mechanisms:ARRAY(STRING)
>
> Hence, there is no Schema error when SASL_HANDSHAKE tries to parse the response of LIST_OFFSETS, but the correlation id check throws IllegalStateException due to the mismatch. The IllegalStateException is NOT caught, and the mismatched response is not sent back to the Selector, so a cascading failure occurs in which all following responses are parsed with the incorrect Schema. -- This message was sent by Atlassian Jira (v8.3.4#803005)
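To see why the same bytes parse cleanly under both layouts, here is a small self-contained illustration using hand-rolled buffers (not Kafka's actual protocol classes):

{code:java}
import java.nio.ByteBuffer;

// A simplified LIST_OFFSETS response body with throttle_time_ms = 0 and an
// empty responses array also "parses" under the SASL_HANDSHAKE layout.
public class AmbiguousResponseParse {
    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.allocate(8);
        body.putInt(0); // throttle_time_ms:INT32 = 0
        body.putInt(0); // responses:ARRAY size = 0
        body.flip();

        // The same bytes read with the SASL_HANDSHAKE layout:
        short errorCode = body.getShort(); // 0x0000 -> looks like "no error"
        int mechanismCount = body.getInt(); // 0 -> looks like an empty mechanisms array
        System.out.printf("error_code=%d, mechanisms=%d, leftover=%d bytes%n",
                errorCode, mechanismCount, body.remaining());
        // No structural error surfaces here, so in the scenario described
        // above only the correlation id check can notice that the wrong
        // response was consumed.
    }
}
{code}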
[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084833#comment-17084833 ] Théo commented on KAFKA-7077: - I just submitted a PR with those small changes, and I'm waiting for any new suggestions to be added: https://github.com/apache/kafka/pull/8500 > KIP-318: Make Kafka Connect Source idempotent > - > > Key: KAFKA-7077 > URL: https://issues.apache.org/jira/browse/KAFKA-7077 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Stephane Maarek >Assignee: Stephane Maarek >Priority: Major > > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9881) Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of a Intergration Test
Bruno Cadonna created KAFKA-9881: Summary: Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of a Intergration Test Key: KAFKA-9881 URL: https://issues.apache.org/jira/browse/KAFKA-9881 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.5.0 Reporter: Bruno Cadonna The integration test {{RocksDBMetricsIntegrationTest}} takes pretty long to complete. The main part of the runtime is spent in the two tests that verify whether the RocksDB metrics get actual measurements from RocksDB. Those tests need to wait for the thread that collects the measurements of the RocksDB metrics to trigger the first recordings of the metrics. These tests do not need to run as integration tests and thus they shall be converted into unit tests to save runtime. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9881) Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of an Integration Test
[ https://issues.apache.org/jira/browse/KAFKA-9881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-9881: - Summary: Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of an Integration Test (was: Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of a Integration Test) > Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test > instead of an Integration Test > -- > > Key: KAFKA-9881 > URL: https://issues.apache.org/jira/browse/KAFKA-9881 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Major > > The integration test {{RocksDBMetricsIntegrationTest}} takes pretty long to > complete. The main part of the runtime is spent in the two tests that verify > whether the RocksDB metrics get actual measurements from RocksDB. Those tests > need to wait for the thread that collects the measurements of the RocksDB > metrics to trigger the first recordings of the metrics. These tests do not > need to run as integration tests and thus they shall be converted into unit > tests to save runtime. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9881) Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of an Integration Test
[ https://issues.apache.org/jira/browse/KAFKA-9881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084881#comment-17084881 ] ASF GitHub Bot commented on KAFKA-9881: --- cadonna commented on pull request #8501: KAFKA-9881: Convert integration test to verify measurements from RocksDB to unit test URL: https://github.com/apache/kafka/pull/8501 The integration test RocksDBMetricsIntegrationTest takes pretty long to complete. Most of the runtime is spent in the two tests that verify whether the RocksDB metrics get actual measurements from RocksDB. Those tests need to wait for the thread that collects the measurements of the RocksDB metrics to trigger the first recordings of the metrics. This PR adds a unit test that verifies whether the Kafka Streams metrics get the measurements from RocksDB and removes the two integration tests that verified it before. The verification of the creation and scheduling of the RocksDB metrics recording trigger thread is already contained in KafkaStreamsTest and consequently it is not part of this PR. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test > instead of an Integration Test > -- > > Key: KAFKA-9881 > URL: https://issues.apache.org/jira/browse/KAFKA-9881 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Major > > The integration test {{RocksDBMetricsIntegrationTest}} takes pretty long to > complete. The main part of the runtime is spent in the two tests that verify > whether the RocksDB metrics get actual measurements from RocksDB. Those tests > need to wait for the thread that collects the measurements of the RocksDB > metrics to trigger the first recordings of the metrics. These tests do not > need to run as integration tests and thus they shall be converted into unit > tests to save runtime. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084985#comment-17084985 ] zhongyushuo commented on KAFKA-7500: [~ryannedolan] I got your point. Thanks for your reply. > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions
[ https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9860: --- Summary: Transactional Producer could pre-add partitions (was: Transactional Producer could add partitions by batch at the end) > Transactional Producer could pre-add partitions > --- > > Key: KAFKA-9860 > URL: https://issues.apache.org/jira/browse/KAFKA-9860 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > As of today, the Producer transaction manager keeps track of the partitions > involved in the current transaction. Each time it sees a new partition, it will > try to send a request to add all the involved partitions to the broker, which > results in multiple requests. If we could batch the work at the end of the > transaction, we would save unnecessary round trips. > The proposed solution is to bump the EndTxn request with a field of the > partitions being added. > We still need to discuss the edge cases and correctness, of course. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions
[ https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9860: --- Description: As of today, the Producer transaction manager keeps track of the partitions involved in the current transaction. Each time it sees a new partition, it will try to send a request to add all the involved partitions to the broker, which results in multiple requests. If we could batch the work at the beginning of the transaction, we would save unnecessary round trips. The idea is that most of the time the output partitions for a Producer stay constant, so we could leverage the last transaction's affected partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a field of the partitions actually being added. The transaction coordinator will only send markers to the partitions included in the EndTxn. If the first batch is not a superset of the partitions affected as we produce data, we would still need a second AddPartition call. was: As of today, the Producer transaction manager keeps track of the partitions involved in the current transaction. Each time it sees a new partition, it will try to send a request to add all the involved partitions to the broker, which results in multiple requests. If we could batch the work at the end of the transaction, we would save unnecessary round trips. The proposed solution is to bump the EndTxn request with a field of the partitions being added. We still need to discuss the edge cases and correctness, of course. > Transactional Producer could pre-add partitions > --- > > Key: KAFKA-9860 > URL: https://issues.apache.org/jira/browse/KAFKA-9860 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > As of today, the Producer transaction manager keeps track of the partitions > involved in the current transaction. Each time it sees a new partition, it will > try to send a request to add all the involved partitions to the broker, which > results in multiple requests. If we could batch the work at the beginning of > the transaction, we would save unnecessary round trips. > The idea is that most of the time the output partitions for a Producer stay > constant, so we could leverage the last transaction's affected partitions to > do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a > field of the partitions actually being added. The transaction coordinator will > only send markers to the partitions included in the EndTxn. If the first > batch is not a superset of the partitions affected as we produce data, we > would still need a second AddPartition call. -- This message was sent by Atlassian Jira (v8.3.4#803005)
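For context, the client-side flow the ticket targets looks like the following minimal sketch (the broker address, transactional id, topic names, and record contents are illustrative). Today, the first send() to each not-yet-registered partition makes the transaction manager issue a separate AddPartitionsToTxn request; the proposal would batch that work at the beginning of the transaction and let EndTxn carry the set of partitions actually used.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalRoundTrips {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-example");     // illustrative
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Each first write to a new partition currently triggers its own
            // AddPartitionsToTxn round trip to the transaction coordinator.
            producer.send(new ProducerRecord<>("orders", "k", "v"));
            producer.send(new ProducerRecord<>("payments", "k", "v"));
            producer.commitTransaction();
        }
    }
}
{code}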
[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions
[ https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9860: --- Labels: need-kip (was: ) > Transactional Producer could pre-add partitions > --- > > Key: KAFKA-9860 > URL: https://issues.apache.org/jira/browse/KAFKA-9860 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Labels: need-kip > > As of today, the Producer transaction manager keeps track of the partitions > involved in the current transaction. Each time it sees a new partition, it will > try to send a request to add all the involved partitions to the broker, which > results in multiple requests. If we could batch the work at the beginning of > the transaction, we would save unnecessary round trips. > The idea is that most of the time the output partitions for a Producer stay > constant, so we could leverage the last transaction's affected partitions to > do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a > field of the partitions actually being added. The transaction coordinator will > only send markers to the partitions included in the EndTxn. If the first > batch is not a superset of the partitions affected as we produce data, we > would still need a second AddPartition call. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions
[ https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9860: --- Component/s: producer > Transactional Producer could pre-add partitions > --- > > Key: KAFKA-9860 > URL: https://issues.apache.org/jira/browse/KAFKA-9860 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > As of today, the Producer transaction manager keeps track of the partitions > involved in the current transaction. Each time it sees a new partition, it will > try to send a request to add all the involved partitions to the broker, which > results in multiple requests. If we could batch the work at the beginning of > the transaction, we would save unnecessary round trips. > The idea is that most of the time the output partitions for a Producer stay > constant, so we could leverage the last transaction's affected partitions to > do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a > field of the partitions actually being added. The transaction coordinator will > only send markers to the partitions included in the EndTxn. If the first > batch is not a superset of the partitions affected as we produce data, we > would still need a second AddPartition call. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9882) Add Block getAssignments()
Jesse Anderson created KAFKA-9882: - Summary: Add Block getAssignments() Key: KAFKA-9882 URL: https://issues.apache.org/jira/browse/KAFKA-9882 Project: Kafka Issue Type: New Feature Components: clients Affects Versions: 2.5.0 Reporter: Jesse Anderson In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a poll(Duration). The poll(Duration) does not block for consumer assignments. Now, there isn't a blocking method that can get consumer assignments. A new KafkaConsumer method needs to be added that blocks while getting consumer assignments. The current workaround is to poll for a short amount of time in a while loop and check the size of assignment(). This isn't a great method of verifying the consumer assignment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
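The workaround described above can be sketched as follows (bootstrap servers, group id, and topic name are illustrative): poll with a short timeout in a loop until assignment() is non-empty.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WaitForAssignment {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            // poll(Duration) does not block for assignment, so spin with short
            // polls until the rebalance completes and partitions are assigned.
            while (consumer.assignment().isEmpty()) {
                consumer.poll(Duration.ofMillis(100));
            }
            System.out.println("Assigned: " + consumer.assignment());
        }
    }
}
{code}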
[jira] [Commented] (KAFKA-9796) Broker shutdown could be stuck forever under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085058#comment-17085058 ] ASF GitHub Bot commented on KAFKA-9796: --- rajinisivaram commented on pull request #8448: KAFKA-9796; Broker shutdown could be stuck forever under certain conditions URL: https://github.com/apache/kafka/pull/8448 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Broker shutdown could be stuck forever under certain conditions > --- > > Key: KAFKA-9796 > URL: https://issues.apache.org/jira/browse/KAFKA-9796 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > During broker initialisation, the Acceptor threads are started early so that > the bound port is known, while starting the processors is delayed to the end > of the initialisation sequence. We have found out that the shutdown of a broker > could be stuck forever under the following conditions: > - the shutdown procedure is started before the processors are started; > - the `newConnections` queues of the processors are full; and > - an extra new connection has been accepted but can't be queued up in a > processor. > For instance, this could happen if a `NodeExistsException` is raised when the > broker tries to register itself in ZK. > When the above conditions happen, the shutdown triggers the shutdown of > the acceptor threads and waits until they have stopped (first thread dump below). If > an acceptor has a pending connection which can't be queued up in a processor, > it ends up waiting until space is made in the new-connections queue to accept the > connection (second thread dump below). As the processors are not started, > the new-connections queues are never drained, so the acceptor thread is never > released. 
> *Shutdown wait on acceptor to shutdown* > {noformat} > "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s > tid=0x7f625001c800 nid=0x272 waiting on condition [0x7f6257ca4000] >java.lang.Thread.State: WAITING (parking) > at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method) > - parking to wait for <0x000689a61800> (a > java.util.concurrent.CountDownLatch$Sync) > at > java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345) > at > java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232) > at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430) > at kafka.network.Acceptor.shutdown(SocketServer.scala:521) > at > kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267) > at > kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267) > at > kafka.network.SocketServer$$Lambda$604/0x000840540840.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:941) > at scala.collection.Iterator.foreach$(Iterator.scala:941) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > at > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213) > at > kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267) > - locked <0x000689a61ac0> (a kafka.network.SocketServer) > at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806) > at > kafka.server.KafkaServer$$Lambda$602/0x00084052b040.apply$mcV$sp(Unknown > Source) > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68) > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806) > at kafka.server.KafkaServer.startup(KafkaServer.scala:522) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > {noformat} > *Acceptor waits on processor to accept the new connection* > {noformat} > "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 > prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s ti
[jira] [Resolved] (KAFKA-9796) Broker shutdown could be stuck forever under certain conditions
[ https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-9796. --- Fix Version/s: 2.6.0 Reviewer: Rajini Sivaram Resolution: Fixed > Broker shutdown could be stuck forever under certain conditions > --- > > Key: KAFKA-9796 > URL: https://issues.apache.org/jira/browse/KAFKA-9796 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 2.6.0 > > > During broker initialisation, the Acceptor threads are started early so that > the bound port is known, while starting the processors is delayed to the end > of the initialisation sequence. We have found out that the shutdown of a broker > could be stuck forever under the following conditions: > - the shutdown procedure is started before the processors are started; > - the `newConnections` queues of the processors are full; and > - an extra new connection has been accepted but can't be queued up in a > processor. > For instance, this could happen if a `NodeExistsException` is raised when the > broker tries to register itself in ZK. > When the above conditions happen, the shutdown triggers the shutdown of > the acceptor threads and waits until they have stopped (first thread dump below). If > an acceptor has a pending connection which can't be queued up in a processor, > it ends up waiting until space is made in the new-connections queue to accept the > connection (second thread dump below). As the processors are not started, > the new-connections queues are never drained, so the acceptor thread is never > released. > *Shutdown wait on acceptor to shutdown* > {noformat} > "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s > tid=0x7f625001c800 nid=0x272 waiting on condition [0x7f6257ca4000] >java.lang.Thread.State: WAITING (parking) > at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method) > - parking to wait for <0x000689a61800> (a > java.util.concurrent.CountDownLatch$Sync) > at > java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345) > at > java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232) > at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430) > at kafka.network.Acceptor.shutdown(SocketServer.scala:521) > at > kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267) > at > kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267) > at > kafka.network.SocketServer$$Lambda$604/0x000840540840.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:941) > at scala.collection.Iterator.foreach$(Iterator.scala:941) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > at > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213) > at > kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267) > - locked <0x000689a61ac0> (a kafka.network.SocketServer) > at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806) > at > 
kafka.server.KafkaServer$$Lambda$602/0x00084052b040.apply$mcV$sp(Unknown > Source) > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68) > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806) > at kafka.server.KafkaServer.startup(KafkaServer.scala:522) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > {noformat} > *Acceptor waits on processor to accept the new connection* > {noformat} > "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 > prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x7f62523b5000 > nid=0x2ca waiting on condition [0x7f615713] >java.lang.Thread.State: WAITING (parking) > at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method) > - parking to wait for <0x000689a7cad8> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194) > at > java
[jira] [Assigned] (KAFKA-9860) Transactional Producer could pre-add partitions
[ https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-9860: -- Assignee: (was: Boyang Chen) > Transactional Producer could pre-add partitions > --- > > Key: KAFKA-9860 > URL: https://issues.apache.org/jira/browse/KAFKA-9860 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Boyang Chen >Priority: Major > Labels: need-kip > > As of today, the Producer transaction manager keeps track of the partitions > involved in the current transaction. Each time it sees a new partition, it will > try to send a request to add all the involved partitions to the broker, which > results in multiple requests. If we could batch the work at the beginning of > the transaction, we would save unnecessary round trips. > The idea is that most of the time the output partitions for a Producer stay > constant, so we could leverage the last transaction's affected partitions to > do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a > field of the partitions actually being added. The transaction coordinator will > only send markers to the partitions included in the EndTxn. If the first > batch is not a superset of the partitions affected as we produce data, we > would still need a second AddPartition call. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085121#comment-17085121 ] John Roesler commented on KAFKA-9224: - Hi [~bchen225242], this ticket has just come to my attention again. In the description, it says that some writes to the store may not be guaranteed to happen after the transaction commits, but it's not clear to me how that can be the case. Since this is the whole basis for this feature request, can you elaborate, please? > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, the write to the state store is not guaranteed to happen > after the ongoing transaction is finished. This means an interactive query could > see uncommitted data within the state store, which is not ideal for users relying > on state stores for strong consistency. Ideally, we should have an option to > include the state store commit as part of the ongoing transaction; however, an > immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within the stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085121#comment-17085121 ] John Roesler edited comment on KAFKA-9224 at 4/16/20, 5:51 PM: --- Hi [~bchen225242], this ticket has just come to my attention again. In the description, it says that some writes to the store may not be guaranteed to happen after the transaction commits, but it's not clear to me how that can be the case. For example, Streams deterministically produces state updates given a fixed sequence of input records. If a transaction gets aborted because the instance loses connection to the broker or something, it's true that the local state store would get rolled back, but the new owner (either another instance or the same instance after recovery) should recompute exactly the same state. Since this is the whole basis for this feature request, can you elaborate, please? was (Author: vvcephei): Hi [~bchen225242], this ticket has just come to my attention again. In the description, it says that some writes to the store may not be guaranteed to happen after the transaction commits, but it's not clear to me how that can be the case. Since this is the whole basis for this feature request, can you elaborate, please? > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, the write to the state store is not guaranteed to happen > after the ongoing transaction is finished. This means an interactive query could > see uncommitted data within the state store, which is not ideal for users relying > on state stores for strong consistency. Ideally, we should have an option to > include the state store commit as part of the ongoing transaction; however, an > immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within the stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8856) Add Streams Config for Backward-compatible Metrics
[ https://issues.apache.org/jira/browse/KAFKA-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8856: --- Description: With KIP-444, the tag names and names of Streams metrics change. To allow users a grace period for changing their corresponding monitoring / alerting eco-systems, a config shall be added that specifies which version of the metrics names will be exposed. The definition of the new config is: name: built.in.metrics.version type: Enum values: {"0.10.0", "0.10.1", ... "2.3", "2.4"} default: "2.4" [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams] was: With KIP-444, the tag names and names of Streams metrics change. To allow users a grace period for changing their corresponding monitoring / alerting eco-systems, a config shall be added that specifies which version of the metrics names will be exposed. The definition of the new config is: name: built.in.metrics.version type: Enum values: {"0.10.0", "0.10.1", ... "2.3", "2.4"} default: "2.4" > Add Streams Config for Backward-compatible Metrics > -- > > Key: KAFKA-8856 > URL: https://issues.apache.org/jira/browse/KAFKA-8856 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Labels: kip > Fix For: 2.4.0 > > > With KIP-444, the tag names and names of Streams metrics change. To allow > users a grace period for changing their corresponding monitoring / > alerting eco-systems, a config shall be added that specifies which version of > the metrics names will be exposed. > The definition of the new config is: > name: built.in.metrics.version > type: Enum > values: > {"0.10.0", "0.10.1", ... "2.3", "2.4"} > default: "2.4" > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams] -- This message was sent by Atlassian Jira (v8.3.4#803005)
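As an illustration, an application would pin the metric names during such a grace period roughly as follows. This is a sketch assuming the config key as spelled in the description; the application id and bootstrap servers are illustrative.

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class MetricsVersionExample {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-compat-app"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
        // Keep exposing the pre-KIP-444 metric names while dashboards and
        // alerts are migrated; "2.3" is one of the enum values listed above.
        props.put("built.in.metrics.version", "2.3");
        return props;
    }
}
{code}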
[jira] [Updated] (KAFKA-8856) Add Streams Config for Backward-compatible Metrics
[ https://issues.apache.org/jira/browse/KAFKA-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8856: --- Labels: kip (was: ) > Add Streams Config for Backward-compatible Metrics > -- > > Key: KAFKA-8856 > URL: https://issues.apache.org/jira/browse/KAFKA-8856 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Labels: kip > Fix For: 2.4.0 > > > With KIP-444, the tag names and names of Streams metrics change. To allow > users a grace period for changing their corresponding monitoring / > alerting eco-systems, a config shall be added that specifies which version of > the metrics names will be exposed. > The definition of the new config is: > name: built.in.metrics.version > type: Enum > values: {"0.10.0", "0.10.1", ... "2.3", "2.4"} > default: "2.4" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085141#comment-17085141 ] Guozhang Wang commented on KAFKA-9224: -- Since it is considered to target the same issue as KAFKA-8870, I'd guess it is a typo in the description? It should probably say "some writes to the store should not be reflected until the transaction commits"? > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, the write to the state store is not guaranteed to happen > after the ongoing transaction is finished. This means an interactive query could > see uncommitted data within the state store, which is not ideal for users relying > on state stores for strong consistency. Ideally, we should have an option to > include the state store commit as part of the ongoing transaction; however, an > immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within the stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9224: --- Description: Currently under EOS, an uncommitted write could be reflected in the state store before the ongoing transaction is finished. This means an interactive query could see uncommitted data within the state store, which is not ideal for users relying on state stores for strong consistency. Ideally, we should have an option to include the state store commit as part of the ongoing transaction; however, an immediate step towards a better reasoned system is to `write after transaction commit`, which means we always buffer data within the stream cache for EOS until the ongoing transaction is committed. (was: Currently under EOS, the write to the state store is not guaranteed to happen after the ongoing transaction is finished. This means an interactive query could see uncommitted data within the state store, which is not ideal for users relying on state stores for strong consistency. Ideally, we should have an option to include the state store commit as part of the ongoing transaction; however, an immediate step towards a better reasoned system is to `write after transaction commit`, which means we always buffer data within the stream cache for EOS until the ongoing transaction is committed.) > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, an uncommitted write could be reflected in the state > store before the ongoing transaction is finished. This means an interactive > query could see uncommitted data within the state store, which is not ideal for > users relying on state stores for strong consistency. Ideally, we should have > an option to include the state store commit as part of the ongoing transaction; > however, an immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within the stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
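The `write after transaction commit` idea described above can be pictured with a small, purely hypothetical sketch; the class and method names are invented for illustration and are not Streams' actual API. Writes are buffered and only become visible to reads once the transaction commits.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of write-after-commit buffering; not Streams' API.
class WriteAfterCommitStore {
    private final Map<String, String> committed = new LinkedHashMap<>();
    private final Map<String, String> pending = new LinkedHashMap<>();

    void put(String key, String value) {
        pending.put(key, value);   // buffered; invisible to interactive queries
    }

    String get(String key) {
        return committed.get(key); // queries only ever see committed data
    }

    void onTransactionCommit() {
        committed.putAll(pending); // flush the buffer once the txn commits
        pending.clear();
    }

    void onTransactionAbort() {
        pending.clear();           // uncommitted writes are simply discarded
    }
}
{code}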
[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085163#comment-17085163 ] Boyang Chen commented on KAFKA-9224: [~vvcephei] [~guozhang] Yea, it's a typo, and I just fixed it. > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, an uncommitted write could be reflected in the state > store before the ongoing transaction is finished. This means an interactive > query could see uncommitted data within the state store, which is not ideal for > users relying on state stores for strong consistency. Ideally, we should have > an option to include the state store commit as part of the ongoing transaction; > however, an immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within the stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9860) Transactional Producer could pre-add partitions
[ https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085166#comment-17085166 ] Guozhang Wang commented on KAFKA-9860: -- I think this is a good idea. Probably we should list all the pros and cons in more detail in the description above. For example, in this model if the txn coordinator times out a txn without seeing the EndTxn request, then it has to send a marker to all the recorded partitions, which may be a superset of the partitions that actually had data sent to them. > Transactional Producer could pre-add partitions > --- > > Key: KAFKA-9860 > URL: https://issues.apache.org/jira/browse/KAFKA-9860 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Boyang Chen >Priority: Major > Labels: need-kip > > As of today, the Producer transaction manager keeps track of the partitions > involved in the current transaction. Each time it sees a new partition, it will > try to send a request to add all the involved partitions to the broker, which > results in multiple requests. If we could batch the work at the beginning of > the transaction, we would save unnecessary round trips. > The idea is that most of the time the output partitions for a Producer stay > constant, so we could leverage the last transaction's affected partitions to > do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a > field of the partitions actually being added. The transaction coordinator will > only send markers to the partitions included in the EndTxn. If the first > batch is not a superset of the partitions affected as we produce data, we > would still need a second AddPartition call. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9878) Block EndTxn call until the txn markers are committed
[ https://issues.apache.org/jira/browse/KAFKA-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085171#comment-17085171 ] Guozhang Wang commented on KAFKA-9878: -- The original rationale is that if, after committing the current txn, there is some time gap before the producer sends the next record for the new txn, then this time gap can be "pipelined". So theoretically I see both pros and cons of this proposal here. In practice, it is likely that the time gap we were hoping to pipeline is small. Combining with KAFKA-9860, I think maybe we can instead make the beginTxn call a blocking call which would send the batched addPartitions request, and which could be blocked until the previous transaction completes, so that the time between the previous commitTxn call and the next beginTxn call can still be overlapped. > Block EndTxn call until the txn markers are committed > - > > Key: KAFKA-9878 > URL: https://issues.apache.org/jira/browse/KAFKA-9878 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > > Currently the EndTxn call from the Producer returns as soon as the control > record is written to the txn coordinator log. The ongoing transaction then goes > into a pending state to wait for all txn markers to be propagated. In the > meantime, the producer client will start another new transaction but be > rejected constantly until the pending state is resolved, which means > unnecessary round trips and more burden on the broker to handle repetitive > requests. > To avoid this situation, we should make the Producer client wait for txn > marker completion instead. This will yield better performance overall, as no > more back-off shall be triggered for a subsequent transaction to begin. -- This message was sent by Atlassian Jira (v8.3.4#803005)
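Concretely, the sequence under discussion looks like the fragment below, reusing a transactional producer configured as in the KAFKA-9860 sketch above. CONCURRENT_TRANSACTIONS is the broker error that typically drives the internal back-off in this window; the topic and record contents are illustrative.

{code:java}
// Fragment; assumes a producer configured as in the earlier transactional sketch.
static void commitThenBegin(KafkaProducer<String, String> producer) {
    // Today this can return once the control record is written, while the
    // transaction markers are still being propagated to the data partitions.
    producer.commitTransaction();
    producer.beginTransaction();
    // The first send of the new transaction triggers AddPartitionsToTxn, which
    // the broker may reject with CONCURRENT_TRANSACTIONS until the markers
    // complete; the client then backs off and retries internally.
    producer.send(new ProducerRecord<>("orders", "k", "v"));
}
{code}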
[jira] [Updated] (KAFKA-9860) Transactional Producer could pre-add partitions
[ https://issues.apache.org/jira/browse/KAFKA-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9860: --- Description: As of today, the Producer transaction manager keeps track of the partitions involved in the current transaction. Each time it sees a new partition, it will try to send a request to add all the involved partitions to the broker, which results in multiple requests. If we could batch the work at the beginning of the transaction, we would save unnecessary round trips. The idea is that most of the time the output partitions for a Producer stay constant, so we could leverage the last transaction's affected partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a field of the partitions actually being added. The transaction coordinator will only send markers to the partitions included in the EndTxn. If the first batch is not a superset of the partitions affected as we produce data, we would still need a second AddPartition call. The downside of this approach is that unnecessary marker requests will be sent out when an ongoing transaction never gets the EndTxn call and has to be expired by the broker. The impact should be minimal, since the affected scope is always defined by the previous transaction. was: As of today, the Producer transaction manager keeps track of the partitions involved in the current transaction. Each time it sees a new partition, it will try to send a request to add all the involved partitions to the broker, which results in multiple requests. If we could batch the work at the beginning of the transaction, we would save unnecessary round trips. The idea is that most of the time the output partitions for a Producer stay constant, so we could leverage the last transaction's affected partitions to do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a field of the partitions actually being added. The transaction coordinator will only send markers to the partitions included in the EndTxn. If the first batch is not a superset of the partitions affected as we produce data, we would still need a second AddPartition call. > Transactional Producer could pre-add partitions > --- > > Key: KAFKA-9860 > URL: https://issues.apache.org/jira/browse/KAFKA-9860 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Boyang Chen >Priority: Major > Labels: need-kip > > As of today, the Producer transaction manager keeps track of the partitions > involved in the current transaction. Each time it sees a new partition, it will > try to send a request to add all the involved partitions to the broker, which > results in multiple requests. If we could batch the work at the beginning of > the transaction, we would save unnecessary round trips. > The idea is that most of the time the output partitions for a Producer stay > constant, so we could leverage the last transaction's affected partitions to > do a batch `AddPartitionToTxn` first, and bump the EndTxn request with a > field of the partitions actually being added. The transaction coordinator will > only send markers to the partitions included in the EndTxn. If the first > batch is not a superset of the partitions affected as we produce data, we > would still need a second AddPartition call. > The downside of this approach is that unnecessary marker requests will be sent > out when an ongoing transaction never gets the EndTxn call and has to be > expired by the broker. The impact should be minimal, since the affected scope > is always defined by the previous transaction. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state
[ https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-9066: Assignee: Chris Egerton > Kafka Connect JMX : source & sink task metrics missing for tasks in failed > state > > > Key: KAFKA-9066 > URL: https://issues.apache.org/jira/browse/KAFKA-9066 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.1.1 >Reporter: Mikołaj Stefaniak >Assignee: Chris Egerton >Priority: Major > > h2. Overview > Kafka Connect exposes various metrics via JMX. Those metrics can be exported, > e.g. by the _Prometheus JMX Exporter_, for further processing. > One of the crucial attributes is the connector's *task status*. > According to the official Kafka docs, status is available as the +status+ attribute > of the following MBean: > {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"status > - The status of the connector task. One of 'unassigned', 'running', > 'paused', 'failed', or 'destroyed'. > {quote} > h2. Issue > Generally +connector-task-metrics+ are exposed properly for tasks in +running+ > status but not exposed at all if the task is +failed+. > A failed task *appears* properly with failed status when queried via the *REST API*: > > {code:java} > $ curl -X GET -u 'user:pass' > http://kafka-connect.mydomain.com/connectors/customerconnector/status > {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException: > Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"} > $ {code} > > A failed task *doesn't appear* as a bean with the +connector-task-metrics+ type when > queried via *JMX*: > > {code:java} > $ echo "beans -d kafka.connect" | java -jar > target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep > connector=customerconnector > kafka.connect:connector=customerconnector,task=0,type=task-error-metricskafka.connect:connector=customerconnector,type=connector-metrics > $ > {code} > h2. Expected result > It is expected that a bean with the +connector-task-metrics+ type will also appear > for tasks that failed. > Below is an example of how beans are properly registered for tasks in the Running > state: > > {code:java} > $ echo "get -b > kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics > status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l > localhost:8081 -n -v silent > status = running; > $ > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state
[ https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085237#comment-17085237 ] ASF GitHub Bot commented on KAFKA-9066: --- C0urante commented on pull request #8502: KAFKA-9066: Retain metrics for failed tasks URL: https://github.com/apache/kafka/pull/8502 Right now, sink and source task JMX metrics are dropped as soon as the task fails. The changes here cause these metrics to be retained even if the task fails, and instead only be removed when the task has been shut down or abandoned during shut down by the worker. Existing unit tests are modified to account for this tweak in the worker logic. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect JMX : source & sink task metrics missing for tasks in failed > state > > > Key: KAFKA-9066 > URL: https://issues.apache.org/jira/browse/KAFKA-9066 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.1.1 >Reporter: Mikołaj Stefaniak >Assignee: Chris Egerton >Priority: Major > > h2. Overview > Kafka Connect exposes various metrics via JMX. Those metrics can be exported, > e.g. by the _Prometheus JMX Exporter_, for further processing. > One of the crucial attributes is the connector's *task status*. > According to the official Kafka docs, status is available as the +status+ attribute > of the following MBean: > {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"status > - The status of the connector task. One of 'unassigned', 'running', > 'paused', 'failed', or 'destroyed'. > {quote} > h2. Issue > Generally +connector-task-metrics+ are exposed properly for tasks in +running+ > status but not exposed at all if the task is +failed+. > A failed task *appears* properly with failed status when queried via the *REST API*: > > {code:java} > $ curl -X GET -u 'user:pass' > http://kafka-connect.mydomain.com/connectors/customerconnector/status > {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException: > Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"} > $ {code} > > A failed task *doesn't appear* as a bean with the +connector-task-metrics+ type when > queried via *JMX*: > > {code:java} > $ echo "beans -d kafka.connect" | java -jar > target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep > connector=customerconnector > kafka.connect:connector=customerconnector,task=0,type=task-error-metricskafka.connect:connector=customerconnector,type=connector-metrics > $ > {code} > h2. Expected result > It is expected that a bean with the +connector-task-metrics+ type will also appear > for tasks that failed. 
> Below is an example of how beans are properly registered for tasks in the Running > state: > > {code:java} > $ echo "get -b > kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics > status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l > localhost:8081 -n -v silent > status = running; > $ > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9867) Log cleaner throws InvalidOffsetException for some partitions
[ https://issues.apache.org/jira/browse/KAFKA-9867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085254#comment-17085254 ] Bradley Peterson commented on KAFKA-9867: - After upgrading to 2.4.1, I see the same error logged slightly differently. {noformat} [2020-04-16 20:53:14,748] WARN [kafka-log-cleaner-thread-0]: Unexpected exception thrown when cleaning log Log(dir=/var/lib/kafka/x-10, topic=x, partition=10, highWatermark=1191729811, lastStableOffset=1191729811, logStartOffset=0, logEndOffset=1191729811). Marking its partition (x-10) as uncleanable (kafka.log.LogCleaner) kafka.log.LogCleaningException: Attempt to append an offset (279327173) to position 4077 no larger than the last offset appended (279327173) to /var/lib/kafka/x-10/000278373839.index.cleaned. at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:349) at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:325) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:314) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) Caused by: org.apache.kafka.common.errors.InvalidOffsetException: Attempt to append an offset (279327173) to position 4077 no larger than the last offset appended (279327173) to /var/lib/kafka/x-10/000278373839.index.cleaned. {noformat} > Log cleaner throws InvalidOffsetException for some partitions > - > > Key: KAFKA-9867 > URL: https://issues.apache.org/jira/browse/KAFKA-9867 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 2.3.0 > Environment: AWS EC2 with data on EBS volumes. >Reporter: Bradley Peterson >Priority: Major > > About half the partitions for one topic are marked "uncleanable". This is > consistent across broker replicas – if a partition is uncleanable on one > broker, its replicas are also uncleanable. > The log-cleaner log seems to suggest out-of-order offsets. We've seen corrupt > indexes before, so I removed the indexes from the affected segments and let > Kafka rebuild them, but it hit the same error. > I don't know when the error first occurred because we've restarted the > brokers and rotated logs, but it is possible the brokers crashed at some > point. > How can I repair these partitions? > {noformat} > [2020-04-09 00:14:16,979] INFO Cleaner 0: Cleaning segment 223293473 in log > x-9 (largest timestamp Wed Nov 13 20:35:57 UTC 2019 > ) into 223293473, retaining deletes. (kafka.log.LogCleaner) > [2020-04-09 00:14:29,486] INFO Cleaner 0: Cleaning segment 226762315 in log > x-9 (largest timestamp Wed Nov 06 18:17:10 UTC 2019 > ) into 223293473, retaining deletes. (kafka.log.LogCleaner) > [2020-04-09 00:14:29,502] WARN [kafka-log-cleaner-thread-0]: Unexpected > exception thrown when cleaning log Log(/var/lib/kafka/x > -9). Marking its partition (x-9) as uncleanable (kafka.log.LogCleaner) > org.apache.kafka.common.errors.InvalidOffsetException: Attempt to append an > offset (226765178) to position 941 no larger than the last offset appended > (228774155) to /var/lib/kafka/x-9/000223293473.index.cleaned. > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085270#comment-17085270 ] John Roesler commented on KAFKA-9224: - Thanks, all, I've re-read this issue, and also read KAFKA-8870, which provided some of the clarity I was seeking. I'm still not sure I understand what EOS transactions have to do with IQ results. In KAFKA-8870, [~vinoth] gave the specific scenario, which I'll copy/paste here: {quote}Value for key K can go from *V0 → V1 → V2* on active instance A, IQ reads V1, instance A fails and any failure/rebalancing will leave the standby instance B rewinding offsets and reprocessing, during which time IQ can again see V0 or V1 or any number of previous values for the same key. {quote} The point is that two queries, both against the active replica, could see non-monotonic system states, as V2 was queried from a non-committed transaction, then the node failed, and the subsequent active node recovered up to V1 and serves _that_ as the most recent result. In that scenario, the new node would of course proceed to process the input and subsequently re-create system state V2. So the problem isn't specifically that we saw state V2 the first time, but that we saw V2 followed by V1. This problem isn't specific to EOS, so even if we can solve it in the EOS case by querying only state corresponding to committed transactions, then it's at best a band-aid. # It still allows IQ to serve non-monotonic system states under at-least-once semantics, although we could equivalently solve it by querying only after the changelog write is acked. # And it still allows IQ to serve non-monotonic system states if you query the active and then a standby replica I'm not saying that we shouldn't solve the non-monotonic-query-result problem. I'm just asking whether querying only committed transactions is the solution. Thanks! > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, an uncommitted write could be reflected in the state > store before the ongoing transaction is finished. This means an interactive > query could see uncommitted data within the state store, which is not ideal for > users relying on state stores for strong consistency. Ideally, we should have > an option to include the state store commit as part of the ongoing transaction; > however, an immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within the stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085270#comment-17085270 ] John Roesler edited comment on KAFKA-9224 at 4/16/20, 9:32 PM: --- Thanks, all, I've re-read this issue, and also read KAFKA-8870, which provided some of the clarity I was seeking. I'm still not sure I understand what EOS transactions have to do with IQ results. In KAFKA-8870, [~vinoth] gave the specific scenario, which I'll copy/paste here: {quote}Value for key K can go from *V0 → V1 → V2* on active instance A, IQ reads V1, instance A fails and any failure/rebalancing will leave the standy instance B rewinding offsets and reprocessing, during which time IQ can again see V0 or V1 or any number of previous values for the same key. {quote} that two queries, both against the active replica, could see non-monotonic system states, as V2 was queried from a non-committed transaction, then the node failed, and the subsequent active node recovered up to V1 and serves _that_ as the most recent result. In that scenario, the new node would of course proceed to process the input and subsequently re-create system state V2. So the problem isn't specifically that we saw state V2 the first time, but that we saw V2 followed by V1. This problem isn't specific to EOS, so even if we can solve it in the EOS case by querying only state corresponding to committed transactions, then it's at best a band-aid. # It still allows IQ to serve non-monotonic system states under at-least-once semantics, although we could equivalently solve it by querying only after a successful commit (which involves the changelog getting acked and the incoming offsets getting committed). # And it still allows IQ to serve non-monotonic system states if you query the active and then a standby replica I'm not saying that we shouldn't solve the non-monotonic-query-result problem. I'm just asking whether querying only committed transactions is the solution. Thanks! was (Author: vvcephei): Thanks, all, I've re-read this issue, and also read KAFKA-8870, which provided some of the clarity I was seeking. I'm still not sure I understand what EOS transactions have to do with IQ results. In KAFKA-8870, [~vinoth] gave the specific scenario, which I'll copy/paste here: {quote}Value for key K can go from *V0 → V1 → V2* on active instance A, IQ reads V1, instance A fails and any failure/rebalancing will leave the standy instance B rewinding offsets and reprocessing, during which time IQ can again see V0 or V1 or any number of previous values for the same key. {quote} that two queries, both against the active replica, could see non-monotonic system states, as V2 was queried from a non-committed transaction, then the node failed, and the subsequent active node recovered up to V1 and serves _that_ as the most recent result. In that scenario, the new node would of course proceed to process the input and subsequently re-create system state V2. So the problem isn't specifically that we saw state V2 the first time, but that we saw V2 followed by V1. This problem isn't specific to EOS, so even if we can solve it in the EOS case by querying only state corresponding to committed transactions, then it's at best a band-aid. # It still allows IQ to serve non-monotonic system states under at-least-once semantics, although we could equivalently solve it by querying only after the changelog write is acked. 
# And it still allows IQ to serve non-monotonic system states if you query the active and then a standby replica. I'm not saying that we shouldn't solve the non-monotonic-query-result problem. I'm just asking whether querying only committed transactions is the solution. Thanks! > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, the uncommitted write could be reflected in the state > store before the ongoing transaction is finished. This means interactive > query could see uncommitted data within state store which is not ideal for > users relying on state stores for strong consistency. Ideally, we should have > an option to include state store commit as part of ongoing transaction, > however an immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
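A minimal sketch of the "query only committed state" idea under discussion, in plain Java (this is not a Kafka Streams API; the class and method names here are hypothetical): writes land in a working copy, a snapshot is published only when the transaction commits, and IQ reads are served from that snapshot. As the comment above argues, this hides uncommitted values like V2 but by itself does not make reads monotonic across an active/standby failover.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams API: a store view that exposes only
// state as of the last committed transaction.
public class CommittedOnlyStore<K, V> {
    private final Map<K, V> working = new HashMap<>();              // uncommitted writes
    private volatile Map<K, V> committedSnapshot = new HashMap<>(); // last committed state

    // Called for every processed record; invisible to queries until commit().
    public void put(K key, V value) {
        working.put(key, value);
    }

    // Called when the EOS transaction commits: publish the working state.
    public synchronized void commit() {
        committedSnapshot = new HashMap<>(working);
    }

    // IQ read path: never observes uncommitted writes, but two reads around a
    // failover can still go backwards (V2 then V1), as noted above.
    public V get(K key) {
        return committedSnapshot.get(key);
    }
}
{code}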
[jira] [Updated] (KAFKA-9753) Add task-level active-process-ratio to Streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-9753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9753: --- Description: This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5). [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams] was:This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5). > Add task-level active-process-ratio to Streams metrics > -- > > Key: KAFKA-9753 > URL: https://issues.apache.org/jira/browse/KAFKA-9753 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > > > This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5). > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams] -- This message was sent by Atlassian Jira (v8.3.4#803005)
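For reference, once this lands in 2.6.0 the metric should be readable through the public KafkaStreams#metrics() map like any other sensor. A small sketch, assuming the metric is named "active-process-ratio" per the ticket title (the exact name and group are defined by KIP-444, so treat them as assumptions here):
{code:java}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class ProcessRatioPrinter {
    // Prints every metric whose name matches the assumed "active-process-ratio".
    public static void printActiveProcessRatio(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if ("active-process-ratio".equals(name.name())) {
                // Tags identify which thread/task the measurement belongs to.
                System.out.println(name.tags() + " -> " + entry.getValue().metricValue());
            }
        }
    }
}
{code}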
[jira] [Updated] (KAFKA-9230) Change User Customizable Metrics API in StreamsMetrics interface
[ https://issues.apache.org/jira/browse/KAFKA-9230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9230: --- Labels: kip (was: ) > Change User Customizable Metrics API in StreamsMetrics interface > > > Key: KAFKA-9230 > URL: https://issues.apache.org/jira/browse/KAFKA-9230 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Labels: kip > Fix For: 2.5.0 > > > As proposed in KIP-444, the user-customizable metrics API in the > StreamsMetrics interface shall be improved. For more details, see > https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9753) Add task-level active-process-ratio to Streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-9753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9753: --- Labels: kip (was: ) > Add task-level active-process-ratio to Streams metrics > -- > > Key: KAFKA-9753 > URL: https://issues.apache.org/jira/browse/KAFKA-9753 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Labels: kip > Fix For: 2.6.0 > > > This is described as part of KIP-444 (which is mostly done in 2.4 / 2.5). > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9883) Connect request to restart task can result in IllegalArgumentError: "uriTemplate" parameter is null
Randall Hauch created KAFKA-9883: Summary: Connect request to restart task can result in IllegalArgumentError: "uriTemplate" parameter is null Key: KAFKA-9883 URL: https://issues.apache.org/jira/browse/KAFKA-9883 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.4.0 Reporter: Randall Hauch When attempting to restart a connector task, the following is logged by Connect:
{code:java}
ERROR Uncaught exception in REST call to /connectors/my-connector/tasks/0/restart (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
java.lang.IllegalArgumentException: "uriTemplate" parameter is null.
 at org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:189)
 at org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:72)
 at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:96)
 at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:263)
 at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:298)
 at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.restartTask(ConnectorsResource.java:218)
{code}
Resubmitting the restart REST request will usually resolve the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
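The failure is reproducible outside Connect: JAX-RS/Jersey's UriBuilder.fromUri rejects a null URI template with exactly this message. In ConnectorsResource the template comes from the worker's view of the leader URL, so a null leader URL (for example, mid-rebalance) is a plausible trigger; that cause is an inference, only the exception itself is taken from the report above.
{code:java}
import javax.ws.rs.core.UriBuilder;

public class UriTemplateNullRepro {
    public static void main(String[] args) {
        // Stand-in for a leader URL the worker does not know yet (assumption).
        String leaderUrl = null;
        // Throws java.lang.IllegalArgumentException: "uriTemplate" parameter is null.
        UriBuilder.fromUri(leaderUrl);
    }
}
{code}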
[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result
[ https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085311#comment-17085311 ] Guozhang Wang commented on KAFKA-9224: -- My understanding is that this JIRA is more related to the following: value for key K can go from V0 -> V1 -> V2 and we do not have any standbys; IQ reads V1, which is not committed yet, then the task crashes, and upon restoring it would only resume to V0, hence when it is queried again V0 would be returned. The key here is that with EOS the restoration would complete (and IQ can be served) once V0 is restored, without requiring restoration up to V1. > State store should not see uncommitted transaction result > - > > Key: KAFKA-9224 > URL: https://issues.apache.org/jira/browse/KAFKA-9224 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Currently under EOS, the uncommitted write could be reflected in the state > store before the ongoing transaction is finished. This means interactive > query could see uncommitted data within state store which is not ideal for > users relying on state stores for strong consistency. Ideally, we should have > an option to include state store commit as part of ongoing transaction, > however an immediate step towards a better reasoned system is to `write after > transaction commit`, which means we always buffer data within stream cache > for EOS until the ongoing transaction is committed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
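One way to see why restoration stops at the committed V0: under EOS the changelog is consumed with read_committed isolation, so writes from open or aborted transactions are never returned to the restoring consumer. A standalone sketch of that read path (topic name and bootstrap address are placeholders; Streams does this internally via its restore consumer):
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedRestoreDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "restore-demo");
        // The key setting: uncommitted transactional writes (the V1 above) are filtered out.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("app-store-changelog"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.key() + " -> " + record.value()); // sees V0, not uncommitted V1
            }
        }
    }
}
{code}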
[jira] [Updated] (KAFKA-9883) Connect request to restart task can result in IllegalArgumentError: "uriTemplate" parameter is null
[ https://issues.apache.org/jira/browse/KAFKA-9883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9883: - Priority: Minor (was: Major) > Connect request to restart task can result in IllegalArgumentError: > "uriTemplate" parameter is null > --- > > Key: KAFKA-9883 > URL: https://issues.apache.org/jira/browse/KAFKA-9883 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Randall Hauch >Priority: Minor > > When attempting to restart a connector, the following is logged by Connect: > > {code:java} > ERROR Uncaught exception in REST call to > /connectors/my-connector/tasks/0/restart > (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper) > java.lang.IllegalArgumentException: "uriTemplate" parameter is null. > at > org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:189) > at > org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:72) > at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:96) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:263) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:298) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.restartTask(ConnectorsResource.java:218) > {code} > Resubmitting the restart REST request will usually resolve the problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9298) Reuse of a mapped stream causes an Invalid Topology
[ https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085398#comment-17085398 ] ASF GitHub Bot commented on KAFKA-9298: --- bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error in joins URL: https://github.com/apache/kafka/pull/8504 When performing a join with a stream that needs repartitioning, Kafka Streams automatically creates a repartition topic. If the user does not use `StreamJoined` to name the repartition topic, Kafka Streams uses the generated name of the KStream instance for the repartition topic name. If the KStream instance requiring the repartition participates in another join, the second repartition topic is created using the same operator name. This name reuse is what causes the `InvalidTopologyException`. The error occurs because the `InternalTopologyBuilder` has already registered the repartition source name previously. For example, this topology will cause an error because Kafka Streams will attempt to create two repartition topics (which is correct behavior) but using the _**same name**_ each time, which causes the error:
```java
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-two");
```
However this topology, which is the same except that the user has provided repartition topic names, is fine. Note the use of `StreamJoined.withName` here:
```java
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
```
This bug has been present for some time, as I tested this out on `2.0` before we added the optimization layer. Ideally, the fix would be to generate a new repartition topic name each time to avoid such issues. But IMHO that ship has already sailed, because introducing new name generation would cause compatibility issues for existing topologies. So generating new names is out, at least for now. The proposed fix is: 1. For KStream objects needing repartitioning _**and using generated names**_, reuse the repartition topic node in any additional joins. 2. For KStream instances needing repartitioning _**using user-provided names**_, always create a new repartition topic node for each join, as each one will have a unique name. I've added tests confirming the expected behavior. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reuse of a mapped stream causes an Invalid Topology > --- > > Key: KAFKA-9298 > URL: https://issues.apache.org/jira/browse/KAFKA-9298 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Walker Carlson >Assignee: Bill Bejeck >Priority: Minor > Labels: join, streams > > Can be found in KStreamKStreamJoinTest.java:
> {code:java}
> @Test
> public void optimizerIsEager() {
>     final StreamsBuilder builder = new StreamsBuilder();
>     final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
>     newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     System.err.println(builder.build().describe().toString());
> }
> {code}
[jira] [Commented] (KAFKA-9863) update the deprecated --zookeeper option in the documentation into --bootstrap-server
[ https://issues.apache.org/jira/browse/KAFKA-9863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085487#comment-17085487 ] Luke Chen commented on KAFKA-9863: -- PR: https://github.com/apache/kafka/pull/8482 > update the deprecated --zookeeper option in the documentation into > --bootstrap-server > - > > Key: KAFKA-9863 > URL: https://issues.apache.org/jira/browse/KAFKA-9863 > Project: Kafka > Issue Type: Bug > Components: docs, documentation >Affects Versions: 2.4.1 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Since v2.2.0, the --zookeeper option has been deprecated because Kafka can > directly connect to brokers with --bootstrap-server (KIP-377). But in the > official documentation, many example commands still use --zookeeper > instead of --bootstrap-server. Following the commands in the documentation, > you'll get this warning, which is not good. > {code:java} > Warning: --zookeeper is deprecated and will be removed in a future version of > Kafka. > Use --bootstrap-server instead to specify a broker to connect to.{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
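The programmatic counterpart of this documentation change: since the KIP-377-era tooling, topic operations go through the brokers (AdminClient / --bootstrap-server) rather than ZooKeeper. A minimal sketch with placeholder host and topic names:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicViaBrokers {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Brokers, not ZooKeeper -- the same shift the docs should reflect.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", 3, (short) 1)))
                 .all().get();
        }
    }
}
{code}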