[GitHub] [kafka] cadonna merged pull request #11317: KAFKA-13287: Upgrade RocksDB to 6.22.1.1
cadonna merged pull request #11317: URL: https://github.com/apache/kafka/pull/11317
[jira] [Resolved] (KAFKA-13287) Upgrade RocksDB to 6.22.1.1
[ https://issues.apache.org/jira/browse/KAFKA-13287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna resolved KAFKA-13287.
Resolution: Fixed

> Upgrade RocksDB to 6.22.1.1
> Key: KAFKA-13287
> URL: https://issues.apache.org/jira/browse/KAFKA-13287
> Project: Kafka
> Issue Type: Task
> Components: streams
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Major
> Fix For: 3.1.0
> Attachments: compat_report.html
>
> RocksDB 6.22.1.1 is source compatible with RocksDB 6.19.3, which Streams currently uses (see the attached compatibility report).
[jira] [Commented] (KAFKA-10338) Support PEM format for SSL certificates and private key
[ https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414070#comment-17414070 ]

Elliot West commented on KAFKA-10338:

[~rsivaram], I've created KAFKA-13293 to address this.

> Support PEM format for SSL certificates and private key
> Key: KAFKA-10338
> URL: https://issues.apache.org/jira/browse/KAFKA-10338
> Project: Kafka
> Issue Type: New Feature
> Components: security
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.7.0
>
> We currently support only file-based JKS/PKCS12 formats for SSL key stores and trust stores. It would be good to add support for PEM as configuration values, which fits better with config externalization.
> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
[jira] [Created] (KAFKA-13293) Support client reload of PEM certificates
Elliot West created KAFKA-13293:

Summary: Support client reload of PEM certificates
Key: KAFKA-13293
URL: https://issues.apache.org/jira/browse/KAFKA-13293
Project: Kafka
Issue Type: Improvement
Components: clients, security
Affects Versions: 2.7.0, 2.7.1, 2.8.0
Reporter: Elliot West

Since Kafka 2.7.0, clients have been able to authenticate using PEM certificates supplied as client configuration properties, in addition to JKS file-based key stores (KAFKA-10338). With PEM, certificate chains are passed into clients as simple string-valued properties alongside the existing client configuration. This offers a number of benefits: it provides a JVM-agnostic security mechanism from the perspective of clients, removes the client's dependency on the local filesystem, and allows the encapsulation of the entire client configuration into a single payload.

However, the current client PEM implementation has a feature regression compared with the JKS implementation. With the JKS approach, clients would automatically reload certificates when the key stores were modified on disk. This enables a seamless approach to replacing certificates when they are due to expire; no further configuration or explicit interference with the client lifecycle is needed for the client to migrate to renewed certificates.

No such capability currently exists for PEM. Key chains can be supplied only when instantiating clients; there is no mechanism available to either directly reconfigure the client or have the client observe changes to the original properties reference used at construction. Additionally, no work-arounds are documented that might give users alternative strategies for dealing with expiring certificates. Given that expiration and renewal of certificates is an industry-standard practice, it could be argued that the current PEM client implementation is not fit for purpose.

In summary, a mechanism should be provided such that clients can automatically detect, load, and use updated PEM key chains from some non-file-based source (object reference, method invocation, listener, etc.).

Finally, it is suggested that in the short term the Kafka documentation be updated to describe any viable mechanism for updating client PEM certs (perhaps closing the existing client and then recreating it?).
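For context, this is the shape of the PEM-based client configuration added by KAFKA-10338/KIP-651 — a minimal illustrative sketch with the certificate and key bodies elided:

{code:java}
import java.util.Properties;

public class PemSslConfigExample {
    // Key material is passed inline as string values (KIP-651); no files involved.
    static Properties pemSslProps() {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.type", "PEM");
        props.put("ssl.keystore.certificate.chain",
                "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
        props.put("ssl.keystore.key",
                "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----");
        props.put("ssl.truststore.type", "PEM");
        props.put("ssl.truststore.certificates",
                "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
        return props;
    }
}
{code}

Because these values are plain strings rather than file paths, there is nothing on disk for the client to watch for changes — which is the gap this ticket describes.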
[jira] [Resolved] (KAFKA-9569) RemoteStorageManager implementation for HDFS storage.
[ https://issues.apache.org/jira/browse/KAFKA-9569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana resolved KAFKA-9569.
Resolution: Fixed

> RemoteStorageManager implementation for HDFS storage.
> Key: KAFKA-9569
> URL: https://issues.apache.org/jira/browse/KAFKA-9569
> Project: Kafka
> Issue Type: Sub-task
> Components: core
> Reporter: Satish Duggana
> Assignee: Ying Zheng
> Priority: Major
>
> This is about implementing `RemoteStorageManager` for HDFS, to verify that the proposed SPIs are sufficient. It looks like the existing RSM interface should be sufficient; if needed, we will discuss any required changes.
[GitHub] [kafka] msillence opened a new pull request #11322: smt to rename schemas based on regex
msillence opened a new pull request #11322: URL: https://github.com/apache/kafka/pull/11322

SMT to rename schemas based on a regex.

Example configuration:
```
"transforms.regexSchema.regex": ".*\\.([^.]*)\\.(Value|Key)",
"transforms.regexSchema.replacement": "com.company.schema.$1.$2",
```

We need this because our Debezium connector creates schemas named after the host and database schema, and those names change when we move from dev to production. With Avro, the name in the schema is used to pick the class, so deserialization fails when we move between environments.
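Not the PR's actual code, but a transform along these lines might look roughly like the following compilable sketch (class and config names are illustrative; it handles only `Struct` values and copies fields naively, ignoring key schemas and nested structs):

```java
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class RegexRenameSchema<R extends ConnectRecord<R>> implements Transformation<R> {
    private Pattern regex;
    private String replacement;

    @Override
    public void configure(Map<String, ?> configs) {
        regex = Pattern.compile((String) configs.get("regex"));
        replacement = (String) configs.get("replacement");
    }

    @Override
    public R apply(R record) {
        Schema schema = record.valueSchema();
        if (schema == null || schema.name() == null || !(record.value() instanceof Struct)) {
            return record; // nothing to rename
        }
        // Rewrite the schema name, e.g. "devhost.inventory.Value" -> "com.company.schema.inventory.Value"
        String newName = regex.matcher(schema.name()).replaceAll(replacement);
        SchemaBuilder builder = SchemaBuilder.struct().name(newName);
        for (Field field : schema.fields()) {
            builder.field(field.name(), field.schema());
        }
        Schema renamed = builder.build();
        Struct value = new Struct(renamed);
        for (Field field : renamed.fields()) {
            value.put(field.name(), ((Struct) record.value()).get(field.name()));
        }
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
                record.key(), renamed, value, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("regex", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                        "Pattern matched against the value schema name")
                .define("replacement", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                        "Replacement string; may reference capture groups such as $1");
    }

    @Override
    public void close() {}
}
```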
[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414137#comment-17414137 ]

Rajini Sivaram commented on KAFKA-13293:

[~teabot] Which client (and configs) do you use to dynamically reload JKS keystores in clients today? As far as I understand, the default implementation in Apache Kafka supports reloading of keystores and truststores from files only for brokers. Clients can override `ssl.engine.factory.class`, but I am not sure if that is what you were referring to for dynamic reloading in clients.
[jira] [Created] (KAFKA-13294) Upgrade Netty to 4.1.68
Utkarsh Khare created KAFKA-13294:

Summary: Upgrade Netty to 4.1.68
Key: KAFKA-13294
URL: https://issues.apache.org/jira/browse/KAFKA-13294
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 2.8.0
Reporter: Utkarsh Khare

Netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and SnappyFrameDecoder.

References:
CVE-2021-37136 - https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv
CVE-2021-37137 - https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363

Can we upgrade Netty to version 4.1.68.Final to fix this?
[jira] [Commented] (KAFKA-13294) Upgrade Netty to 4.1.68
[ https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414176#comment-17414176 ]

Utkarsh Khare commented on KAFKA-13294:

I can push a PR to update Netty.
[jira] [Updated] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes
[ https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Utkarsh Khare updated KAFKA-13294:
Summary: Upgrade Netty to 4.1.68 for CVE fixes (was: Upgrade Netty to 4.1.68)
[jira] [Commented] (KAFKA-12751) ISRs remain in in-flight state if proposed state is same as actual state
[ https://issues.apache.org/jira/browse/KAFKA-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414181#comment-17414181 ]

priya Vijay commented on KAFKA-12751:

[~rsivaram] We are currently using Kafka 2.8 in production. Could you please let me know the impact of this bug, and when 2.8.1 will be released? I appreciate your guidance on this. Thanks, Priya Vijay

> ISRs remain in in-flight state if proposed state is same as actual state
> Key: KAFKA-12751
> URL: https://issues.apache.org/jira/browse/KAFKA-12751
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.7.0, 2.7.1, 2.8.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Critical
> Fix For: 2.7.2, 2.8.1
>
> If the proposed ISR state in an AlterIsr request is the same as the actual state, the Controller returns a successful response without performing any updates, but the broker code that processes the response leaves the ISR state in-flight without committing it. This prevents further ISR updates until the next leader election.
[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414192#comment-17414192 ]

Elliot West commented on KAFKA-13293:

Hey [~rsivaram] - thank you for your reply. I was certain that JKS reload was the case, but must admit that having looked through the Kafka source code again, I cannot find an obvious code path that would provide a JKS reload capability for clients. I believe this functionality would be beneficial (both for JKS and PEM) and can update the ticket accordingly if you concur. In the meantime, could you advise what the current best practice approach would be? I resume pulling new certs and then restarting the clients?
[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414192#comment-17414192 ]

Elliot West edited comment on KAFKA-13293 at 9/13/21, 1:12 PM:

Hey [~rsivaram] - thank you for your reply. I believe you are correct - I was certain that JKS reload was the case, but must admit that having looked through the Kafka source code again, I cannot find an obvious code path that would provide a JKS reload capability for clients. I believe this functionality would be beneficial (both for JKS and PEM) and can update the ticket accordingly if you concur. In the meantime, could you advise what the current best practice approach would be? I resume pulling new certs and then restarting the clients?

was (Author: teabot):
Hey [~rsivaram] - thank you for your reply. I was certain that JKS reload was the case, but must admit that having looked through the Kafka source code again, I cannot find an obvious code path that would provide a JKS reload capability for clients. I believe this functionality would be beneficial (both for JKS and PEM) and can update the ticket accordingly if you concur. In the meantime, could you advise what the current best practice approach would be? I resume pulling new certs and then restarting the clients?
[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414192#comment-17414192 ]

Elliot West edited comment on KAFKA-13293 at 9/13/21, 1:13 PM:

Hey [~rsivaram] - thank you for your reply. I believe you are correct - I was certain that JKS reload was the case, but must admit that having looked through the Kafka source code again, I cannot find an obvious code path that would provide a JKS reload capability for clients. I believe this functionality would be beneficial (both for JKS and PEM) and can update the ticket accordingly if you concur. In the meantime, could you advise what the current best practice approach would be? I presume pulling new certs and then restarting the clients?

was (Author: teabot):
Hey [~rsivaram] - thank you for your reply. I believe you are correct - I was certain that JKS reload was the case, but must admit that having looked through the Kafka source code again, I cannot find an obvious code path that would provide a JKS reload capability for clients. I believe this functionality would be beneficial (both for JKS and PEM) and can update the ticket accordingly if you concur. In the meantime, could you advise what the current best practice approach would be? I resume pulling new certs and then restarting the clients?
[GitHub] [kafka] C0urante closed pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
C0urante closed pull request #10112: URL: https://github.com/apache/kafka/pull/10112
[jira] [Commented] (KAFKA-12751) ISRs remain in in-flight state if proposed state is same as actual state
[ https://issues.apache.org/jira/browse/KAFKA-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414212#comment-17414212 ]

Rajini Sivaram commented on KAFKA-12751:

The bug only happens in an unusual case where the proposed ISR is the same as the expected ISR, and the chances of hitting that are very low. It is an issue only if `inter.broker.protocol.version >= 2.7`. If it does occur, further ISR updates don't occur, so the broker will need to be restarted. The release plan for 2.8.1 is here: https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.8.1 - work is under way and the release is expected within the next few weeks.
[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414217#comment-17414217 ]

Rajini Sivaram commented on KAFKA-13293:

[~teabot] Yes, agree that it will be useful to support certificate updates without restart for clients.

1) At the moment, as you said, you will need to pull in new certs and restart clients if you are using Kafka without custom plugins.
2) If you are willing to add a custom plugin, you can add a custom implementation of `ssl.engine.factory.class` that recreates the SslEngineFactory when certs change, by watching key/truststore files or pulling in PEM configs from an external source.
3) For the longer term, https://cwiki.apache.org/confluence/display/KAFKA/KIP-649%3A+Dynamic+Client+Configuration will be useful for performing dynamic updates for clients, similar to how we do dynamic broker config updates.
[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414227#comment-17414227 ]

Elliot West commented on KAFKA-13293:

[~rsivaram] on point 2, would that actually work for clients though? I ask because it seems as though the {{DefaultSslEngineFactory}} [already does this| https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].
[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414227#comment-17414227 ]

Elliot West edited comment on KAFKA-13293 at 9/13/21, 2:15 PM:

[~rsivaram] on point 2, would that actually work for clients though? I ask because it seems as though the {{DefaultSslEngineFactory}} [already does this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].

was (Author: teabot):
[~rsivaram] on point 2, would that actually work for clients though? I ask because it seems as though the {{DefaultSslEngineFactory}} [already does this| https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].
[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414227#comment-17414227 ]

Elliot West edited comment on KAFKA-13293 at 9/13/21, 2:16 PM:

[~rsivaram] on point 2, would that actually work for clients though? I ask because it seems as though the {{DefaultSslEngineFactory}} [already does this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113] for JKS?

was (Author: teabot):
[~rsivaram] on point 2, would that actually work for clients though? I ask because it seems as though the {{DefaultSslEngineFactory}} [already does this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].
[jira] [Assigned] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes
[ https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Utkarsh Khare reassigned KAFKA-13294:
Assignee: Utkarsh Khare
[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414263#comment-17414263 ]

Rajini Sivaram commented on KAFKA-13293:

[~teabot] Sorry, I should have said `recreates SSLContext` rather than `SslEngineFactory`. I haven't tried it out, but I think you can implement a custom factory that has a mutable SSLContext which is updated when required. Basically, we want to make sure `CustomSslEngineFactory` always has a valid `SSLContext` that is used by SslEngineFactory#createClientSslEngine() to create a new SSLEngine whenever a new connection is established, but we can swap out the SSLContext for a new one when we detect a change. As you said, we cannot rely on `SslEngineFactory#shouldBeRebuilt()` since that is invoked only for brokers when ALTER_CONFIGS is processed.
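To make this concrete, here is a rough, untested sketch of such a factory. The class name, the `rotate` hook, and the PEM-parsing placeholder are all hypothetical; only the `SslEngineFactory` interface (KIP-519) is from Kafka:

{code:java}
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.security.auth.SslEngineFactory;

public class ReloadingSslEngineFactory implements SslEngineFactory {

    private final AtomicReference<SSLContext> context = new AtomicReference<>();

    @Override
    public void configure(Map<String, ?> configs) {
        context.set(buildContext(configs)); // initial context from the client configs
    }

    // Hypothetical hook: called by the application when renewed PEM material arrives.
    public void rotate(Map<String, ?> newConfigs) {
        context.set(buildContext(newConfigs));
    }

    @Override
    public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
        // Each new connection picks up whatever context is current.
        SSLEngine engine = context.get().createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(true);
        return engine;
    }

    @Override
    public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
        throw new UnsupportedOperationException("client-only factory");
    }

    @Override
    public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
        return false; // only consulted on brokers, as noted in the comment above
    }

    @Override
    public Set<String> reconfigurableConfigs() {
        return Set.of();
    }

    @Override
    public KeyStore keystore() {
        return null; // no file-based key material to expose
    }

    @Override
    public KeyStore truststore() {
        return null;
    }

    @Override
    public void close() {}

    private SSLContext buildContext(Map<String, ?> configs) {
        try {
            // Placeholder: a real implementation would parse the PEM chain/key
            // out of `configs` and initialize the SSLContext with them.
            return SSLContext.getDefault();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
{code}

A client would register this via `ssl.engine.factory.class=com.example.ReloadingSslEngineFactory` (a hypothetical class name), and the application would invoke `rotate(...)` whenever renewed PEM material becomes available; only connections established after the swap use the new context.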
[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates
[ https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414272#comment-17414272 ]

Elliot West commented on KAFKA-13293:

[~rsivaram] thank you for the clarification.
[GitHub] [kafka] cmccabe merged pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
cmccabe merged pull request #11312: URL: https://github.com/apache/kafka/pull/11312
[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414431#comment-17414431 ]

Matthias J. Sax commented on KAFKA-13292:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler might help - it's part of AK 2.8 though.

> InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.0
> Reporter: NEERAJ VAIDYA
> Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 partitions. The incoming message rate into this topic is very low, perhaps 3-4 per minute. Also, some partitions will not receive messages for more than 7 days.
>
> Exactly 7 days after starting this application, I seem to get the following exception and the application shuts down without processing any more messages:
>
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Exception handler choose to FAIL the processing, no more records would be sent.
> 2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Exception handler choose to FAIL the processing, no more records would be sent.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>     at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
> 2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
> {code}
>
> After this, I can see that all 12 tasks (because there are 12 partitions for all topics) get shut down and this brings down the whole application.
>
> I understand that the transactional.id.expi
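For reference, the KIP-671 handler mentioned above is registered as below - a minimal sketch assuming an existing `KafkaStreams` instance; `REPLACE_THREAD` restarts the failed stream thread instead of letting the whole application shut down:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class HandlerExample {
    // Available since Apache Kafka 2.8 (KIP-671): decide per uncaught exception
    // whether to replace the thread or shut down the client/application.
    static void register(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception ->
                // Other options: SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION.
                StreamThreadExceptionResponse.REPLACE_THREAD);
    }
}
{code}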
[jira] [Resolved] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-13292.
Resolution: Invalid

Closing this as "invalid" as it seems to be a question, not a bug report. Please use the mailing lists to ask questions. Thanks.
[GitHub] [kafka] cmccabe commented on pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
cmccabe commented on pull request #11320: URL: https://github.com/apache/kafka/pull/11320#issuecomment-918440670 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe removed a comment on pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
cmccabe removed a comment on pull request #11320: URL: https://github.com/apache/kafka/pull/11320#issuecomment-918440709 Updates: - Use deprecated converter class in order to support Scala 2.12 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante opened a new pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery
C0urante opened a new pull request #11323: URL: https://github.com/apache/kafka/pull/11323 [Jira](https://issues.apache.org/jira/browse/KAFKA-12226) Replaces https://github.com/apache/kafka/pull/10112 Replaces the current batch-based logic for offset commits with a dynamic, non-blocking approach outlined in discussion on #10112 [here](https://github.com/apache/kafka/pull/10112#issuecomment-910510910), [here](https://github.com/apache/kafka/pull/10112#issuecomment-910540773), [here](https://github.com/apache/kafka/pull/10112#issuecomment-914348989), [here](https://github.com/apache/kafka/pull/10112#issuecomment-914547745), and [here](https://github.com/apache/kafka/pull/10112#issuecomment-915350922). Essentially, a deque is kept for every source partition that a source task produces records for, and each element in that deque is a `SubmittedRecord` with a flag to track whether the producer has ack'd the delivery of that source record to Kafka yet. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offsets from these elements, storing them in a snapshot that is then committed on the separate source task offset thread. The behavior of the `offset.flush.timeout.ms` property is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during `WorkerSourceTask::commitOffsets` blocking on the acknowledgment of records by the producer. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
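As a rough illustration of the deque-per-partition bookkeeping described above, here is a minimal sketch; the names `SubmittedRecords`, `submit`, and `committableOffsets` are hypothetical stand-ins, not necessarily the identifiers used in the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch: track produced records per source partition; the producer callback
// marks an element acked, and the committable offsets are the latest acked
// prefix of each deque.
class SubmittedRecords {
    static class SubmittedRecord {
        final Map<String, Object> partition;
        final Map<String, Object> offset;
        volatile boolean acked = false;
        SubmittedRecord(final Map<String, Object> partition, final Map<String, Object> offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new HashMap<>();

    // Called on the task thread when a record is dispatched to the producer.
    synchronized SubmittedRecord submit(final Map<String, Object> partition, final Map<String, Object> offset) {
        final SubmittedRecord record = new SubmittedRecord(partition, offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(record);
        return record;
    }

    // Called periodically: drain the acked prefix of each deque and return the
    // newest acked offset per partition as a snapshot to commit.
    synchronized Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        final Map<Map<String, Object>, Map<String, Object>> snapshot = new HashMap<>();
        records.forEach((partition, deque) -> {
            SubmittedRecord latestAcked = null;
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                latestAcked = deque.pollFirst();
            }
            if (latestAcked != null) {
                snapshot.put(partition, latestAcked.offset);
            }
        });
        return snapshot;
    }
}
```

In this sketch the producer callback would set `acked` on its `SubmittedRecord`, so the commit path never blocks on delivery; it simply commits whatever prefix has already been acknowledged.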
[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
C0urante commented on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-918481881 @rhauch (and, if interested, @hachikuji) new PR is up: https://github.com/apache/kafka/pull/11323 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery
C0urante commented on pull request #11323: URL: https://github.com/apache/kafka/pull/11323#issuecomment-918482406 CC @rhauch; hopefully this is fairly close to what you had in mind. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
hutchiko commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-918537415 @guozhangwang @ableegoldman unfortunately I could never reproduce the CI failures; however, I have pushed up a refactor of the method which I think was responsible for the flakiness. The original version of the method was scanning backwards through the changelog topic searching for the top record so I could cross-check that record's offset with the checkpointed offset. It had an implicit assumption that the consumer it was driving backwards would always get some records after a 50ms `poll` - thinking this through, it's obviously a false assumption. I switched the logic around so it just consumes forwards until it finds the end of the topic. There are no assumptions about timing in the new logic, so I'm hoping that will fix the flakiness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
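For illustration, the consume-forward approach described above can be sketched as follows. This is a minimal standalone sketch of the idea, not the actual test code from the PR; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

final class ChangelogReader {
    // Sketch: consume a changelog partition from the beginning until the end
    // offset captured up front, returning the last record seen. Unlike a
    // backward scan, no single poll() is assumed to return any records.
    static ConsumerRecord<byte[], byte[]> readToEnd(final Consumer<byte[], byte[]> consumer,
                                                    final TopicPartition tp) {
        consumer.assign(List.of(tp));
        consumer.seekToBeginning(List.of(tp));
        final long endOffset = consumer.endOffsets(List.of(tp)).get(tp);

        ConsumerRecord<byte[], byte[]> last = null;
        while (consumer.position(tp) < endOffset) {
            for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100)).records(tp)) {
                last = record; // keep the most recent record seen so far
            }
        }
        return last;
    }
}
```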
[GitHub] [kafka] mumrah commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
mumrah commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r707654639 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ## @@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, delta: MetadataDelta, newImage: MetadataImage): Unit = { try { + debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset") Review comment: Are there any concerns with the MetadataDelta having a lot of records in it? I'm wondering if this could potentially dump out a lot of text into the debug log. Also, from a security perspective, is it okay to write the contents of _any_ metadata record to the debug log? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
hachikuji commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r707673948 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ## @@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, delta: MetadataDelta, newImage: MetadataImage): Unit = { try { + debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset") Review comment: I found it useful when debugging the failures here, so I thought it would come in handy when debugging system test failures as well. I guess if there were any concerns from a security perspective, it would be about the `ConfigurationDelta`. Perhaps if we want to err on the safe side, we could print the keys only? That might be good enough since it would probably be clear from the test context what the change was. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
mumrah commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r707674451 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ## @@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, delta: MetadataDelta, newImage: MetadataImage): Unit = { try { + debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset") Review comment: Yea, configs is what I was thinking of. Logging the keys only sounds like a good idea -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
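For illustration, the keys-only idea might look like the following minimal sketch; it assumes a plain map of changed configs rather than the actual `MetadataDelta` API, which may expose the data differently.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class ConfigDeltaLogging {
    // Sketch: name which config keys changed without writing their values
    // (which may be sensitive) to the debug log.
    static String describeChangedConfigs(final Map<String, String> changedConfigs) {
        return changedConfigs.keySet().stream()
                .sorted()
                .collect(Collectors.joining(", ", "changed configs: [", "]"));
    }
}
```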
[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
guozhangwang commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-918582404 I checked the failures of the new run and they are not related to the new tests. Merging to trunk now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
guozhangwang merged pull request #11283: URL: https://github.com/apache/kafka/pull/11283 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
guozhangwang commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-918584383 cc @kkonstantine, this is a critical bug fix, hence I'm cherry-picking to 3.0 as well. If RC2 is voted through then it will fall on 3.0.1; otherwise, if we vote for another RC3, I think it'd be great to have in 3.0.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
guozhangwang commented on pull request #11292: URL: https://github.com/apache/kafka/pull/11292#issuecomment-918598714 > Hope that's clear. > I also updated in the PR description. > Thank you. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
guozhangwang merged pull request #11292: URL: https://github.com/apache/kafka/pull/11292 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
guozhangwang commented on pull request #11292: URL: https://github.com/apache/kafka/pull/11292#issuecomment-918599583 Merged to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r707726453 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? +partitionStates.forKeyValue { (partition, partitionState) => + if (traceLoggingEnabled) +stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"${partitionState.leader}") + responseMap.put(partition.topicPartition, Errors.NONE) +} + +val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set() +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { Review comment: I wasn't sure if there was a case where the broker could be temporarily shut down? Though maybe it doesn't matter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request
hachikuji commented on a change in pull request #11080: URL: https://github.com/apache/kafka/pull/11080#discussion_r707728626 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java ## @@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer, Long logAppendTime) { int sizeOfBodyInBytes = ByteUtils.readVarint(buffer); if (buffer.remaining() < sizeOfBodyInBytes) -return null; +throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes + +" bytes in record payload, but instead the buffer has only " + buffer.remaining() + +" remaining bytes."); Review comment: Apologies for the delay here. I don't see a problem with the change. I believe that @ijuma is right that the fetch response may still return incomplete data, but I think this is handled in `ByteBufferLogInputStream`. We stop batch iteration early if there is incomplete data, so we would never reach the `readFrom` here which is called for each record in the batch. It's worth noting also that the only caller of this method (in `DefaultRecordBatch.uncompressedIterator`) has the following logic: ```java try { return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime); } catch (BufferUnderflowException e) { throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached"); } ``` So it already handles underflows in a similar way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
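As a self-contained illustration of the pattern under discussion (a standalone sketch, not the actual Kafka classes), the fix boils down to validating the declared size against the remaining bytes before reading, and failing with a descriptive error instead of returning null:

```java
import java.nio.ByteBuffer;

final class RecordSizeCheck {
    // Sketch: fail fast with a descriptive error when a record declares more
    // bytes than the buffer actually holds, instead of returning null.
    static ByteBuffer readRecordBody(final ByteBuffer buffer, final int sizeOfBodyInBytes) {
        if (buffer.remaining() < sizeOfBodyInBytes) {
            throw new IllegalArgumentException("Invalid record size: expected " + sizeOfBodyInBytes
                + " bytes in record payload, but the buffer has only " + buffer.remaining()
                + " remaining bytes");
        }
        final ByteBuffer body = buffer.slice();       // view over the record body
        body.limit(sizeOfBodyInBytes);
        buffer.position(buffer.position() + sizeOfBodyInBytes); // advance past the body
        return body;
    }
}
```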
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r707742261 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? +partitionStates.forKeyValue { (partition, partitionState) => + if (traceLoggingEnabled) +stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"${partitionState.leader}") + responseMap.put(partition.topicPartition, Errors.NONE) +} + +val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set() +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { +// Only change partition state when the leader is available +partitionsToUpdateFollower += partition + } else { +// The leader broker should always be present in the metadata cache. +// If not, we should record the error message and abort the transition process for this partition +stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + + s"(last update controller epoch ${partitionState.controllerEpoch}) " + + s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") + } + } + + if (isShuttingDown.get()) { +if (traceLoggingEnabled) { + partitionsToUpdateFollower.foreach { partition => +stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + + s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " + + "since it is shutting down") + } +} + } else { +val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToUpdateFollower.map { partition => + val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache. +getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode()) + val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port()) + val log = partition.localLogOrException + val fetchOffset = initialFetchOffset(log) + partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset) +}.toMap + + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) Review comment: There are a few ways to do this and I'll be interested to see which is the best. It is a little tricky to go from topic partition -> fetcher -> fetch state where we can update the topic ID. One way is to use `find` like `fetchState` does. Another is to look up the fetcher in the map like `addFetcherForPartitions` does. 
Finally, we could try to just add topic IDs to all the partitions in the fetcher by iterating through, but that might not be very efficient -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request
ijuma commented on a change in pull request #11080: URL: https://github.com/apache/kafka/pull/11080#discussion_r707781491 ## File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java ## @@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer, Long logAppendTime) { int sizeOfBodyInBytes = ByteUtils.readVarint(buffer); if (buffer.remaining() < sizeOfBodyInBytes) -return null; +throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes + +" bytes in record payload, but instead the buffer has only " + buffer.remaining() + +" remaining bytes."); Review comment: Thanks for checking @hachikuji. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request
hachikuji commented on pull request #11080: URL: https://github.com/apache/kafka/pull/11080#issuecomment-918658119 @ccding I kicked off a new build since it has been a while since the PR was submitted. Assuming tests are ok, I will merge shortly. Thanks for your patience. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r707810706 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedTimestampedKeyAndJoinSideSerializerTest.java ## @@ -24,49 +24,49 @@ import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThrows; -public class KeyAndJoinSideSerializerTest { +public class TimestampedTimestampedKeyAndJoinSideSerializerTest { Review comment: Ack. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStore.java ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serde; + +/** + * A wrapper key-value store that serializes the record values bytes as a list. + * As a result put calls would be interpreted as a get-append-put to the underlying RocksDB store. Review comment: Ack. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimeOrderedKeyValueBytesStore.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueStore; + +public class ChangeLoggingTimeOrderedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore { + +ChangeLoggingTimeOrderedKeyValueBytesStore(final KeyValueStore inner) { +super(inner); +} + +@Override +public void put(final Bytes key, +final byte[] value) { +wrapped().put(key, value); +// we need to log the new value, which is different from the put value; Review comment: Ack ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimeOrderedKeyValueBytesStore.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueStore; + +public class ChangeLogg
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r707812546 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; + +import static java.util.Arrays.asList; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.junit.Assert.assertEquals; + +public class ListValueStoreTest { +private static final String STORE_NAME = "rocksDB list value store"; + +MockRecordCollector recordCollector; +KeyValueStore listStore; +InternalMockProcessorContext context; + +final File baseDir = TestUtils.tempDirectory("test"); + +@Before +public void setup() { +listStore = buildStore(Serdes.Integer(), Serdes.String()); + +recordCollector = new MockRecordCollector(); +context = new InternalMockProcessorContext<>( +baseDir, +Serdes.String(), +Serdes.Integer(), +recordCollector, +new ThreadCache( +new LogContext("testCache"), +0, +new MockStreamsMetrics(new Metrics(; +context.setTime(1L); + +listStore.init((StateStoreContext) context, listStore); +} + +@After +public void after() { +listStore.close(); +} + + KeyValueStore buildStore(final Serde keySerde, + final Serde valueSerde) { +return new ListValueStoreBuilder<>( +new RocksDbKeyValueBytesStoreSupplier(STORE_NAME, false), +keySerde, +valueSerde, +Time.SYSTEM) +.build(); +} + +@Test +public void shouldGetAll() { +listStore.put(0, "zero"); +// should retain duplicates +listStore.put(0, "zero again"); +listStore.put(1, "one"); +listStore.put(2, "two"); + +final KeyValue zero = KeyValue.pair(0, "zero"); +final KeyValue zeroAgain = KeyValue.pair(0, "zero again"); +final KeyValue one = KeyValue.pair(1, "one"); +final KeyValue two = KeyValue.pair(2, "two"); + +assertEquals( +asList(zero, zeroAgain, one, two), +toList(listStore.all()) +); +} + +@Test +public void 
shouldGetAllNonDeletedRecords() { +// Add some records +listStore.put(0, "zero"); +listStore.put(1, "one"); +listStore.put(1, "one again"); +listStore.put(2, "two"); +listStore.put(3, "three"); +listStore.put(4, "four"); + +// Delete some records +listStore.put(1, null); +listStore.put(3, null); + +// Only non-deleted records should appear in the all() iterator +final KeyValue zero = KeyValue.pair(0, "zero"); +final KeyValue two = KeyValue.pair(2, "two"); +final KeyValue four = KeyValue.pair(4, "four"); + +assertEquals( +asList(zero, two, four), +toList(listStore.all()) +); +} + +@Test +public void shouldGetAllReturnTimestampOrderedRecords() { +// Add some records in different order +listStore.put(4, "four"); +listStore.put(0, "zero"); +listStore.put(2, "two1"); +listStore.put(
[GitHub] [kafka] mjsax commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
mjsax commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r707817999 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; + +import static java.util.Arrays.asList; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.junit.Assert.assertEquals; + +public class ListValueStoreTest { +private static final String STORE_NAME = "rocksDB list value store"; + +MockRecordCollector recordCollector; +KeyValueStore listStore; +InternalMockProcessorContext context; + +final File baseDir = TestUtils.tempDirectory("test"); + +@Before +public void setup() { +listStore = buildStore(Serdes.Integer(), Serdes.String()); + +recordCollector = new MockRecordCollector(); +context = new InternalMockProcessorContext<>( +baseDir, +Serdes.String(), +Serdes.Integer(), +recordCollector, +new ThreadCache( +new LogContext("testCache"), +0, +new MockStreamsMetrics(new Metrics(; +context.setTime(1L); + +listStore.init((StateStoreContext) context, listStore); +} + +@After +public void after() { +listStore.close(); +} + + KeyValueStore buildStore(final Serde keySerde, + final Serde valueSerde) { +return new ListValueStoreBuilder<>( +new RocksDbKeyValueBytesStoreSupplier(STORE_NAME, false), +keySerde, +valueSerde, +Time.SYSTEM) +.build(); +} + +@Test +public void shouldGetAll() { +listStore.put(0, "zero"); +// should retain duplicates +listStore.put(0, "zero again"); +listStore.put(1, "one"); +listStore.put(2, "two"); + +final KeyValue zero = KeyValue.pair(0, "zero"); +final KeyValue zeroAgain = KeyValue.pair(0, "zero again"); +final KeyValue one = KeyValue.pair(1, "one"); +final KeyValue two = KeyValue.pair(2, "two"); + +assertEquals( +asList(zero, zeroAgain, one, two), +toList(listStore.all()) Review comment: Potential memory leak? 
`toList` does not close the iterator -- should we update `toList`? Or make sure we close the iterator using try-with-resources? (similar below -- not sure about existing tests using `toList`? -- might be worth double-checking) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
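One way to address the concern above would be a helper that always closes the iterator itself; a minimal sketch (the real `StreamsTestUtils.toList` signature may differ, and the name `toListAndClose` is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

final class IteratorUtil {
    // Sketch: drain a KeyValueIterator into a list and always close it via
    // try-with-resources, so test helpers do not leak the underlying
    // RocksDB iterator.
    static <K, V> List<KeyValue<K, V>> toListAndClose(final KeyValueIterator<K, V> iterator) {
        try (KeyValueIterator<K, V> it = iterator) {
            final List<KeyValue<K, V>> result = new ArrayList<>();
            while (it.hasNext()) {
                result.add(it.next());
            }
            return result;
        }
    }
}
```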
[jira] [Created] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
A. Sophie Blee-Goldman created KAFKA-13295: -- Summary: Long restoration times for new tasks can lead to transaction timeouts Key: KAFKA-13295 URL: https://issues.apache.org/jira/browse/KAFKA-13295 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 3.1.0 In some EOS applications with relatively long restoration times we've noticed a series of ProducerFencedExceptions occurring during/immediately after restoration. The broker logs were able to confirm these were due to transactions timing out. In Streams, it turns out we automatically begin a new txn when calling {{send}} (if there isn’t already one in flight). A {{send}} occurs often outside a commit during active processing (eg writing to the changelog), leaving the txn open until the next commit. And if a StreamThread has been actively processing when a rebalance results in a new stateful task without revoking any existing tasks, the thread won’t actually commit this open txn before it goes back into the restoration phase while it builds up state for the new task. So the in-flight transaction is left open during restoration, during which the StreamThread only consumes from the changelog without committing, leaving it vulnerable to timing out when restoration times exceed the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
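While a fix is pending, one possible mitigation implied by the description (an assumption on our part, not something the ticket prescribes) is to give the transactional producer more headroom than the worst-case restoration time; a sketch of the relevant Streams configuration, with placeholder application id and bootstrap servers:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final class EosRestoreHeadroom {
    // Sketch: raise the producer-side transaction timeout above the expected
    // worst-case restoration time so an open transaction survives the restore
    // phase. The broker's transaction.max.timeout.ms caps how high this can go.
    static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 300_000);
        return props;
    }
}
```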
[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414652#comment-17414652 ] A. Sophie Blee-Goldman commented on KAFKA-13295: Note that https://issues.apache.org/jira/browse/KAFKA-13249, while not directly related, makes it more likely for an application to hit this bug since it can result in Streams restoring additional records and thus longer restoration times > Long restoration times for new tasks can lead to transaction timeouts > - > > Key: KAFKA-13295 > URL: https://issues.apache.org/jira/browse/KAFKA-13295 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Critical > Labels: eos > Fix For: 3.1.0 > > > In some EOS applications with relatively long restoration times we've noticed > a series of ProducerFencedExceptions occurring during/immediately after > restoration. The broker logs were able to confirm these were due to > transactions timing out. > In Streams, it turns out we automatically begin a new txn when calling > {{send}} (if there isn’t already one in flight). A {{send}} occurs often > outside a commit during active processing (eg writing to the changelog), > leaving the txn open until the next commit. And if a StreamThread has been > actively processing when a rebalance results in a new stateful task without > revoking any existing tasks, the thread won’t actually commit this open txn > before it goes back into the restoration phase while it builds up state for > the new task. So the in-flight transaction is left open during restoration, > during which the StreamThread only consumes from the changelog without > committing, leaving it vulnerable to timing out when restoration times exceed > the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
ableegoldman commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-918706779 @guozhangwang can you cherrypick this back to 2.8 at least? Maybe also 2.7 if there aren't any conflicts (seems like it should be a smooth merge but 🤷♀️ ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414732#comment-17414732 ] NEERAJ VAIDYA edited comment on KAFKA-13292 at 9/14/21, 5:49 AM: - Thanks [~mjsax] for the reference to the KIP. It might not be possible for my application to be upgraded soon. However, I still feel this is a defect because I believe that stream processing should continue even when transactional.id.expiration.ms has elapsed and should not bring down the entire application. I noticed from the logs that the application does try to get a new epoch and producer Id, but then still suspends and stops all threads. As you will observe below towards the end of the log messages (at timestamp 2021-09-10T12:21:59.766), for the task [0_2], the call to "*Invoking InitProducerId*" is made, but still the flow ends up in the UncaughtExceptionHandler. I was of the understanding that once the producerId epoch is bumped, stream processing should resume. However, it starts shutting down the entire application by an uncaught exception. *FYI - I am using exactly_once processing and enable.idempotence=true* {code:java} 2021-09-10T12:21:59.636 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. 2021-09-10T12:21:59.642 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Exception handler choose to FAIL the processing, no more records would be sent. 2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Exception handler choose to FAIL the processing, no more records would be sent. 
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
2021-09-10T12:21:59.741 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Shutting down
2021-09-10T12:21:59.743 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2021-09-10T12:21:59.744 [mtx-ca
{code}
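For applications that can move to Kafka Streams 2.8.0 or newer, the KIP-671 handler makes it possible to replace the failed thread instead of shutting the client down. Below is a minimal sketch, not a confirmed fix for this ticket: the application id, topics, and the decision to treat only InvalidPidMappingException as recoverable are illustrative assumptions. The replacement thread creates a fresh producer, which obtains a new producer id/epoch via InitProducerId.

{code:java}
import java.util.Properties;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadOnPidExpiry {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Requires Kafka Streams 2.8.0+ (KIP-671).
        streams.setUncaughtExceptionHandler(exception -> {
            // Streams wraps producer errors in a StreamsException; inspect the cause.
            final Throwable cause = exception.getCause() != null ? exception.getCause() : exception;
            if (cause instanceof InvalidPidMappingException) {
                // Replace the StreamThread: its new producer re-runs InitProducerId,
                // so processing can resume instead of the whole client shutting down.
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        streams.start();
    }
}
{code}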
[jira] [Reopened] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] NEERAJ VAIDYA reopened KAFKA-13292: --- As indicated in my previous comments. > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13292 > URL: https://issues.apache.org/jira/browse/KAFKA-13292 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: NEERAJ VAIDYA >Priority: Major > > I have a KafkaStreams application which consumes from a topic which has 12 > partitions. The incoming message rate into this topic is very low, perhaps > 3-4 per minute. Also, some partitions will not receive messages for more than > 7 days. > > Exactly after 7 days of starting this application, I seem to be getting the > following exception and the application shuts down, without processing > any more messages : > > {code:java} > 2021-09-10T12:21:59.636 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer > clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, > transactionalId=mtx-caf-0_2] Transiting to abortable error state due to > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > 2021-09-10T12:21:59.642 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] > Error encountered sending record to topic > mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the > following exception during processing and the thread is going to shut down: > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. 
> at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The > producer attempted to use a producer id which is not currently assigned to > its transactional id. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State > transition from RUNNING to PENDING_SHUTDOWN > {code} > > After this, I can see that all 12 tasks (because there are 12 partitions for > all topics) get shut down and this brings down the whole application. > > I understand that the transactional.id.expiration.ms = 7 days (default) will > likely cause the producer id of the application thread to expire, but why does this spec
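For reference, the expiry window in play here is the broker configuration transactional.id.expiration.ms (default 604800000 ms, i.e. 7 days). Raising it above the longest expected idle period of any input partition only postpones the InvalidPidMappingException; it does not change how Streams reacts to it. A hedged server.properties sketch with an illustrative value:

{code}
# Broker-side server.properties -- value is illustrative, not a recommendation.
# Default is 604800000 ms (7 days); here raised to 14 days so the
# producer-id -> transactional.id mapping survives longer idle periods.
transactional.id.expiration.ms=1209600000
{code}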
[GitHub] [kafka] dongjinleekr opened a new pull request #11324: KAFKA-13294: Upgrade Netty to 4.1.68 for CVE fixes
dongjinleekr opened a new pull request #11324: URL: https://github.com/apache/kafka/pull/11324 `netty-codec` `4.1.62.Final` has the following security vulnerabilities, which in turn affect `netty-transport-native-epoll`, which Apache Kafka depends on. - [CVE-2021-37136](https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv) - [CVE-2021-37137](https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
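In build terms a bump like this is a one-line change to the pinned Netty version; a hedged sketch of the kind of edit involved (the exact file and key name in the Kafka build are assumptions, not taken from the PR):

```diff
- netty: "4.1.62.Final",
+ netty: "4.1.68.Final",
```

After such a change, a Gradle dependency report (e.g. `./gradlew dependencies`) can be used to confirm that `netty-codec` and `netty-transport-native-epoll` resolve to the patched version.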
[GitHub] [kafka] dongjinleekr commented on pull request #10678: TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc
dongjinleekr commented on pull request #10678: URL: https://github.com/apache/kafka/pull/10678#issuecomment-918854190 Rebased onto the latest trunk. cc/ @cadonna -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13296) Verify old assignment within StreamsPartitionAssignor
Matthias J. Sax created KAFKA-13296: --- Summary: Verify old assignment within StreamsPartitionAssignor Key: KAFKA-13296 URL: https://issues.apache.org/jira/browse/KAFKA-13296 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax `StreamsPartitionAssignor` is responsible for assigning partitions and tasks to all StreamThreads within an application. While it ensures that a single partition/task is not assigned to two threads, there is limited verification of this. In particular, we had one incident in which a zombie thread/consumer did not clean up its own internal state correctly due to KAFKA-12983. This unclean zombie state meant that the _old assignment_ reported to `StreamsPartitionAssignor` contained a single partition for two consumers. As a result, both threads/consumers later revoked the same partition and the zombie thread could commit its unclean work (even though it should have been fenced), leading to duplicate output under EOS_v2. We should consider adding a check to `StreamsPartitionAssignor` that the _old assignment_ is valid, i.e., that no partition is missing and no partition is assigned to two consumers. In that case, we should log the invalid _old assignment_ and send an error code back to all consumers indicating that they should shut down "unclean" (i.e., without flushing and without committing any offsets or transactions). -- This message was sent by Atlassian Jira (v8.3.4#803005)
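A hedged sketch of what such a validation could look like, built on the previously-owned partitions that members report in their subscriptions (KIP-429). The class, method, and logging here are illustrative, not the actual Streams code:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;

final class OldAssignmentValidator {
    // Returns false if the "old assignment" is invalid, i.e. the same partition
    // is claimed as previously owned by two different consumers.
    static boolean oldAssignmentIsValid(final Map<String, Subscription> subscriptions) {
        final Map<TopicPartition, String> previousOwner = new HashMap<>();
        for (final Map.Entry<String, Subscription> member : subscriptions.entrySet()) {
            for (final TopicPartition partition : member.getValue().ownedPartitions()) {
                final String conflict = previousOwner.put(partition, member.getKey());
                if (conflict != null) {
                    // Invalid old assignment: log it; the assignor would then send an
                    // error code so all members shut down without flushing/committing.
                    System.err.printf("Partition %s claimed by both %s and %s%n",
                        partition, conflict, member.getKey());
                    return false;
                }
            }
        }
        return true;
    }
}
{code}

A completeness check (no expected partition missing from the union of owned partitions) could be layered on the same map once it is populated.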