[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414771#comment-17414771 ] Matthias J. Sax commented on KAFKA-13292: - Fair enough. In the end, {{InvalidPidMappingException}} is not a fatal exception according to [https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling], which proposes to improve error handling. However, as it's a public API change, it won't be included in any bug-fix release. It's also very unlikely that there will be a 2.7.2 bug-fix release, so in the end, upgrading might be your only option after KIP-691 is done. Thus, I would still propose to close this ticket as "fixed by" https://issues.apache.org/jira/browse/KAFKA-10733 > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13292 > URL: https://issues.apache.org/jira/browse/KAFKA-13292 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: NEERAJ VAIDYA >Priority: Major > > I have a KafkaStreams application which consumes from a topic which has 12 > partitions. The incoming message rate into this topic is very low, perhaps > 3-4 per minute. Also, some partitions will not receive messages for more than > 7 days. 
> > Exactly after 7 days of starting this application, I seem to be getting the > following exception and the application shuts down, without processing > anymore messages : > > {code:java} > 2021-09-10T12:21:59.636 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer > clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, > transactionalId=mtx-caf-0_2] Transiting to abortable error state due to > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > 2021-09-10T12:21:59.642 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] > Error encountered sending record to topic > mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the > following exception during processing and the thread is going to shut down: > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. 
> Exception handler choose to FAIL the processing, no more records would be > sent. > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The > producer attempted to use a producer id which is not currently assigned to > its transac
[jira] [Commented] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414777#comment-17414777 ] Matthias J. Sax commented on KAFKA-13267: - According to [KIP-671|https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler], {{InvalidPidMappingException}} should actually be considered non-fatal, and we might want to try to handle it within Kafka Streams. > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13267 > URL: https://issues.apache.org/jira/browse/KAFKA-13267 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Gilles Philippart >Priority: Major > > We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these > errors pop up in apps using EOS: > {code:java} > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > {code} > Full stack trace: > {code:java} > Error encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > RecordCollectorImpl.java 196 lambda$send$0(...) > RecordCollectorImpl.java:196:in `lambda$send$0' > KafkaProducer.java 1365 onCompletion(...) > KafkaProducer.java:1365:in `onCompletion' > ProducerBatch.java 231 completeFutureAndFireCallbacks(...) > ProducerBatch.java:231:in `completeFutureAndFireCallbacks' > ProducerBatch.java 159 abort(...) > ProducerBatch.java:159:in `abort' > RecordAccumulator.java 763 abortBatches(...) 
> RecordAccumulator.java:763:in `abortBatches' > More (5 lines) > Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error > encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > {code} > I've seen that KAFKA-6821 described the same problem on an earlier version of > Kafka and was closed due to the subsequent works on EOS. > Another ticket raised recently shows that the exception is still occurring > (but the ticket wasn't raised for that specific error): KAFKA-12774 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on pull request #10678: TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc
cadonna commented on pull request #10678: URL: https://github.com/apache/kafka/pull/10678#issuecomment-918910442 @ijuma Are you fine with me merging this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13297) smt to rename schemas based on regex
Martin Sillence created KAFKA-13297: --- Summary: smt to rename schemas based on regex Key: KAFKA-13297 URL: https://issues.apache.org/jira/browse/KAFKA-13297 Project: Kafka Issue Type: New Feature Reporter: Martin Sillence Needed because the Debezium connector creates schemas named after the host and database schema. When we move from dev to production, these names change. With Avro, the name in the schema is used to pick the class, so deserialization fails when we move between environments. Proposal: an SMT to rename schemas based on a regex. Example configuration: {{"transforms.regexSchema.regex": ".*\\.([^.]*)\\.(Value|Key)", "transforms.regexSchema.replacement": "com.company.schema.$1.$2",}} https://github.com/apache/kafka/pull/11322
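The rename rule in the example configuration can be tried out with plain `java.util.regex`. The sketch below is not the SMT from the pull request; it only applies the same pattern and replacement to a schema name. The class `SchemaRenameSketch`, the method `rename`, and the sample schema name `dbhost.inventory.customers.Value` are made up for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SchemaRenameSketch {
    // Same values as transforms.regexSchema.regex and
    // transforms.regexSchema.replacement in the example configuration.
    private static final Pattern REGEX =
            Pattern.compile(".*\\.([^.]*)\\.(Value|Key)");
    private static final String REPLACEMENT = "com.company.schema.$1.$2";

    // Returns the renamed schema, or the input unchanged when the
    // whole name does not match the pattern (an assumption of this sketch).
    static String rename(String schemaName) {
        Matcher m = REGEX.matcher(schemaName);
        return m.matches() ? m.replaceFirst(REPLACEMENT) : schemaName;
    }

    public static void main(String[] args) {
        // Hypothetical host-specific schema name.
        System.out.println(rename("dbhost.inventory.customers.Value"));
    }
}
```

Because the host and database parts are swallowed by the leading `.*`, a dev name and a production name for the same table would map to the same target name under this rule.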
[GitHub] [kafka] dajac opened a new pull request #11325: MINOR: Update version in documentation to 2.8.1
dajac opened a new pull request #11325: URL: https://github.com/apache/kafka/pull/11325 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster
[ https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414854#comment-17414854 ] Chris Borckholder commented on KAFKA-13191: --- We are seeing similar issues after upgrading to version 2.8.0 (similarly, the cluster ran reliably for more than a year in the current setup with earlier 2.x versions). We are hosting Kafka in EC2 (6 brokers, 3 ZooKeeper nodes). In a test environment, we perform EC2 terminations of broker/ZooKeeper instances to test resilience. These terminations now lead to an inconsistent cluster state as described. We found that restarting the Kafka controller (forcing a controller change) fixes the situation. From our PoV this is a critical bug, as a single node failure can take down the cluster. Let me know if there is specific information (logs or similar) that can help in investigating/debugging this. > Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken > cluster > - > > Key: KAFKA-13191 > URL: https://issues.apache.org/jira/browse/KAFKA-13191 > Project: Kafka > Issue Type: Bug > Components: protocol >Affects Versions: 2.8.0 >Reporter: CS User >Priority: Major > > We're using confluent platform 6.2, running in a Kubernetes environment. The > cluster has been running for a couple of years with zero issues, starting > from version 1.1, 2.5 and now 2.8. > We've very recently upgraded to kafka 2.8 from kafka 2.5. > Since upgrading, we have seen issues when kafka and zookeeper pods restart > concurrently. > We can replicate the issue by restarting either the zookeeper statefulset > first or the kafka statefulset first, either way appears to result with the > same failure scenario. > We've attempted to mitigate by preventing the kafka pods from stopping if any > zookeeper pods are being restarted, or a rolling restart of the zookeeper > cluster is underway. 
> We've also added a check to stop the kafka pods from starting until all > zookeeper pods are ready, however under the following scenario we still see > the issue: > In a 3 node kafka cluster with 5 zookeeper servers > # kafka-2 starts to terminate - all zookeeper pods are running, so it > proceeds > # zookeeper-4 terminates > # kafka-2 starts-up, and waits until the zookeeper rollout completes > # kafka-2 eventually fully starts, kafka comes up and we see the errors > below on other pods in the cluster. > Without mitigation and in the above scenario we see errors on pods kafka-0 > (repeatedly spamming the log) : > {noformat} > [2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch > (670014914375), retrying. (kafka.server.DefaultAlterIsrManager){noformat} > Kafka-1 seems ok > When kafka-2 starts, it has this log entry with regards to its own broker > epoch: > {noformat} > [2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 > with addresses: > INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094, > czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient) {noformat} > This is despite kafka-2 appearing to start fine, this is what you see in > kafka-2's logs, nothing else seems to be added to the log, it just seems to > hang here: > {noformat} > [2021-08-11 11:44:48,911] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=2] Started socket server acceptors and processors > (kafka.network.SocketServer) > [2021-08-11 11:44:48,913] INFO Kafka version: 6.2.0-ccs > (org.apache.kafka.common.utils.AppInfoParser) > [2021-08-11 11:44:48,913] INFO Kafka commitId: 1a5755cf9401c84f > (org.apache.kafka.common.utils.AppInfoParser) > [2021-08-11 11:44:48,913] INFO Kafka startTimeMs: 1628682288911 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-08-11 11:44:48,914] INFO [KafkaServer id=2] started > (kafka.server.KafkaServer) {noformat} > This never appears to recover. 
> If you then restart kafka-2, you'll see these errors: > {noformat} > org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication > factor: 3 larger than available brokers: 0. {noformat} > This seems to completely break the cluster, partitions do not failover as > expected. > > Checking zookeeper, and getting the values of the brokers look fine > {noformat} > get /brokers/ids/0{noformat} > etc, all looks fine there, each broker is present. > > This error message appears to have been added to kafka in the last 11 months > {noformat} > Broker had a stale broker epoch {noformat} > Via this PR: > [https://github.com/apache/kafka/pull/9100] > I see also this comment around the leader getting stuck: > [https://github.com/apache/kafka/pull/9100/files#r494480847] > > Recovery is possible by continuing to restart the rem
[GitHub] [kafka] dongjinleekr commented on pull request #11324: KAFKA-13294: Upgrade Netty to 4.1.68 for CVE fixes
dongjinleekr commented on pull request #11324: URL: https://github.com/apache/kafka/pull/11324#issuecomment-919050807 @kkonstantine Should it be merged into 3.0.0, or only into 2.8.1?
[jira] [Comment Edited] (KAFKA-7360) Code example in "Accessing Processor Context" misses a closing parenthesis
[ https://issues.apache.org/jira/browse/KAFKA-7360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413701#comment-17413701 ] Vijay edited comment on KAFKA-7360 at 9/14/21, 11:35 AM: - [~seknop] Please review [https://github.com/apache/kafka/pull/10873] was (Author: vijaykriishna): Please review [https://github.com/apache/kafka/pull/10873] > Code example in "Accessing Processor Context" misses a closing parenthesis > -- > > Key: KAFKA-7360 > URL: https://issues.apache.org/jira/browse/KAFKA-7360 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 2.0.0 >Reporter: Sven Erik Knop >Assignee: Vijay >Priority: Minor > > https://kafka.apache.org/20/documentation/streams/developer-guide/processor-api.html#accessing-processor-context > Code example has some issues: > public void process(String key, String value) { > > // add a header to the elements > context().headers().add.("key", "key" > } > Should be > public void process(String key, String value) { > > // add a header to the elements > context().headers().add("key", "value") > }
[jira] [Created] (KAFKA-13298) Improve documentation on EOS KStream requirements
F Méthot created KAFKA-13298: Summary: Improve documentation on EOS KStream requirements Key: KAFKA-13298 URL: https://issues.apache.org/jira/browse/KAFKA-13298 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.8.0 Reporter: F Méthot After I posted a question on a Kafka forum, a Kafka developer gave the following answer: {quote}Is there minimum replication factor required to enable exactly_once for topic involved in transactions? {quote} "Well, technically you can configure the system to use EOS with any replication factor. However, using a lower replication factor than 3 effectively voids EOS. Thus, it’s strongly recommended to use a replication factor of 3." This should be clearly documented in the Streams doc: [https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html] which refers to this broker link: [https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html#streams-developer-guide-processing-guarantedd]
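As a sketch of what such documentation could show, the snippet below sets the two Kafka Streams settings under discussion using plain `java.util.Properties`. `processing.guarantee` and `replication.factor` are Streams configuration keys; the application id, the bootstrap address, and the class and method names are placeholders invented for this example. Replication of the input and output topics is set when those topics are created and is not covered here.

```java
import java.util.Properties;

class EosConfigSketch {
    // Builds Streams properties for EOS with the recommended replication factor.
    static Properties eosProperties() {
        Properties props = new Properties();
        props.put("application.id", "eos-example-app");    // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("processing.guarantee", "exactly_once");
        // Applies to internal topics that Streams creates
        // (changelog and repartition topics).
        props.put("replication.factor", "3");
        return props;
    }

    public static void main(String[] args) {
        eosProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the `KafkaStreams` constructor together with the topology.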
[GitHub] [kafka] dajac merged pull request #11325: MINOR: Update version in documentation to 2.8.1
dajac merged pull request #11325: URL: https://github.com/apache/kafka/pull/11325
[GitHub] [kafka] showuon commented on pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on pull request #11227: URL: https://github.com/apache/kafka/pull/11227#issuecomment-919127815 @patrickstuedi, the PR (#11292) has been merged into trunk. I've rebased this PR, so it is ready for review now. Thank you.
[GitHub] [kafka] ccding commented on pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request
ccding commented on pull request #11080: URL: https://github.com/apache/kafka/pull/11080#issuecomment-919272706 The test run Jason kicked off had two failing tests: ``` Build / JDK 8 and Scala 2.12 / testDescribeTopicsWithIds() – kafka.api.PlaintextAdminIntegrationTest Build / JDK 11 and Scala 2.13 / shouldQueryStoresAfterAddingAndRemovingStreamThread – org.apache.kafka.streams.integration.StoreQueryIntegrationTest ``` Both passed in my local run after merging trunk into this branch. I'm pushing the trunk merge to this branch to let Jenkins run them again.
[GitHub] [kafka] dengziming commented on pull request #11310: KAFKA-13279: Implement CreateTopicsPolicy for KRaft
dengziming commented on pull request #11310: URL: https://github.com/apache/kafka/pull/11310#issuecomment-919288093 Should we also change the integration test `CreateTopicsRequestWithPolicyTest` to support KRaft mode? The change LGTM.
[jira] [Updated] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13272: Component/s: (was: streams) core > KStream offset stuck after brokers outage > - > > Key: KAFKA-13272 > URL: https://issues.apache.org/jira/browse/KAFKA-13272 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 > Environment: Kafka running on Kubernetes > centos >Reporter: F Méthot >Priority: Major > > Our KStream app offset stay stuck on 1 partition after outage possibly when > exactly_once is enabled. > Running with KStream 2.8, kafka broker 2.8, > 3 brokers. > commands topic is 10 partitions (replication 2, min-insync 2) > command-expiry-store-changelog topic is 10 partitions (replication 2, > min-insync 2) > events topic is 10 partitions (replication 2, min-insync 2) > with this topology > Topologies: > > {code:java} > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [commands]) > --> KSTREAM-TRANSFORM-01 > Processor: KSTREAM-TRANSFORM-01 (stores: []) > --> KSTREAM-TRANSFORM-02 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store]) > --> KSTREAM-SINK-03 > <-- KSTREAM-TRANSFORM-01 > Sink: KSTREAM-SINK-03 (topic: events) > <-- KSTREAM-TRANSFORM-02 > {code} > h3. > h3. Attempt 1 at reproducing this issue > > Our stream app runs with processing.guarantee *exactly_once* > After a Kafka test outage where all 3 brokers pod were deleted at the same > time, > Brokers restarted and initialized succesfuly. > When restarting the topology above, one of the tasks would never initialize > fully, the restore phase would keep outputting this messages every few > minutes: > > {code:java} > 2021-08-16 14:20:33,421 INFO stream-thread > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, > end=11775911, totalRestored=2002076} > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > (org.apache.kafka.streams.processor.internals.StoreChangelogReader) > {code} > Task for partition 8 would never initialize, no more data would be read from > the source commands topic for that partition. > > In an attempt to recover, we restarted the stream app with stream > processing.guarantee back to at_least_once, than it proceed with reading the > changelog and restoring partition 8 fully. > But we noticed afterward, for the next hour until we rebuilt the system, that > partition 8 from command-expiry-store-changelog would not be > cleaned/compacted by the log cleaner/compacter compared to other partitions. > (could be unrelated, because we have seen that before) > So we resorted to delete/recreate our command-expiry-store-changelog topic > and events topic and regenerate it from the commands, reading from beginning. > Things went back to normal > h3. Attempt 2 at reproducing this issue > kstream runs with *exactly-once* > We force-deleted all 3 pod running kafka. > After that, one of the partition can’t be restored. 
(like reported in > previous attempt) > For that partition, we noticed these logs on the broker > {code:java} > [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: > Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, > command-expiry-store-changelog-9) while trying to send transaction markers > for commands-processor-0_9, these partitions are likely deleted already and > hence can be skipped > (kafka.coordinator.transaction.TransactionMarkerChannelManager){code} > Then > - we stop the kstream app, > - restarted kafka brokers cleanly > - Restarting the Kstream app, > Those logs messages showed up on the kstream app log: > > {code:java} > 2021-08-27 18:34:42,413 INFO [Consumer > clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer, > groupId=commands-processor] The following partitions still have unstable > offsets which are not cleared on the broker side: [commands-9], this could be > either transactional offsets waiting for completion, or normal offsets > waiting for replication after appending to local log > [commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1] > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > > {code} > This would cause our processor to not consume from that specific source > topic-partition. > Deleting downstream topic and replaying data would NOT fix the issue > (EXACTLY_ONCE or AT_LEAST_ONCE) > Workaround found: > Deleted the group associated with the processor, and re
[jira] [Comment Edited] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414777#comment-17414777 ] Matthias J. Sax edited comment on KAFKA-13267 at 9/14/21, 4:35 PM: --- According to [KIP-691|https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling], {{InvalidPidMappingException}} should actually be considered non-fatal, and we might want to try to handle it within Kafka Streams. was (Author: mjsax): According to [KIP-671|https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler], {{InvalidPidMappingException}} should actually be considered non-fatal, and we might want to try to handle it within Kafka Streams.
[jira] [Updated] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13289: Priority: Minor (was: Major) > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Matthew Sheppard >Priority: Minor > > When pushing bulk data through a kafka-streams app, I see it log the following > message many times... > {noformat} > WARN > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - > Skipping record for expired segment. > {noformat} > ...and data which I expect to have been joined through a leftJoin step > appears to be lost. > I've seen this in practice either when my application has been shut down for > a while and then is brought back up, or when I've used something like the > [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) > in an attempt to have the application reprocess past data. > I was able to reproduce this behaviour in isolation by generating 1000 > messages to two topics spaced an hour apart (with the original timestamps in > order), then having kafka streams select a key for them and try to leftJoin > the two rekeyed streams. > Self contained source code for that reproduction is available at > https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java > The actual kafka-streams topology in there looks like this. 
> {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final KStream leftStream = > builder.stream(leftTopic); > final KStream rightStream = > builder.stream(rightTopic); > final KStream rekeyedLeftStream = leftStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > final KStream rekeyedRightStream = rightStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); > final KStream joined = rekeyedLeftStream.leftJoin( > rekeyedRightStream, > (left, right) -> left + "/" + right, > joinWindow > ); > {code} > ...and the eventual output I produce looks like this... > {code} > ... > 523 [523,left/null] > 524 [524,left/null, 524,left/524,right] > 525 [525,left/525,right] > 526 [526,left/null] > 527 [527,left/null] > 528 [528,left/528,right] > 529 [529,left/null] > 530 [530,left/null] > 531 [531,left/null, 531,left/531,right] > 532 [532,left/null] > 533 [533,left/null] > 534 [534,left/null, 534,left/534,right] > 535 [535,left/null] > 536 [536,left/null] > 537 [537,left/null, 537,left/537,right] > 538 [538,left/null] > 539 [539,left/null] > 540 [540,left/null] > 541 [541,left/null] > 542 [542,left/null] > 543 [543,left/null] > ... > {code} > ...where as, given the input data, I expect to see every row end with the two > values joined, rather than the right value being null. 
> Note that I understand it's expected that we initially get the left/null > values for many values since that's the expected semantics of kafka-streams > left join, at least until > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious > I've noticed that if I set a very large grace value on the join window the > problem is solved, but since the input I provide is not out of order I did > not expect to need to do that, and I'm wary of the resource requirements of > doing so in practice on an application with a lot of volume. > My suspicion is that something is happening such that when one partition is > processed it causes the stream time to be pushed forward to the newest > message in that partition, meaning when the next partition is then examined > it is found to contain many records which are 'too old' compared to the > stream time. > I ran across this discussion thread which seems to cover the same issue > http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e > and had a request from [~cadonna] for a reproduction case, so I'm hoping my > example above might make the issue easier to tackle! -- This message was sent by Atlassian Jira (v8.3.4#803005)
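The suspicion in the report above can be illustrated with a deliberately simplified model. This is a sketch under stated assumptions and not the Kafka Streams implementation: `StreamTimeModel`, `accept`, `countSkipped` and `hourly` are hypothetical names, stream time is modelled as the maximum timestamp seen so far, and a record counts as expired when it is older than stream time minus a retention period.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of "stream time" and record expiry.
class StreamTimeModel {
    private final long retentionMs;
    private long streamTime = Long.MIN_VALUE;

    StreamTimeModel(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Advances stream time to the newest timestamp seen so far and reports
    // whether the record is still inside the retention period.
    boolean accept(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        return timestampMs >= streamTime - retentionMs;
    }

    // Counts how many records would be skipped for a given processing order.
    static int countSkipped(List<Long> timestampsInProcessingOrder, long retentionMs) {
        StreamTimeModel model = new StreamTimeModel(retentionMs);
        int skipped = 0;
        for (long ts : timestampsInProcessingOrder) {
            if (!model.accept(ts)) {
                skipped++;
            }
        }
        return skipped;
    }

    // Timestamps spaced one hour apart, in order, as in the reproduction.
    static List<Long> hourly(int count) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(i * 3_600_000L);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> left = hourly(10);
        List<Long> right = hourly(10);

        // Order 1: the two inputs are consumed in lock step.
        List<Long> interleaved = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            interleaved.add(left.get(i));
            interleaved.add(right.get(i));
        }

        // Order 2: one input is drained completely before the other.
        List<Long> oneAfterAnother = new ArrayList<>(left);
        oneAfterAnother.addAll(right);

        System.out.println("skipped (interleaved): " + countSkipped(interleaved, 5_000L));
        System.out.println("skipped (one after another): " + countSkipped(oneAfterAnother, 5_000L));
    }
}
```

In this model the input is perfectly ordered within each partition, yet draining one partition first makes most of the second look too old. A larger retention or grace value hides the effect at the cost of keeping more state, which matches the trade-off the reporter describes.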
[jira] [Created] (KAFKA-13299) Accept listeners that have the same port but use IPv4 vs IPv6
Matthew de Detrich created KAFKA-13299: -- Summary: Accept listeners that have the same port but use IPv4 vs IPv6 Key: KAFKA-13299 URL: https://issues.apache.org/jira/browse/KAFKA-13299 Project: Kafka Issue Type: Improvement Reporter: Matthew de Detrich Currently we are going through a process where we want to migrate Kafka brokers from IPv4 to IPv6. The simplest way for us to do this would be to allow Kafka to have 2 listeners on the same port, where one listener has an IPv4 address allocated and the other has an IPv6 address allocated. Currently this is not possible in Kafka because it validates that all of the listeners have a unique port. In some rudimentary testing with this validation removed (so that we are able to have 2 listeners on the same port but with different IP versions), there don't seem to be any immediate problems; the Kafka cluster works fine. Is there some fundamental reason behind this limitation of having unique ports? Consequently, would there be any problems in loosening this limitation (i.e. duplicate ports are allowed if the IP versions are different) or in removing the restriction altogether?
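The loosened rule proposed above (duplicate ports allowed only when the IP versions differ) could look roughly like the following. This is a minimal sketch, not Kafka's listener validation: `ListenerPortCheck`, `Endpoint` and `portsAreAcceptable` are hypothetical names, and hosts are assumed to be literal IP addresses so that no DNS lookup takes place.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a relaxed "unique port" check for listeners.
class ListenerPortCheck {

    // Hypothetical endpoint: a literal IP address plus a port.
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Returns true when every (port, IP version) pair is used at most once,
    // so the same port may appear once for IPv4 and once for IPv6.
    static boolean portsAreAcceptable(List<Endpoint> endpoints) {
        Set<String> seen = new HashSet<>();
        for (Endpoint e : endpoints) {
            boolean ipv4;
            try {
                // For a literal address this only parses the text.
                ipv4 = InetAddress.getByName(e.host) instanceof Inet4Address;
            } catch (UnknownHostException ex) {
                throw new IllegalArgumentException("Not a usable address: " + e.host, ex);
            }
            String key = e.port + (ipv4 ? "/ipv4" : "/ipv6");
            if (!seen.add(key)) {
                return false;
            }
        }
        return true;
    }
}
```

Keying the check on the pair of port and address family keeps the existing protection against two IPv4 listeners colliding on one port while permitting the dual-stack setup described in the ticket.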
[GitHub] [kafka] hachikuji merged pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request
hachikuji merged pull request #11080: URL: https://github.com/apache/kafka/pull/11080 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13149) Null Pointer Exception for record==null when handling a produce request
[ https://issues.apache.org/jira/browse/KAFKA-13149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13149. - Fix Version/s: 3.0.1 Resolution: Fixed > Null Pointer Exception for record==null when handling a produce request > --- > > Key: KAFKA-13149 > URL: https://issues.apache.org/jira/browse/KAFKA-13149 > Project: Kafka > Issue Type: Bug > Components: log >Reporter: Cong Ding >Priority: Major > Fix For: 3.0.1 > > > In production, we have seen an exception > {code:java} > java.lang.NullPointerException: Cannot invoke > "org.apache.kafka.common.record.Record.hasMagic(byte)" because "record" is > null{code} > which is triggered by > > [https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/core/src/main/scala/kafka/log/LogValidator.scala#L191] > when handling a produce request. > The reason is that > [https://github.com/apache/kafka/blob/bfc57aa4ddcd719fc4a646c2ac09d4979c076455/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L294-L296] > returns record==null, which is possibly caused by a bad client. However, we > have no idea about the client in our multi-tenant environment. > We should let the broker throw an invalid record exception and notify clients. -- This message was sent by Atlassian Jira (v8.3.4#803005)
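The proposal above, rejecting the batch with an invalid-record error instead of letting a NullPointerException escape, can be sketched as follows. This is an illustration under assumptions and not the merged fix: `NullRecordGuard`, `LogRecord` and `InvalidRecordSketchException` are local stand-ins defined here, not Kafka classes.

```java
import java.util.List;

// Hypothetical sketch: turn a null record into an explicit validation error.
class NullRecordGuard {

    // Local stand-in for an "invalid record" error; not Kafka's exception class.
    static class InvalidRecordSketchException extends RuntimeException {
        InvalidRecordSketchException(String message) {
            super(message);
        }
    }

    // Minimal stand-in for a record that can report its magic byte.
    interface LogRecord {
        boolean hasMagic(byte magic);
    }

    // Validates every record in a batch and fails with a descriptive error
    // rather than dereferencing a null record.
    static void validate(List<LogRecord> records, byte expectedMagic) {
        for (int i = 0; i < records.size(); i++) {
            LogRecord record = records.get(i);
            if (record == null) {
                throw new InvalidRecordSketchException(
                        "Record at index " + i + " could not be read (null record)");
            }
            if (!record.hasMagic(expectedMagic)) {
                throw new InvalidRecordSketchException(
                        "Record at index " + i + " has an unexpected magic value");
            }
        }
    }
}
```

With a check like this the client that sent the malformed batch receives an error it can act on, which helps in a multi-tenant environment where the offending client is otherwise hard to identify.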
[jira] [Assigned] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victoria Xia reassigned KAFKA-13261: Assignee: Victoria Xia > KTable to KTable foreign key join loose events when using several partitions > > > Key: KAFKA-13261 > URL: https://issues.apache.org/jira/browse/KAFKA-13261 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0, 2.7.1 >Reporter: Tomas Forsman >Assignee: Victoria Xia >Priority: Major > Attachments: KafkaTest.java > > > Two incoming streams A and B. > Stream A uses a composite key [a, b] > Stream B has key [b] > Stream B has 4 partitions and stream A has 1 partition. > What we try to do is repartition stream A to have 4 partitions too, then put > both A and B into KTables and do a foreign key join from A to B. > When doing this, not all messages end up in the output topic. > Repartitioning both to only use 1 partition each solves the problem, so it seems > like it has something to do with the foreign key join in combination with > several partitions. > One suspicion would be that it is not possible to define what partitioner to > use for the join. > Any insight or help is greatly appreciated. 
> *Example code of the problem* > {code:java} > static Topology createTopoology(){ > var builder = new StreamsBuilder(); > KTable tableB = builder.table("B", > stringMaterialized("table.b")); > builder > .stream("A", Consumed.with(Serde.of(KeyA.class), > Serde.of(EventA.class))) > .repartition(repartitionTopicA()) > .toTable(Named.as("table.a"), aMaterialized("table.a")) > .join(tableB, EventA::getKeyB, topicAandBeJoiner(), > Named.as("join.ab"), joinMaterialized("join.ab")) > .toStream() > .to("output", with(...)); > return builder.build(); > } > private static Materialized aMaterialized(String name) { > Materialized> table = > Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > private static Repartitioned repartitionTopicA() { > Repartitioned repartitioned = > Repartitioned.as("driverperiod"); > return > repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)) > .withStreamPartitioner(topicAPartitioner()) > .withNumberOfPartitions(4); > } > private static StreamPartitioner > topicAPartitioner() { > return (topic, key, value, numPartitions) -> > Math.abs(key.getKeyB().hashCode()) % numPartitions; > } > private static Materialized> > joinMaterialized(String name) { > Materialized> > table = Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victoria Xia reassigned KAFKA-13268: Assignee: Victoria Xia > Add more integration tests for Table Table FK joins with repartitioning > --- > > Key: KAFKA-13268 > URL: https://issues.apache.org/jira/browse/KAFKA-13268 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Victoria Xia >Priority: Major > > We should add to the FK join multipartition integration test with a > Repartitioned for: > 1) just the new partition count > 2) a custom partitioner > This is to test if there's a bug where the internal topics don't pick up a > partitioner provided that way. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election
[ https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415051#comment-17415051 ] Jason Gustafson commented on KAFKA-7408: [~jagsancio] The basic idea was to truncate to the LSO and then rewrite markers for any transactions which were in progress. For example, suppose we have this in the log: x1 x2 y1 y2 z1 z2 xC yA So the transaction beginning at z1 is where the LSO would be. The idea is then to truncate to that point and rewrite the markers that came after: x1 x2 y1 y2 xC yA But I realize now that this approach was probably unnecessarily aggressive. I wasn't taking into account transactions which started and ended after the LSO. For example: x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC Now we have another x transaction which begins after the LSO. We'd lose this if we have to truncate. So I'm now thinking your 2) suggestion is probably a better approach. Rather than truncating, we can abort. This is simpler to implement since it does not involve any rewriting of markers. The example above becomes this: x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC zA I had previously thought that it was a stronger guarantee if we could say that no transaction outcomes were changed. But I realize now that there is probably not a practical difference between dropping the data from a transaction and just aborting it. > Truncate to LSO on unclean leader election > -- > > Key: KAFKA-7408 > URL: https://issues.apache.org/jira/browse/KAFKA-7408 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > If an unclean leader is elected, we may lose committed transaction data. That > alone is expected, but what is worse is that a transaction which was > previously completed (either committed or aborted) may lose its marker and > become dangling. The transaction coordinator will not know about the unclean > leader election, so will not know to resend the transaction markers. 
> Consumers with read_committed isolation will be stuck because the LSO cannot > advance. > To keep this scenario from occurring, it would be better to have the unclean > leader truncate to the LSO so that there are no dangling transactions. > Truncating to the LSO is not alone sufficient because the markers which > allowed the LSO advancement may be at higher offsets. What we can do is let > the newly elected leader truncate to the LSO and then rewrite all the markers > that followed it using its own leader epoch (to avoid divergence from > followers). > The interesting cases when an unclean leader election occurs are when a > transaction is ongoing. > 1. If a producer is in the middle of a transaction commit, then the > coordinator may still attempt to write transaction markers. This will either > succeed or fail depending on the producer epoch in the unclean leader. If the > epoch matches, then the WriteTxnMarker call will succeed, which will simply > be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker > call will fail and the transaction coordinator can potentially remove the > partition from the transaction. > 2. If a producer is still writing the transaction, then what happens depends > on the producer state in the unclean leader. If no producer state has been > lost, then the transaction can continue without impact. Otherwise, the > producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will > cause the transaction to be aborted by the coordinator. That takes us back to > the first case. > By truncating to the LSO, we ensure that transactions are either preserved in > whole or they are removed from the log in whole. For an unclean leader > election, that's probably as good as we can do. But we are ensured that > consumers will not be blocked by dangling transactions. The only remaining > situation where a dangling transaction might be left is if one of the > transaction state partitions has an unclean leader election. 
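The "abort instead of truncate" idea from the comment above can be replayed on the same toy log notation. This is only a sketch of the bookkeeping, assuming the notation used in the comment (`x1` is data from producer `x`, `xC` a commit marker, `xA` an abort marker); `DanglingTxnSketch` and its methods are hypothetical names and nothing here is broker code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of finding dangling transactions in a toy log.
class DanglingTxnSketch {

    // Maps each producer with an open (unmarked) transaction to the offset
    // of the first data entry of that transaction.
    static Map<Character, Integer> openTransactions(List<String> log) {
        Map<Character, Integer> open = new LinkedHashMap<>();
        for (int offset = 0; offset < log.size(); offset++) {
            String entry = log.get(offset);
            char producer = entry.charAt(0);
            char kind = entry.charAt(entry.length() - 1);
            if (kind == 'C' || kind == 'A') {
                open.remove(producer);              // marker closes the transaction
            } else {
                open.putIfAbsent(producer, offset); // first write opens it
            }
        }
        return open;
    }

    // The last stable offset in this model: the start of the earliest open
    // transaction, or the log end when nothing is open.
    static int lastStableOffset(List<String> log) {
        int lso = log.size();
        for (int firstOffset : openTransactions(log).values()) {
            lso = Math.min(lso, firstOffset);
        }
        return lso;
    }

    // Appends an abort marker for every dangling transaction instead of
    // truncating, so completed transactions after the LSO are kept.
    static List<String> abortDangling(List<String> log) {
        List<String> result = new ArrayList<>(log);
        for (char producer : openTransactions(log).keySet()) {
            result.add(producer + "A");
        }
        return result;
    }
}
```

Applied to `x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC`, the only open transaction is `z`, starting at the position of `z1`, and the result keeps the second `x` transaction while appending `zA`, as in the comment's final example.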
[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
venkatesh010 commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-919344102 We are facing this issue with Confluent 5.5.1 (Apache Kafka 2.5.x), which is later than 2.2.1. The issue is not easily reproducible and it is difficult to provide a sample. Please help with it.
[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
hachikuji commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r708533935 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ## @@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, delta: MetadataDelta, newImage: MetadataImage): Unit = { try { + debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset") Review comment: I decided to make this trace level since it could be a bit noisy in some cases.
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
guozhangwang commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-919419275 > hey @guozhangwang , that makes perfect sense to me. Based upon the discussion me and @patrickstuedi had on this thread, I think to reap the real benefits, the optimisations listed here and on the JIRA makes sense. > We can hold off on this PR till then. Thanks! Great, thanks @vamossagar12 !
[jira] [Commented] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415109#comment-17415109 ] Guozhang Wang commented on KAFKA-13268: --- I just did that in a recent PR: https://github.com/apache/kafka/pull/11252/files#diff-1060876874aba52f23e63e730ca6d26aac41e9be5f43fde4e5dcfaad5f76f927R65 I do not have a concrete framework in mind for how to extend that for new operators / configuration permutations though, do you have something in mind? How to use it to cover newly added operators? > Add more integration tests for Table Table FK joins with repartitioning > --- > > Key: KAFKA-13268 > URL: https://issues.apache.org/jira/browse/KAFKA-13268 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Victoria Xia >Priority: Major > > We should add to the FK join multipartition integration test with a > Repartitioned for: > 1) just the new partition count > 2) a custom partitioner > This is to test if there's a bug where the internal topics don't pick up a > partitioner provided that way. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415181#comment-17415181 ] NEERAJ VAIDYA commented on KAFKA-13292: --- Thanks [~mjsax] So, to summarize : Will KIP-671 and upgrading to 2.8.0 be sufficient to avoid this _InvalidPidMappingException_ ? Or do I need to wait for KIP-691 and 3.x to be available ? Do I need to just upgrade the kafka-clients, kafka-streams, etc. library dependencies to use the 2.8 version or do I need to update the Brokers as well from 2.7.0 to 2.8.0 ? > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13292 > URL: https://issues.apache.org/jira/browse/KAFKA-13292 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: NEERAJ VAIDYA >Priority: Major > > I have a KafkaStreams application which consumes from a topic which has 12 > partitions. The incoming message rate into this topic is very low, perhaps > 3-4 per minute. Also, some partitions will not receive messages for more than > 7 days. > > Exactly after 7 days of starting this application, I seem to be getting the > following exception and the application shuts down, without processing > anymore messages : > > {code:java} > 2021-09-10T12:21:59.636 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer > clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, > transactionalId=mtx-caf-0_2] Transiting to abortable error state due to > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. 
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] > Error encountered sending record to topic > mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the > following exception during processing and the thread is going to shut down: > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. 
> at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The > producer attempted to use a producer id which is not currently assigned to > its transactional id. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1]
[jira] [Commented] (KAFKA-9861) Process Simplification - Community Validation of Kafka Release Candidates
[ https://issues.apache.org/jira/browse/KAFKA-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415182#comment-17415182 ] Israel Ekpo commented on KAFKA-9861: To encourage other community members to participate in the release candidate validations and voting, I have set up the following resource as part of the work for KAFKA-9861 https://github.com/izzyacademy/apache-kafka-release-party Please take a look at the resource and share any feedback that you may have. > Process Simplification - Community Validation of Kafka Release Candidates > - > > Key: KAFKA-9861 > URL: https://issues.apache.org/jira/browse/KAFKA-9861 > Project: Kafka > Issue Type: Improvement > Components: build, documentation, system tests > Environment: Linux, Java 8/11, Scala 2.x >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Minor > > When new KAFKA release candidates are published and there is a solicitation > for the community to get involved in testing and verifying the release > candidates, it would be great to have the test process thoroughly documented > for newcomers to participate effectively. > For new contributors, this can be very daunting and it would be great to have > this process clearly documented in a way that lowers the level of effort > necessary to get started. > The goal of this task is to create the documentation and supporting artifacts > that would make this goal a reality. > Going forward for future releases, it would be great to have the link to this > documentation included in the RC announcements so that the community > (especially end users) can help test and participate in the voting process > effectively. 
> These are the items that I believe should be included in this documentation > * How to set up test environment for unit and functional tests > * Java version(s) needed for the tests > * Scala version(s) needed for the tests > * Gradle version needed > * Sample script for running sanity checks and unit tests > * Sample Helm Charts for running all the basic components on a Kubernetes > * Sample Ansible Script for running all the basic components on Virtual > Machines > The first 4 items will be part of the documentation that shows how to install > these dependencies in a Linux VM. The 5th item is a script that will download > PGP keys, check signatures, validate checksums and run unit/integration > tests. The 6th item is a Helm chart with basic components necessary to > validate critical components in the ecosystem (Zookeeper, Brokers, Streams > etc) within a Kubernetes cluster. The last item is similar to the 6th item > but installs these components on virtual machines instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9861) Process Simplification - Community Validation of Kafka Release Candidates
[ https://issues.apache.org/jira/browse/KAFKA-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415183#comment-17415183 ] Israel Ekpo commented on KAFKA-9861: I would like it to be merged into the main Kafka code base. Please share suggestions on which folder (possibly a new one) we can place this resource into. Thanks.
[jira] [Created] (KAFKA-13300) Kafka ACL Restriction Group Is not being applied
Adriano Jesus created KAFKA-13300: - Summary: Kafka ACL Restriction Group Is not being applied Key: KAFKA-13300 URL: https://issues.apache.org/jira/browse/KAFKA-13300 Project: Kafka Issue Type: Bug Affects Versions: 2.6.2 Reporter: Adriano Jesus Hi, I am creating a Kafka ACL with a fake group restriction as below: {code:java} ./kafka-acls.sh \ --authorizer-properties zookeeper.connect=$ZOOKEEPER \ --remove --allow-principal User:'Kafka-tools' \ --consumer --group fake-group \ --topic delete-me-2 {code} When I try to consume a message with the same user, 'Kafka-tools', and with another group, I am still able to consume the messages: {code:java} // ./kafka-console-consumer.sh --bootstrap-server=$KAFKA --topic delete-me-2 --consumer.config user-auth.properties --from-beginning --group teste {code} According to the documentation this property can be used as a consumer group ([https://docs.confluent.io/platform/current/kafka/authorization.html]): "*Group* Groups in the brokers. All protocol calls that work with groups, such as joining a group, must have corresponding privileges with the group in the subject. Group ({{group.id}}) can mean Consumer Group, Stream Group ({{application.id}}), Connect Worker Group, or any other group that uses the Consumer Group protocol, like Schema Registry cluster." I did another test, adding a consumer ACL permission with this command: {code:java} ./kafka-acls.sh \ --authorizer-properties zookeeper.connect=$ZOOKEEPER \ --add --allow-principal User:'Kafka-tools' \ --consumer --group fake-group \ --topic delete-me-2 {code} After that I removed the ACL authorization for the READ operation on the Group resource and tried again to consume from this topic. I am still able to consume messages from this topic even without the READ group permission. Maybe my interpretation is wrong, but it seems that Kafka ACL is not validating the group permissions.
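The behaviour the reporter expected can be written down as a small model: consuming with a group should require READ on both the topic and the group. This is a hedged sketch of that expectation only and not the Kafka authorizer; `ConsumerAclSketch` and its methods are hypothetical names, and the model ignores wildcard and prefixed ACLs, deny rules and super users.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the expected rule: a group consumer needs READ
// on the topic and READ on the consumer group.
class ConsumerAclSketch {
    private final Set<String> readAllowed = new HashSet<>();

    private static String key(String principal, String resourceType, String name) {
        return principal + "|" + resourceType + "|" + name;
    }

    // Grants READ on one resource ("TOPIC" or "GROUP") to a principal.
    void allowRead(String principal, String resourceType, String name) {
        readAllowed.add(key(principal, resourceType, name));
    }

    // Removes a previously granted READ permission.
    void removeRead(String principal, String resourceType, String name) {
        readAllowed.remove(key(principal, resourceType, name));
    }

    // Both checks must pass for the consume request to be authorized.
    boolean canConsume(String principal, String topic, String group) {
        return readAllowed.contains(key(principal, "TOPIC", topic))
                && readAllowed.contains(key(principal, "GROUP", group));
    }
}
```

Under this model a consumer for `User:Kafka-tools` using group `teste` would be rejected when only `fake-group` has been granted, which is the outcome the report says was not observed.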
[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415195#comment-17415195 ] Guozhang Wang commented on KAFKA-13295: --- As we talked about the potential fix in the near term, one possibility is that `onPartitionAssigned` when new tasks are created, we always blindly do a commitOffsetOrTransaction as well so that if there's any ongoing transactions, it would be committed before we potentially go on to restore the newly assigned tasks. > Long restoration times for new tasks can lead to transaction timeouts > - > > Key: KAFKA-13295 > URL: https://issues.apache.org/jira/browse/KAFKA-13295 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Critical > Labels: eos > Fix For: 3.1.0 > > > In some EOS applications with relatively long restoration times we've noticed > a series of ProducerFencedExceptions occurring during/immediately after > restoration. The broker logs were able to confirm these were due to > transactions timing out. > In Streams, it turns out we automatically begin a new txn when calling > {{send}} (if there isn’t already one in flight). A {{send}} occurs often > outside a commit during active processing (eg writing to the changelog), > leaving the txn open until the next commit. And if a StreamThread has been > actively processing when a rebalance results in a new stateful task without > revoking any existing tasks, the thread won’t actually commit this open txn > before it goes back into the restoration phase while it builds up state for > the new task. So the in-flight transaction is left open during restoration, > during which the StreamThread only consumes from the changelog without > committing, leaving it vulnerable to timing out when restoration times exceed > the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election
[ https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415198#comment-17415198 ] Guozhang Wang commented on KAFKA-7408: -- Hey [~hachikuji], a quick question: if we just abort the dangling txn while the producer is still trying to commit it, could we still notify the producer? More concretely, back to your example, if we have x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC originally, and then upon unclean leader election the partition host writes a `zA` at the end, hence becoming x1 x2 y1 y2 z1 z2 xC yA x1 x2 xC zA and later the txn coordinator tries to write a `zC` on behalf of the producer, are we guaranteed that this `zC` would be rejected? If we accept it (and effectively ignore it) we would end up in a state where the producer thinks its transaction is committed while it is actually aborted. Maybe that's the best we can do, as I'm not sure which case is worse for users: 1) having a committed txn actually be aborted, 2) having a committed txn actually lose some data, i.e. be only partially committed.
[jira] [Assigned] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS
[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-13249:
-------------------------------------

    Assignee: Oliver Hutchison

> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0, 2.7.0, 2.8.0
>            Reporter: Oliver Hutchison
>            Assignee: Oliver Hutchison
>            Priority: Major
>
> When using EOS, the {{.checkpoint}} file created when a stateful streams app
> is shut down does not always contain changelog offsets which represent the
> latest state of the state store. The offsets can often be behind the end of
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts
> after shutting down cleanly, as streams thinks (based on the incorrect offsets
> in the checkpoint) that the state store is not up to date with the changelog.
> This is increasing the time it takes to do a clean restart of a single
> instance streams app from around 10 seconds to sometimes over 2 minutes in our
> case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}}
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>     // commitNeeded indicates we may have processed some records since last commit
>     // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
>     if (commitNeeded) {
>         stateMgr.updateChangelogOffsets(checkpointableOffsets());
>     }
>     super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In a steady state case for a simple single instance single thread stream app,
> where an app simply starts, runs and then shuts down, the {{if
> (commitNeeded)}} test always fails when running with EOS, which results in the
> latest checkpoint offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}}, it's easy to see this
> is the case, as there's only 1 place in the code which calls
> {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}}
> state.
> {code:java}
> case RUNNING:
>     if (enforceCheckpoint || !eosEnabled) {
>         maybeWriteCheckpoint(enforceCheckpoint);
>     }
>     log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint);
>     break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be
> called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}?
> Again looking only at the steady state case, we find that it's only called
> from {{TaskManager.tryCloseCleanAllActiveTasks}}, which is only called from
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it
> happens *after* all active tasks have committed. This means that
> {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the
> latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to
> be {{if (commitNeeded || enforceCheckpoint) { ...}}, as we know we must always
> update the changelog offsets before we write the checkpoint.
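The call sequence described in the KAFKA-13249 report can be replayed with a toy model. This is a sketch only, not the Kafka Streams implementation: `CheckpointSketch` and its fields are hypothetical stand-ins for `StreamTask`, the state manager, and the `.checkpoint` file, and the checkpoint write is simplified to happen only when `enforceCheckpoint` is true (the EOS `RUNNING` case quoted above).

```java
final class CheckpointSketch {
    long processedOffset = 0;   // latest changelog offset produced by processing
    long stateMgrOffset = 0;    // offset the (modelled) state manager would checkpoint
    long checkpointFile = -1;   // last offset written to the (modelled) checkpoint file
    boolean commitNeeded = false;

    void process() {
        processedOffset++;
        commitNeeded = true;
    }

    void commit() {
        // Under EOS in the RUNNING state no checkpoint is attempted here,
        // so only the flag is reset.
        commitNeeded = false;
    }

    void maybeWriteCheckpoint(boolean enforceCheckpoint, boolean withProposedFix) {
        // Original test: commitNeeded. Proposed test: commitNeeded || enforceCheckpoint.
        if (commitNeeded || (withProposedFix && enforceCheckpoint)) {
            stateMgrOffset = processedOffset;
        }
        if (enforceCheckpoint) {
            checkpointFile = stateMgrOffset;
        }
    }

    /** Processes some records, commits, then closes cleanly; returns the checkpointed offset. */
    static long runAndShutdown(int records, boolean withProposedFix) {
        CheckpointSketch task = new CheckpointSketch();
        for (int i = 0; i < records; i++) {
            task.process();
        }
        task.commit();                                    // all active tasks commit first
        task.maybeWriteCheckpoint(true, withProposedFix); // then the enforced checkpoint
        return task.checkpointFile;
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdown(5, false)); // 0: checkpoint lags behind the changelog
        System.out.println(runAndShutdown(5, true));  // 5: checkpoint matches the changelog
    }
}
```

Because the enforced checkpoint always runs after the commit has cleared `commitNeeded`, the original test can never refresh the offsets in this sequence, which matches the reporter's reasoning.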
[jira] [Resolved] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS
[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13249.
-----------------------------------
    Fix Version/s: 3.1.0
       Resolution: Fixed

> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0, 2.7.0, 2.8.0
>            Reporter: Oliver Hutchison
>            Assignee: Oliver Hutchison
>            Priority: Major
>             Fix For: 3.1.0
[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
guozhangwang commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-919580172 I resolved some conflicts and cherry-picked to 2.8; there are too many conflicts in 2.7 though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS
[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-13249:
----------------------------------
    Fix Version/s: 2.8.2

> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0, 2.7.0, 2.8.0
>            Reporter: Oliver Hutchison
>            Assignee: Oliver Hutchison
>            Priority: Major
>             Fix For: 3.1.0, 2.8.2
[jira] [Commented] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415244#comment-17415244 ] Guozhang Wang commented on KAFKA-13267: --- I agree, and as you mentioned in the other PR it seems more related to https://issues.apache.org/jira/browse/KAFKA-10733 (KIP-691) > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13267 > URL: https://issues.apache.org/jira/browse/KAFKA-13267 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Gilles Philippart >Priority: Major > > We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these > errors pop up in apps using EOS: > {code:java} > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > {code} > Full stack trace: > {code:java} > Error encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > RecordCollectorImpl.java 196 lambda$send$0(...) > RecordCollectorImpl.java:196:in `lambda$send$0' > KafkaProducer.java 1365 onCompletion(...) > KafkaProducer.java:1365:in `onCompletion' > ProducerBatch.java 231 completeFutureAndFireCallbacks(...) > ProducerBatch.java:231:in `completeFutureAndFireCallbacks' > ProducerBatch.java 159 abort(...) > ProducerBatch.java:159:in `abort' > RecordAccumulator.java 763 abortBatches(...) 
> RecordAccumulator.java:763:in `abortBatches' > More (5 lines) > Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error > encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > {code} > I've seen that KAFKA-6821 described the same problem on an earlier version of > Kafka and was closed due to the subsequent works on EOS. > Another ticket raised recently shows that the exception is still occurring > (but the ticket wasn't raised for that specific error): KAFKA-12774 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415251#comment-17415251 ]

Matthias J. Sax commented on KAFKA-13292:
-----------------------------------------

KIP-671 won't avoid the exception, but it gives you the ability to prevent the thread from dying, by using the newly added exception handler. (Well, technically the thread dies, but you can just start a new one.) KIP-691 won't really _avoid_ the exception either, but we plan to have built-in support for handling the exception within Kafka Streams, such that the thread won't die and thus the exception does not surface to the exception handler.

If you upgrade on the client side, you need to bump the versions of producer/consumer/admin/KafkaStreams together within the same application, because you cannot mix and match them; they must always be the same version. You don't need to upgrade the brokers though. Clients are in general (forward and) backward compatible with (newer) older brokers (for details, read the docs: there are some limitations on which versions work together, but for your particular case there should be no limitations IIRC).

> InvalidPidMappingException: The producer attempted to use a producer id which
> is not currently assigned to its transactional id
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13292
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13292
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: NEERAJ VAIDYA
>            Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12
> partitions. The incoming message rate into this topic is very low, perhaps
> 3-4 per minute. Also, some partitions will not receive messages for more than
> 7 days.
> > Exactly after 7 days of starting this application, I seem to be getting the > following exception and the application shuts down, without processing > anymore messages : > > {code:java} > 2021-09-10T12:21:59.636 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer > clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, > transactionalId=mtx-caf-0_2] Transiting to abortable error state due to > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > 2021-09-10T12:21:59.642 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] > Error encountered sending record to topic > mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the > following exception during processing and the thread is going to shut down: > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. 
> Exception handler choose to FAIL the processing, no more records would be > sent. > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sende
[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415256#comment-17415256 ]

Guozhang Wang commented on KAFKA-10643:
---------------------------------------

[~eran-levy] Just for the static membership issue (i.e. the second one you summarized above), I'm wondering if this is related: https://issues.apache.org/jira/browse/KAFKA-10284. To verify, you may want to consider upgrading to 2.6.1 and see if it is resolved.

Another wild thought off the top of my head is https://issues.apache.org/jira/browse/KAFKA-12363. It's a known issue that we realized while augmenting the consumer protocol, but thus far I have not personally encountered symptoms due to it. From what you described I'd not bet on that one being the root cause, but I just want to put it on your radar.

> Static membership - repetitive PreparingRebalance with updating metadata for
> member reason
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-10643
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10643
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Eran Levy
>            Priority: Major
>         Attachments: broker-4-11.csv, client-4-11.csv,
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka
> streams app is healthy.
> Configured with static membership.
> Every 10 minutes (I assume because of topic.metadata.refresh.interval.ms), I
> see the following group coordinator log for different stream consumers:
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45)
> (reason: Updating metadata for member
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a)
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log:
> INFO [GroupCoordinator 2]: Assignment received from leader for group
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>
> I looked a bit at the kafka code and I'm not sure I understand why this is
> happening - does this line describe the situation that happens here re the
> "reason:"? [https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also don't see it happening too often in other kafka streams applications
> that we have.
> The only suspicious thing I see is that around every hour, different pods of
> that kafka streams application throw this exception:
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) to node 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: null\n"}
> I came across this strange behaviour after I started to investigate a strange
> stuck rebalancing state, after one of the members left the group and caused
> the rebalance to get stuck - the only thing I found is that maybe, because
> of the too frequent preparing-to-rebalance states, the app might be affected
> by this bug - KAFKA-9752?
> I don't understand why it happens; it didn't happen before I applied static
> membership to that kafka streams application (around 2 weeks ago).
> I will be happy if you can help me.
[GitHub] [kafka] guozhangwang commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache
guozhangwang commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-919601918

> @guozhangwang I think there are definitely questions to be answered about this, and really about what we want the user to achieve. I think maxbufferedbytes is really being used for two purposes, and perhaps we can split it out into two different things that will give a better result. I think it bounds the heap space and it reserves an amount of space for each thread.
> It might make sense to have the users set a bound on the heap for the cluster. Then for each topology, reserve some fraction of that space for that topology (either as a percentage or a number of bytes); any unclaimed space can then be split among the tasks (or just the tasks that have not reserved space). I think this would clear up some confusion about what this config is for. WDYT? cc/ @ableegoldman

Hey @wcarlson5, sorry I'm late replying here. I think at a high level the two purposes you proposed make sense to me; it's just that in practice it may be hard to implement this in a simple and elegant way. Just to throw out some scenarios to dig into here: say the instance's total cache size is configured as 100, with no topology added yet.

1. If a topology A is added with that config overridden as 150, should we just silently accept it but only give it 100, or should we reject that topology?
2. Assume we accept A as in 1) above. If a topology B without a config override is added, how should we distribute the cache between the two topologies? If we just consider B as 100 (the instance's config value), should we distribute the cache as 60:40 between A and B? And if another topology C with the config overridden as 250 is added, should we redistribute the total cache size as 30:20:50 among A / B / C?
3. Assume we reject A as in 1) above, and suppose now B without a config override is added first, which would get all of 100, and then later D with the config overridden as 40 is added. Would we distribute 100 as 30:70 (i.e. D first gets 40, and then B/D split the remaining 60)?
4. Assume that we have multiple threads: do we dynamically change the cache size allocated to each thread upon rebalance, based on which tasks of A / B / C each thread hosts?

Thinking about all that, I'm a bit concerned that this config would be very hard for users to understand: it seems that either way "you do not get what you specified", and the overridden value would just be used as a relative ratio. Plus, if we distribute the cache among threads according to the distribution specified by the topologies, that would be even more cumbersome to understand.
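The ratios in scenarios 2 and 3 above can be checked with a few lines of arithmetic. The sketch below is purely illustrative: `CacheShareSketch` and both method names are hypothetical, nothing here is Kafka Streams API, and the two policies are just one reading of the scenarios in the comment (scale the requests proportionally, or honor reservations first and split the remainder evenly).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CacheShareSketch {

    /** Scenario 2: scale the requested sizes so they sum to totalCache, keeping their ratio. */
    static Map<String, Long> proportional(long totalCache, Map<String, Long> requested) {
        long sum = 0;
        for (long r : requested.values()) {
            sum += r;
        }
        if (sum <= 0) {
            throw new IllegalArgumentException("requested sizes must sum to a positive value");
        }
        Map<String, Long> shares = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : requested.entrySet()) {
            shares.put(e.getKey(), totalCache * e.getValue() / sum);
        }
        return shares;
    }

    /**
     * Scenario 3: each topology first receives its reservation (0 if it has no override),
     * then the remaining cache is split evenly among all topologies.
     */
    static Map<String, Long> reservedThenEvenSplit(long totalCache, Map<String, Long> reserved) {
        long reservedSum = 0;
        for (long r : reserved.values()) {
            reservedSum += r;
        }
        if (reserved.isEmpty() || reservedSum > totalCache) {
            throw new IllegalArgumentException("reservations must fit into the total cache");
        }
        long evenShare = (totalCache - reservedSum) / reserved.size();
        Map<String, Long> shares = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : reserved.entrySet()) {
            shares.put(e.getKey(), e.getValue() + evenShare);
        }
        return shares;
    }

    public static void main(String[] args) {
        Map<String, Long> ab = new LinkedHashMap<>();
        ab.put("A", 150L);
        ab.put("B", 100L);
        System.out.println(proportional(100L, ab));          // {A=60, B=40}

        Map<String, Long> abc = new LinkedHashMap<>(ab);
        abc.put("C", 250L);
        System.out.println(proportional(100L, abc));         // {A=30, B=20, C=50}

        Map<String, Long> bd = new LinkedHashMap<>();
        bd.put("B", 0L);   // no override
        bd.put("D", 40L);  // override of 40
        System.out.println(reservedThenEvenSplit(100L, bd)); // {B=30, D=70}
    }
}
```

Under either policy no topology ends up with exactly the value it configured, which is the usability concern raised at the end of the comment.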
[GitHub] [kafka] guozhangwang commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
guozhangwang commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r708747814

## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ##
@@ -611,6 +611,7 @@ public synchronized Topology build() {
      */
     public synchronized Topology build(final Properties props) {
         internalStreamsBuilder.buildAndOptimizeTopology(props);
+        internalTopologyBuilder.setTopologyProperties(props);

Review comment:
Yeah something like https://github.com/apache/kafka/pull/11272/files#diff-0e5e608831150c058e2ad1b45d38ad941739562588ec0fdb97cc9f742919fb1fR145
[GitHub] [kafka] guozhangwang commented on pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
guozhangwang commented on pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#issuecomment-919603159

`Checkstyle rule violations were found.` :)
[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
junrao commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r708747148 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -622,26 +628,35 @@ private[log] class Cleaner(val id: Int, * @param sourceRecords The dirty log segment * @param dest The cleaned log segment * @param map The key=>offset mapping - * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment + * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment + * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration * @param maxLogMessageSize The maximum message size of the corresponding topic * @param stats Collector for cleaning statistics + * @param currentTime The time at which the clean was initiated */ private[log] def cleanInto(topicPartition: TopicPartition, sourceRecords: FileRecords, dest: LogSegment, map: OffsetMap, - retainDeletesAndTxnMarkers: Boolean, + retainLegacyDeletesAndTxnMarkers: Boolean, + deleteRetentionMs: Long, maxLogMessageSize: Int, transactionMetadata: CleanedTransactionMetadata, lastRecordsOfActiveProducers: Map[Long, LastRecord], - stats: CleanerStats): Unit = { -val logCleanerFilter: RecordFilter = new RecordFilter { + stats: CleanerStats, + currentTime: Long): Unit = { +val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) { var discardBatchRecords: Boolean = _ - override def checkBatchRetention(batch: RecordBatch): BatchRetention = { + override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = { // we piggy-back on the tombstone retention logic to delay deletion of transaction markers. // note that we will never delete a marker until all the records from that transaction are removed. 
-discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) +val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata) + +if (batch.isControlBatch) + discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime +else + discardBatchRecords = canDiscardBatch Review comment: This is an existing issue. The following comment on line 1136 seems out of place since the code that does that check is inside isBatchLastRecordOfProducer() below. ``` // We may retain a record from an aborted transaction if it is the last entry // written by a given producerId. ``` ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -493,19 +496,19 @@ private[log] class Cleaner(val id: Int, * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { +doClean(cleanable, time.milliseconds()) + } + + private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = { +info("Beginning cleaning of log %s".format(cleanable.log.name)) + // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment -val deleteHorizonMs = +val legacyDeleteHorizonMs = Review comment: Perhaps mention in the comment above that this is only used for the old message format? ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -1060,7 +1082,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) { /** * Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position, - * and whether it needs compaction immediately. + * the reason why it is being cleaned, and whether it needs compaction immediately. Review comment: We no longer pass in the reason.
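The control-batch branch in the `cleanInto` hunk above can be read as a small predicate. What follows is a minimal sketch in plain Java, not the Scala code from the PR: the class and method names (`DeleteHorizonSketch`, `shouldDiscard`) are made up for illustration, and `canDiscardBatch` simply stands in for whatever `shouldDiscardBatch(batch, transactionMetadata)` returned.

```java
import java.util.OptionalLong;

// Illustrative only: mirrors the branch quoted in the diff, with hypothetical names.
class DeleteHorizonSketch {

    // canDiscardBatch stands in for the result of shouldDiscardBatch(batch, transactionMetadata).
    static boolean shouldDiscard(boolean isControlBatch,
                                 boolean canDiscardBatch,
                                 OptionalLong deleteHorizonMs,
                                 long currentTimeMs) {
        if (isControlBatch) {
            // A transaction marker is dropped only once a delete horizon
            // has been recorded on the batch and that horizon has passed.
            return canDiscardBatch
                && deleteHorizonMs.isPresent()
                && deleteHorizonMs.getAsLong() <= currentTimeMs;
        }
        // Data batches follow the plain discard decision.
        return canDiscardBatch;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        // Control batch without a horizon yet.
        System.out.println(shouldDiscard(true, true, OptionalLong.empty(), now));
        // Control batch whose horizon has passed.
        System.out.println(shouldDiscard(true, true, OptionalLong.of(900L), now));
        // Control batch whose horizon lies in the future.
        System.out.println(shouldDiscard(true, true, OptionalLong.of(1_100L), now));
        // Data batch: the horizon is not consulted.
        System.out.println(shouldDiscard(false, true, OptionalLong.empty(), now));
    }
}
```

Under this reading, a marker without a delete horizon is always retained for at least one more cleaning pass.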
## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -163,17 +163,18 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], * Choose the log to clean next and add it to the in-progress set. We recompute this * each time from the full set of logs to allow logs to be dynamically added to the pool of logs * the log manager maintains. +* Returns a tuple of an Option of the log selected t
[GitHub] [kafka] guozhangwang commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
guozhangwang commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r708751965 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -112,10 +113,42 @@ private KafkaStreamsNamedTopologyWrapper(final Collection topolog ); } +/** + * Provides a high-level DSL for specifying the processing logic of your application and building it into an + * independent topology that can be executed by this {@link KafkaStreams}. + * + * @param topologyName The name for this topology + * @param topologyConfigs The properties and any config overrides for this topology + * + * @throws IllegalArgumentException if the name contains the character sequence "__" + */ +public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName, final Properties topologyConfigs) { +if (topologyName.contains(TaskId.NAMED_TOPOLOGY_DELIMITER)) { +throw new IllegalArgumentException("The character sequence '__' is not allowed in a NamedTopology, please select a new name"); +} +return new NamedTopologyBuilder(topologyName, applicationConfigs, topologyConfigs); +} + +/** + * Returns an empty topology for full control over the graph of streams and processor nodes that define the processing + * logic to be executed by this {@link KafkaStreams}. + * + * @param topologyName The name for this topology + * @param topologyConfigs The properties and any config overrides for this topology + * + * @throws IllegalArgumentException if the name contains the character sequence "__" + */ +public NamedTopology newNamedTopology(final String topologyName, final Properties topologyConfigs) { Review comment: This one seems not used? And even in external callers like ksql, I think we would go through the newNamedTopologyBuilder().build() right? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -359,15 +363,23 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId return this; } -public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) { -Objects.requireNonNull(config, "config can't be null"); -this.config = config; +public synchronized final void setTopologyOverrides(final Properties props) { Review comment: This function seems not used in non-testing code any more? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java ## @@ -927,10 +931,54 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() { } @Test -public void shouldSetStreamsConfigOnRewriteTopology() { +public void shouldSetTopologyConfigOnRewriteTopology() { +final Properties globalProps = StreamsTestUtils.getStreamsConfig(); +globalProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L); +final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps); +final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig); +assertThat(topologyBuilder.topologyConfig(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties()))); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxTaskIdleMs, equalTo(100L)); +} + +@Test +public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() { +final Properties topologyOverrides = new Properties(); +topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L); +topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); + topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); + topologyOverrides.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); + topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
LogAndContinueExceptionHandler.class); +builder.setNamedTopology(new NamedTopology("test-topology", new Properties())); +builder.setTopologyOverrides(topologyOverrides); + +final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); +final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(config); + + assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxTaskIdleMs, equalTo(500L)); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().taskTimeoutMs, equalTo(1000L)); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxBufferedSize, equalTo(15)); + assertThat(topologyBuilde
[GitHub] [kafka] mjsax commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
mjsax commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r708758100 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -209,37 +210,39 @@ private void emitNonJoinedOuterRecords(final WindowStore, Left // reset to MAX_VALUE in case the store is empty sharedTimeTracker.minTime = Long.MAX_VALUE; -try (final KeyValueIterator>, LeftOrRightValue> it = store.all()) { +try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { while (it.hasNext()) { -final KeyValue>, LeftOrRightValue> record = it.next(); +final KeyValue, LeftOrRightValue> record = it.next(); -final Windowed> windowedKey = record.key; -final LeftOrRightValue value = record.value; -sharedTimeTracker.minTime = windowedKey.window().start(); +final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = record.key; +final LeftOrRightValue value = record.value; +final K key = timestampedKeyAndJoinSide.getKey(); +final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); +sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { break; } -final K key = windowedKey.key().getKey(); -final long time = windowedKey.window().start(); - final R nullJoinedValue; if (isLeftSide) { nullJoinedValue = joiner.apply(key, -(V1) value.getLeftValue(), -(V2) value.getRightValue()); +value.getLeftValue(), +value.getRightValue()); } else { nullJoinedValue = joiner.apply(key, -(V1) value.getRightValue(), -(V2) value.getLeftValue()); +(V1) value.getRightValue(), +(V2) value.getLeftValue()); } -context().forward(key, nullJoinedValue, To.all().withTimestamp(time)); +context().forward(key, nullJoinedValue, To.all().withTimestamp(timestamp)); -// Delete the key from the outer window store now it is emitted 
-store.put(record.key.key(), null, record.key.window().start()); +// blind-delete the key from the outer window store now it is emitted; +// we may delete some values of the same key which has not been iterated yet, +// but since the iterator would still return that key this is fine. +// we do not use the delete() call since that would incur an extra get +store.put(timestampedKeyAndJoinSide, null); Review comment: If we do the blind delete of the full list, is there not a potential data loss scenario? Assume you have a list of 2+ left-join candidates. We process the first one, do the delete, and crash before processing the others in the list. After restart, we might never emit the left-join result for those records? Thus, would we need to do the delete only after we have exhausted the list, i.e., when we move from one key-timestamp to a different key or timestamp? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
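The scenario raised in the review comment above can be played through with a toy model. This is only a sketch under simplifying assumptions: the store is a plain in-memory map from a key-timestamp string to a list of candidates, a "crash" is simulated by returning early, and commits, changelogs and exactly-once semantics are ignored entirely. `BlindDeleteSketch`, `emit`, `crashAfter` and `deleteAfterList` are hypothetical names, not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a toy list-valued store, not the Kafka Streams implementation.
class BlindDeleteSketch {

    // Emits candidates in key order and stops ("crashes") once crashAfter values
    // were emitted. If deleteAfterList is false, the whole list is deleted as soon
    // as its first value is emitted; otherwise only after the list is exhausted.
    static List<String> emit(Map<String, List<String>> store,
                             int crashAfter,
                             boolean deleteAfterList) {
        List<String> emitted = new ArrayList<>();
        // Iterate over a snapshot, like an iterator opened before the deletes.
        Map<String, List<String>> snapshot = new TreeMap<>();
        store.forEach((k, v) -> snapshot.put(k, new ArrayList<>(v)));

        for (Map.Entry<String, List<String>> entry : snapshot.entrySet()) {
            for (String value : entry.getValue()) {
                if (emitted.size() == crashAfter) {
                    return emitted; // simulated crash
                }
                emitted.add(value);
                if (!deleteAfterList) {
                    store.remove(entry.getKey()); // blind delete of the full list
                }
            }
            if (deleteAfterList) {
                store.remove(entry.getKey()); // delete once the list is exhausted
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> blind = new TreeMap<>();
        blind.put("k@10", new ArrayList<>(Arrays.asList("a", "b")));
        System.out.println("blind emitted:    " + emit(blind, 1, false));
        System.out.println("blind store:      " + blind);

        Map<String, List<String>> deferred = new TreeMap<>();
        deferred.put("k@10", new ArrayList<>(Arrays.asList("a", "b")));
        System.out.println("deferred emitted: " + emit(deferred, 1, true));
        System.out.println("deferred store:   " + deferred);
    }
}
```

In this model the blind variant leaves nothing in the store after the simulated crash, so `b` can no longer be emitted on restart, while the deferred variant keeps the full list and would emit `a` a second time. Whether that duplicate is acceptable depends on the processing guarantee in use.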
[GitHub] [kafka] mjsax commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
mjsax commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r708758796 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStore.java ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serde; + +/** + * A wrapper key-value store that serializes the record values bytes as a list. + * As a result put calls would be interpreted as a get-append-put to the underlying RocksDB store. + * Range iterators would also flatten the value lists and return the values one-by-one. 
+ * + * This store is used for cases where we do not want to de-duplicate values of the same keys but want to retain all such values. + */ +@SuppressWarnings("unchecked") +public class ListValueStore +extends WrappedStateStore, Bytes, byte[]> +implements KeyValueStore { + +static private final Serde> LIST_SERDE = Serdes.ListSerde(ArrayList.class, Serdes.ByteArray()); + +ListValueStore(final KeyValueStore bytesStore) { +super(bytesStore); +} + +@Override +public void put(final Bytes key, final byte[] value) { +// if the value is null we can skip the get and blind delete +if (value == null) { +wrapped().put(key, null); +} else { +final byte[] oldValue = wrapped().get(key); + +if (oldValue == null) { +wrapped().put(key, LIST_SERDE.serializer().serialize(null, Collections.singletonList(value))); +} else { +final List list = LIST_SERDE.deserializer().deserialize(null, oldValue); +list.add(value); + +wrapped().put(key, LIST_SERDE.serializer().serialize(null, list)); +} +} +} + +@Override +public byte[] putIfAbsent(final Bytes key, final byte[] value) { +throw new UnsupportedOperationException("putIfAbsent not supported"); +} + +@Override +public void putAll(final List> entries) { +throw new UnsupportedOperationException("putAll not supported"); +} + +@Override +public byte[] delete(final Bytes key) { +// we intentionally disable delete calls since the returned bytes would +// represent a list, not a single value; we need to have a new API for delete if we do need it +throw new UnsupportedOperationException("delete not supported"); Review comment: Seems you did not disable `get()` yet?
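The get-append-put behaviour described in the class comment, and the reason `get()` is awkward on such a store, can be illustrated with a small in-memory stand-in. This is a sketch only: `ListPutSketch` is a hypothetical class that keeps a Java `List` per key for readability, whereas the store in the PR keeps the list serialized as a single `byte[]` in the wrapped store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: in-memory stand-in for a list-valued key-value store.
class ListPutSketch {
    private final Map<String, List<byte[]>> inner = new HashMap<>();

    // put() appends to the list stored under the key (get-append-put);
    // a null value removes the whole list without reading it first.
    void put(String key, byte[] value) {
        if (value == null) {
            inner.remove(key);
        } else {
            inner.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    // get() hands back every value stored under the key, not a single value,
    // which is the same mismatch that motivated disabling delete() in the PR.
    List<byte[]> get(String key) {
        List<byte[]> list = inner.get(key);
        return list == null ? null : new ArrayList<>(list);
    }

    public static void main(String[] args) {
        ListPutSketch store = new ListPutSketch();
        store.put("k", new byte[] {1});
        store.put("k", new byte[] {2});
        System.out.println("values under k: " + store.get("k").size());

        store.put("k", null); // blind delete of the full list
        System.out.println("after delete:   " + store.get("k"));
    }
}
```

A caller that expects a key-value `get()` to return the last value written would be surprised by this store, which is presumably why the reviewer asks about disabling it.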
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415337#comment-17415337 ] Victoria Xia commented on KAFKA-13261: -- Hey [~vvcephei] [~abellemare] [~guozhang] I had a look at the code and it seems to support Adam's theory that the custom partitioners from the repartition() step aren't taken into account by the foreign key join. In particular, both the subscription sink topic and the response sink topic are created without partitioners specified in the StreamSinkNode: [https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1051] [https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1122] IIUC, this means the default partitioner is used for both topics despite the custom partitioners on the source tables, which explains the missing join results. One thing I don't understand: even if we fix this bug by propagating the partitioner information from the repartition() step to the foreign key join, wouldn't we still have an analogous bug if either of the topics for the source tables had custom partitioning logic created from outside Streams (i.e., without a repartition() step in the Streams topology)? In this case, Streams has no way of determining the partitioning of the source tables, which means we need an update to the interface for foreign key joins so that users can specify a partitioner to use in order to ensure copartitioning of the subscription and response topics with the relevant tables. Is this reasoning sound? 
If so, does it make sense to add logic into Streams to propagate information about the partitioner from the repartition() step to the foreign key join, or would it be better to require users to use the new interface to pass the same partitioner from the repartition() step(s) to the foreign key join as well? The latter seems more consistent with how copartitioning for joins is typically the user's responsibility, and also avoids the need to update Streams with logic for tracking partitioners throughout the topology. > KTable to KTable foreign key join loose events when using several partitions > > > Key: KAFKA-13261 > URL: https://issues.apache.org/jira/browse/KAFKA-13261 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0, 2.7.1 >Reporter: Tomas Forsman >Assignee: Victoria Xia >Priority: Major > Attachments: KafkaTest.java > > > Two incoming streams A and B. > Stream A uses a composite key [a, b] > Stream B has key [b] > Stream B has 4 partitions and stream A has 1 partition. > What we try to do is repartition stream A to have 4 partitions too, then put > both A and B into KTable and do a foreign key join from A to B > When doing this, not all messages end up in the output topic. > Repartitioning both to only use 1 partition each solves the problem, so it seems > like it has something to do with the foreign key join in combination with > several partitions. > One suspicion would be that it is not possible to define what partitioner to > use for the join. > Any insight or help is greatly appreciated. 
> *Example code of the problem* > {code:java} > static Topology createTopoology(){ > var builder = new StreamsBuilder(); > KTable tableB = builder.table("B", > stringMaterialized("table.b")); > builder > .stream("A", Consumed.with(Serde.of(KeyA.class), > Serde.of(EventA.class))) > .repartition(repartitionTopicA()) > .toTable(Named.as("table.a"), aMaterialized("table.a")) > .join(tableB, EventA::getKeyB, topicAandBeJoiner(), > Named.as("join.ab"), joinMaterialized("join.ab")) > .toStream() > .to("output", with(...)); > return builder.build(); > } > private static Materialized aMaterialized(String name) { > Materialized> table = > Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > private static Repartitioned repartitionTopicA() { > Repartitioned repartitioned = > Repartitioned.as("driverperiod"); > return > repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)) > .withStreamPartitioner(topicAPartitioner()) > .withNumberOfPartitions(4); > } > private static StreamPartitioner > topicAPartitioner() { > return (topic, key, value, numPartitions) -> > Math.abs(key.getKeyB().hashCode()) % numPartitions; > } > private static Materialized> > joinMaterialized(String name) { > Materialized> > table = Materi