[GitHub] [kafka] cadonna merged pull request #11317: KAFKA-13287: Upgrade RocksDB to 6.22.1.1

2021-09-13 Thread GitBox


cadonna merged pull request #11317:
URL: https://github.com/apache/kafka/pull/11317


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13287) Upgrade RocksDB to 6.22.1.1

2021-09-13 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13287.
---
Resolution: Fixed

> Upgrade RocksDB to 6.22.1.1
> ---
>
> Key: KAFKA-13287
> URL: https://issues.apache.org/jira/browse/KAFKA-13287
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.1.0
>
> Attachments: compat_report.html
>
>
> RocksDB 6.22.1.1 is source compatible with RocksDB 6.19.3, which Streams
> currently uses (see the attached compatibility report).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10338) Support PEM format for SSL certificates and private key

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414070#comment-17414070
 ] 

Elliot West commented on KAFKA-10338:
-

[~rsivaram], I've created KAFKA-13293 to address this.

 

> Support PEM format for SSL certificates and private key
> ---
>
> Key: KAFKA-10338
> URL: https://issues.apache.org/jira/browse/KAFKA-10338
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> We currently support only file-based JKS/PKCS12 format for SSL key stores and 
> trust stores. It would be good to add support for PEM as configuration values,
> which fits better with config externalization.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)
Elliot West created KAFKA-13293:
---

 Summary: Support client reload of PEM certificates
 Key: KAFKA-13293
 URL: https://issues.apache.org/jira/browse/KAFKA-13293
 Project: Kafka
  Issue Type: Improvement
  Components: clients, security
Affects Versions: 2.7.1, 2.8.0, 2.7.0
Reporter: Elliot West


Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
client configuration properties in addition to JKS file based key stores 
(KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
string based key-value properties, alongside existing client configuration. 
This offers a number of benefits: it provides a JVM agnostic security mechanism 
from the perspective of clients, removes the client's dependency on the local 
filesystem, and allows the encapsulation of the entire client configuration 
into a single payload.

However, the current client PEM implementation has a feature regression when 
compared with the JKS implementation. With the JKS approach, clients would 
automatically reload certificates when the key stores were modified on disk. 
This enables a seamless approach for the replacement of certificates when they 
are due to expire; no further configuration or explicit interference with the 
client lifecycle is needed for the client to migrate to renewed certificates.

Such a capability does not currently exist for PEM. One supplies key chains 
when instantiating clients only - there is no mechanism available to either 
directly reconfigure the client, or for the client to observe changes to the 
original properties set reference used in construction. Additionally, no 
work-arounds are documented that might give users alternative strategies for 
dealing with expiring certificates. Given that expiration and renewal of 
certificates is an industry standard practice, it could be argued that the 
current PEM client implementation is not fit for purpose.

In summary, a mechanism should be provided such that clients can automatically 
detect, load, and use updated PEM key chains from some non-file based source 
(object ref, method invocation, listener, etc.).

Finally, it is suggested that in the short term the Kafka documentation be 
updated to describe any viable mechanism for updating client PEM certs (perhaps 
closing the existing client and then recreating it?).
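As a concrete illustration of the configuration-value approach described above, here is a minimal sketch of PEM-style client SSL configuration per KIP-651 (the PEM strings are placeholders, not real key material):

{code:java}
import java.util.Properties;

// Sketch: SSL client configuration supplied as in-memory PEM strings (KIP-651).
// No files are involved; the key and certificate chain travel with the config.
Properties props = new Properties();
props.put("security.protocol", "SSL");
props.put("ssl.keystore.type", "PEM");
props.put("ssl.keystore.key",
    "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----");
props.put("ssl.keystore.certificate.chain",
    "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
props.put("ssl.truststore.type", "PEM");
props.put("ssl.truststore.certificates",
    "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
{code}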



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9569) RemoteStorageManager implementation for HDFS storage.

2021-09-13 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved KAFKA-9569.
---
Resolution: Fixed

> RemoteStorageManager implementation for HDFS storage.
> -
>
> Key: KAFKA-9569
> URL: https://issues.apache.org/jira/browse/KAFKA-9569
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Ying Zheng
>Priority: Major
>
> This is about implementing `RemoteStorageManager` for HDFS to verify the 
> proposed SPIs are sufficient. It looks like the existing RSM interface should 
> be sufficient. If needed, we will discuss any required changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] msillence opened a new pull request #11322: smt to rename schemas based on regex

2021-09-13 Thread GitBox


msillence opened a new pull request #11322:
URL: https://github.com/apache/kafka/pull/11322


   SMT to rename schemas based on a regex
   
   example configuration:
   
   ```
   "transforms.regexSchema.regex": ".*\\.([^.]*)\\.(Value|Key)",
   "transforms.regexSchema.replacement": "com.company.schema.$1.$2",
   ```
   
   We need this because our Debezium connector creates schemas named after the 
host and database schema. When we move from dev to production, these names 
change. With Avro, the name in the schema is used to pick the class, and thus 
class resolution will fail when we move between environments.
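   For illustration, a minimal, self-contained sketch of what that regex/replacement pair does to a Debezium-style schema name (the demo class and sample input are hypothetical):
   
   ```java
   import java.util.regex.Pattern;
   
   public class SchemaNameRewriteDemo {
       public static void main(String[] args) {
           // The configured SMT regex captures the table name and the Key/Value suffix.
           Pattern regex = Pattern.compile(".*\\.([^.]*)\\.(Value|Key)");
           String replacement = "com.company.schema.$1.$2";
   
           // Hypothetical Debezium-generated name: <host>.<db-schema>.<table>.Value
           String original = "dev-host-01.inventory.customers.Value";
   
           // The environment-specific prefix is dropped; only table and suffix survive.
           System.out.println(regex.matcher(original).replaceAll(replacement));
           // -> com.company.schema.customers.Value
       }
   }
   ```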


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414137#comment-17414137
 ] 

Rajini Sivaram commented on KAFKA-13293:


[~teabot] Which client (and configs) do you use to dynamically reload JKS 
keystores in clients today? As far as I understand, the default implementation in 
Apache Kafka supports reloading of keystores and truststores from files only 
for brokers. Clients can override `ssl.engine.factory.class`, but I am not sure if 
that is what you were referring to for dynamic reloading in clients.

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13294) Upgrade Netty to 4.1.68

2021-09-13 Thread Utkarsh Khare (Jira)
Utkarsh Khare created KAFKA-13294:
-

 Summary: Upgrade Netty to 4.1.68
 Key: KAFKA-13294
 URL: https://issues.apache.org/jira/browse/KAFKA-13294
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.8.0
Reporter: Utkarsh Khare


Netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
SnappyFrameDecoder. 

Reference:

[CVE-2021-37136 - 
https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]

[CVE-2021-37137 - 
https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]

 

Can we upgrade Netty to version 4.1.68.Final to fix this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13294) Upgrade Netty to 4.1.68

2021-09-13 Thread Utkarsh Khare (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414176#comment-17414176
 ] 

Utkarsh Khare commented on KAFKA-13294:
---

I can push a PR for updating netty. 

> Upgrade Netty to 4.1.68
> ---
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Priority: Minor
>
> Netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference:
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes

2021-09-13 Thread Utkarsh Khare (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Utkarsh Khare updated KAFKA-13294:
--
Summary: Upgrade Netty to 4.1.68 for CVE fixes  (was: Upgrade Netty to 
4.1.68)

> Upgrade Netty to 4.1.68 for CVE fixes
> -
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Priority: Minor
>
> Netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference:
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12751) ISRs remain in in-flight state if proposed state is same as actual state

2021-09-13 Thread priya Vijay (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414181#comment-17414181
 ] 

priya Vijay commented on KAFKA-12751:
-

[~rsivaram] We are currently using Kafka 2.8 in production. Can you please let 
me know what the impacts of this bug are?

Also, when will 2.8.1 be released?

I appreciate your guidance on this.

Thanks,

Priya Vijay

> ISRs remain in in-flight state if proposed state is same as actual state
> 
>
> Key: KAFKA-12751
> URL: https://issues.apache.org/jira/browse/KAFKA-12751
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.7.2, 2.8.1
>
>
> If proposed ISR state in an AlterIsr request is the same as the actual state, 
> Controller returns a successful response without performing any updates. But 
> the broker code that processes the response leaves the ISR state in in-flight 
> state without committing. This prevents further ISR updates until the next 
> leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414192#comment-17414192
 ] 

Elliot West commented on KAFKA-13293:
-

Hey [~rsivaram] - thank you for your reply.  I was certain that JKS reload was 
the case, but must admit that having looked through the Kafka source code 
again, I cannot find an obvious code path that would provide a JKS reload 
capability for clients. I believe this functionality would be beneficial (both 
for JKS and PEM) and can update the ticket accordingly if you concur.

In the meantime, could you advise what the current best practice approach would 
be? I resume pulling new certs and then restarting the clients?

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414192#comment-17414192
 ] 

Elliot West edited comment on KAFKA-13293 at 9/13/21, 1:12 PM:
---

Hey [~rsivaram] - thank you for your reply.  I believe you are correct - I was 
certain that JKS reload was the case, but must admit that having looked through 
the Kafka source code again, I cannot find an obvious code path that would 
provide a JKS reload capability for clients. I believe this functionality would 
be beneficial (both for JKS and PEM) and can update the ticket accordingly if 
you concur.

In the meantime, could you advise what the current best practice approach would 
be? I resume pulling new certs and then restarting the clients?


was (Author: teabot):
Hey [~rsivaram] - thank you for your reply.  I was certain that JKS reload was 
the case, but must admit that having looked through the Kafka source code 
again, I cannot find an obvious code path that would provide a JKS reload 
capability for clients. I believe this functionality would be beneficial (both 
for JKS and PEM) and can update the ticket accordingly if you concur.

In the meantime, could you advise what the current best practice approach would 
be? I resume pulling new certs and then restarting the clients?

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414192#comment-17414192
 ] 

Elliot West edited comment on KAFKA-13293 at 9/13/21, 1:13 PM:
---

Hey [~rsivaram] - thank you for your reply.  I believe you are correct - I was 
certain that JKS reload was the case, but must admit that having looked through 
the Kafka source code again, I cannot find an obvious code path that would 
provide a JKS reload capability for clients. I believe this functionality would 
be beneficial (both for JKS and PEM) and can update the ticket accordingly if 
you concur.

In the meantime, could you advise what the current best practice approach would 
be? I presume pulling new certs and then restarting the clients?


was (Author: teabot):
Hey [~rsivaram] - thank you for your reply.  I believe you are correct - I was 
certain that JKS reload was the case, but must admit that having looked through 
the Kafka source code again, I cannot find an obvious code path that would 
provide a JKS reload capability for clients. I believe this functionality would 
be beneficial (both for JKS and PEM) and can update the ticket accordingly if 
you concur.

In the meantime, could you advise what the current best practice approach would 
be? I resume pulling new certs and then restarting the clients?

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante closed pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-09-13 Thread GitBox


C0urante closed pull request #10112:
URL: https://github.com/apache/kafka/pull/10112


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12751) ISRs remain in in-flight state if proposed state is same as actual state

2021-09-13 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414212#comment-17414212
 ] 

Rajini Sivaram commented on KAFKA-12751:


The bug only happens in an unusual case where the proposed ISR is the same as 
the expected ISR, and the chances of hitting that are very low. It is an 
issue only if `inter.broker.protocol.version >= 2.7`. If it does occur, further 
ISR updates don't occur, so the broker will need to be restarted. The release 
plan for 2.8.1 is here: 
[https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.8.1]; work is 
under way and the release is expected within the next few weeks.

> ISRs remain in in-flight state if proposed state is same as actual state
> 
>
> Key: KAFKA-12751
> URL: https://issues.apache.org/jira/browse/KAFKA-12751
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.7.2, 2.8.1
>
>
> If proposed ISR state in an AlterIsr request is the same as the actual state, 
> Controller returns a successful response without performing any updates. But 
> the broker code that processes the response leaves the ISR state in in-flight 
> state without committing. This prevents further ISR updates until the next 
> leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414217#comment-17414217
 ] 

Rajini Sivaram commented on KAFKA-13293:


[~teabot] Yes, agree that it will be useful to support certificate updates 
without restart for clients.

1) At the moment, as you said, you will need to pull in new certs and restart 
clients if you are using Kafka without custom plugins.

2) If you are willing to add a custom plugin, you can add a custom 
implementation of `ssl.engine.factory.class` that recreates the 
SslEngineFactory when certs change by watching key/truststore files or pulling 
in PEM configs from an external source.

3) For the longer term, 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-649%3A+Dynamic+Client+Configuration]
 will be useful to perform dynamic updates for clients similar to how we do 
dynamic broker config updates.

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414227#comment-17414227
 ] 

Elliot West commented on KAFKA-13293:
-

[~rsivaram] on point 2, would that actually work for clients though? I ask 
because it seems as though the {{DefaultSslEngineFactory}} [already does 
this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414227#comment-17414227
 ] 

Elliot West edited comment on KAFKA-13293 at 9/13/21, 2:15 PM:
---

[~rsivaram] on point 2, would that actually work for clients though? I ask 
because it seems as though the {{DefaultSslEngineFactory}} [already does 
this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].


was (Author: teabot):
[~rsivaram] on point 2, would that actually work for clients though? I ask 
because it seems as though the {{DefaultSslEngineFactory}} [already does 
this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414227#comment-17414227
 ] 

Elliot West edited comment on KAFKA-13293 at 9/13/21, 2:16 PM:
---

[~rsivaram] on point 2, would that actually work for clients though? I ask 
because it seems as though the {{DefaultSslEngineFactory}} [already does 
this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113]
 for JKS?


was (Author: teabot):
[~rsivaram] on point 2, would that actually work for clients though? I ask 
because it seems as though the {{DefaultSslEngineFactory}} [already does 
this|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L102-L113].

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes

2021-09-13 Thread Utkarsh Khare (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Utkarsh Khare reassigned KAFKA-13294:
-

Assignee: Utkarsh Khare

> Upgrade Netty to 4.1.68 for CVE fixes
> -
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Assignee: Utkarsh Khare
>Priority: Minor
>
> Netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference:
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414263#comment-17414263
 ] 

Rajini Sivaram commented on KAFKA-13293:


[~teabot] Sorry, I should have said `recreates SSLContext` rather than 
`SslEngineFactory`. I haven't tried it out, but I think you can implement a 
custom factory that has a mutable SSLContext which is updated when required. 
Basically we want to make sure `CustomSslEngineFactory` always has a valid 
`SSLContext` that is used by SslEngineFactory#createClientSslEngine() to create 
a new SSLEngine whenever a new connection is established, but we can swap out 
the SSLContext with a new one if we can detect a change. As you said, we cannot 
rely on `SslEngineFactory#shouldBeRebuilt()` since that is invoked only for 
brokers when ALTER_CONFIGS is processed.
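
A minimal sketch of that idea, assuming only the public `SslEngineFactory` interface; the class name, the `reload()` entry point, and the elided PEM parsing are hypothetical:

{code:java}
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.security.auth.SslEngineFactory;

// Sketch: a client-side factory holding a mutable SSLContext. Every new
// connection uses the current context; renewed PEM material is applied via reload().
public class ReloadingSslEngineFactory implements SslEngineFactory {

    private final AtomicReference<SSLContext> context = new AtomicReference<>();

    @Override
    public void configure(Map<String, ?> configs) {
        context.set(buildContext(configs)); // initial certs from config values
    }

    // Hypothetical application hook: call when renewed PEM certs are available,
    // e.g. from a scheduled poll of a secret store or a cert-manager callback.
    public void reload(Map<String, ?> newConfigs) {
        context.set(buildContext(newConfigs));
    }

    @Override
    public SSLEngine createClientSslEngine(String peerHost, int peerPort,
                                           String endpointIdentification) {
        SSLEngine engine = context.get().createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(true);
        return engine;
    }

    @Override
    public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
        SSLEngine engine = context.get().createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(false);
        return engine;
    }

    @Override
    public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
        return false; // only invoked on brokers for ALTER_CONFIGS, as noted above
    }

    @Override
    public Set<String> reconfigurableConfigs() {
        return Collections.emptySet();
    }

    @Override
    public KeyStore keystore() {
        return null; // PEM material lives in the SSLContext, not a file-based store
    }

    @Override
    public KeyStore truststore() {
        return null;
    }

    @Override
    public void close() {
    }

    private SSLContext buildContext(Map<String, ?> configs) {
        // Placeholder: a real implementation would parse the PEM config values
        // (ssl.keystore.key, ssl.keystore.certificate.chain,
        // ssl.truststore.certificates) into key/trust managers for the context.
        try {
            return SSLContext.getDefault();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
{code}

A client would then set `ssl.engine.factory.class` to this class and invoke `reload()` from whatever mechanism detects renewed certificates.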

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13293) Support client reload of PEM certificates

2021-09-13 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414272#comment-17414272
 ] 

Elliot West commented on KAFKA-13293:
-

[~rsivaram] thank you for the clarification.

> Support client reload of PEM certificates
> -
>
> Key: KAFKA-13293
> URL: https://issues.apache.org/jira/browse/KAFKA-13293
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, security
>Affects Versions: 2.7.0, 2.8.0, 2.7.1
>Reporter: Elliot West
>Priority: Major
>
> Since Kafka 2.7.0, clients are able to authenticate using PEM certificates as 
> client configuration properties in addition to JKS file based key stores 
> (KAFKA-10338). With PEM, certificate chains are passed into clients as simple 
> string based key-value properties, alongside existing client configuration. 
> This offers a number of benefits: it provides a JVM agnostic security 
> mechanism from the perspective of clients, removes the client's dependency on 
> the local filesystem, and allows the encapsulation of the entire client 
> configuration into a single payload.
> However, the current client PEM implementation has a feature regression when 
> compared with the JKS implementation. With the JKS approach, clients would 
> automatically reload certificates when the key stores were modified on disk. 
> This enables a seamless approach for the replacement of certificates when 
> they are due to expire; no further configuration or explicit interference 
> with the client lifecycle is needed for the client to migrate to renewed 
> certificates.
> Such a capability does not currently exist for PEM. One supplies key chains 
> when instantiating clients only - there is no mechanism available to either 
> directly reconfigure the client, or for the client to observe changes to the 
> original properties set reference used in construction. Additionally, no 
> work-arounds are documented that might give users alternative strategies for 
> dealing with expiring certificates. Given that expiration and renewal of 
> certificates is an industry standard practice, it could be argued that the 
> current PEM client implementation is not fit for purpose.
> In summary, a mechanism should be provided such that clients can 
> automatically detect, load, and use updated PEM key chains from some non-file 
> based source (object ref, method invocation, listener, etc.).
> Finally, it is suggested that in the short term the Kafka documentation be 
> updated to describe any viable mechanism for updating client PEM certs 
> (perhaps closing the existing client and then recreating it?).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals

2021-09-13 Thread GitBox


cmccabe merged pull request #11312:
URL: https://github.com/apache/kafka/pull/11312


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414431#comment-17414431
 ] 

Matthias J. Sax commented on KAFKA-13292:
-

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler]
 might help – it's part of AK 2.8 though.
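
For reference, a minimal sketch of wiring up that handler on Kafka Streams 2.8+ (`buildTopology()` and `buildProperties()` are hypothetical stand-ins for the application's own setup):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

Topology topology = buildTopology();   // hypothetical helper
Properties props = buildProperties();  // hypothetical helper

KafkaStreams streams = new KafkaStreams(topology, props);
// KIP-671: replace the failed StreamThread instead of letting it die, so an
// InvalidPidMappingException no longer shuts down the whole application.
streams.setUncaughtExceptionHandler(exception ->
        StreamThreadExceptionResponse.REPLACE_THREAD);
streams.start();
{code}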

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly after 7 days of starting this application, I seem to be getting the 
> following exception and the application shuts down, without processing 
> any more messages:
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>  
> After this, I can see that all 12 tasks (because there are 12 partitions for 
> all topics) get shut down and this brings down the whole application.

[jira] [Resolved] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13292.
-
Resolution: Invalid

Closing this as "invalid" as it seems to be a question, not a bug report. 
Please use the mailing lists to ask questions. Thanks.

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly after 7 days of starting this application, I seem to be getting the 
> following exception and the application shuts down, without processing 
> any more messages:
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>  
> After this, I can see that all 12 tasks (because there are 12 partitions for 
> all topics) get shutdown and this brings down the whole application.
>  
> I understand that the transactional.id.expiration.ms

[GitHub] [kafka] cmccabe commented on pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-13 Thread GitBox


cmccabe commented on pull request #11320:
URL: https://github.com/apache/kafka/pull/11320#issuecomment-918440670






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe removed a comment on pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-13 Thread GitBox


cmccabe removed a comment on pull request #11320:
URL: https://github.com/apache/kafka/pull/11320#issuecomment-918440709


   Updates:
   - Use deprecated converter class in order to support Scala 2.12


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

2021-09-13 Thread GitBox


C0urante opened a new pull request #11323:
URL: https://github.com/apache/kafka/pull/11323


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12226)
   
   Replaces https://github.com/apache/kafka/pull/10112
   
   Replaces the current batch-based logic for offset commits with a dynamic, 
non-blocking approach outlined in discussion on #10112 
[here](https://github.com/apache/kafka/pull/10112#issuecomment-910510910), 
[here](https://github.com/apache/kafka/pull/10112#issuecomment-910540773), 
[here](https://github.com/apache/kafka/pull/10112#issuecomment-914348989), 
[here](https://github.com/apache/kafka/pull/10112#issuecomment-914547745), and 
[here](https://github.com/apache/kafka/pull/10112#issuecomment-915350922).
   
   Essentially, a deque is kept for every source partition that a source task 
produces records for, and each element in that deque is a `SubmittedRecord` 
with a flag to track whether the producer has ack'd the delivery of that source 
record to Kafka yet. Periodically, the worker (on the same thread that polls 
the source task for records and transforms, converts, and dispatches them to 
the producer) polls acknowledged elements from the beginning of each of these 
deques and collects the latest offsets from these elements, storing them in a 
snapshot that is then committed on the separate source task offset thread.
   
   The behavior of the `offset.flush.timeout.ms` property is retained, but 
essentially now only applies to the actual writing of offset data to the 
internal offsets topic (if running in distributed mode) or the offsets file (if 
running in standalone mode). No time is spent during 
`WorkerSourceTask::commitOffsets` blocking on the acknowledgment of records by 
the producer.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-09-13 Thread GitBox


C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-918481881


   @rhauch (and, if interested, @hachikuji) new PR is up: 
https://github.com/apache/kafka/pull/11323


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #11323: KAFKA-12226: Commit source task offsets without blocking on batch delivery

2021-09-13 Thread GitBox


C0urante commented on pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#issuecomment-918482406


   CC @rhauch; hopefully this is fairly close to what you had in mind.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-13 Thread GitBox


hutchiko commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918537415


   @guozhangwang @ableegoldman unfortunately I could never reproduce the CI 
failures; however, I have pushed up a refactor of the method which I think was 
responsible for the flakiness.  
   
   The original version of the method was scanning backwards through the 
changelog topic, searching for the top record so I could cross-check that 
record's offset with the checkpointed offset. It had an implicit assumption 
that the consumer it was driving backwards would always get some records after 
a 50ms `poll` - thinking this through, that's obviously a false assumption. 
   
   I switched the logic around so it just consumes forwards until it finds the 
end of the topic. There are no assumptions about timing in the new logic, so 
I'm hoping that will fix the flakiness.
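   
   For reference, a rough sketch of the forward-scan idea (illustrative only: it assumes a plain consumer assigned to the changelog partition, and the class and method names are made up, not the actual test code):
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.TopicPartition;
   
   class ChangelogScanner {
       // Consume forward until the end offset is reached and return the last
       // record seen; an empty poll() just loops again, so there are no
       // assumptions about how quickly records arrive.
       static ConsumerRecord<byte[], byte[]> readLastRecord(final Consumer<byte[], byte[]> consumer,
                                                            final TopicPartition changelog) {
           consumer.assign(Collections.singletonList(changelog));
           consumer.seekToBeginning(Collections.singletonList(changelog));
           final long endOffset = consumer.endOffsets(Collections.singletonList(changelog)).get(changelog);
           ConsumerRecord<byte[], byte[]> last = null;
           while (consumer.position(changelog) < endOffset) {
               for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100)).records(changelog)) {
                   last = record;
               }
           }
           return last;
       }
   }
   ```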


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-09-13 Thread GitBox


mumrah commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r707654639



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
 try {
+  debug(s"Publishing delta $delta with highest offset 
$newHighestMetadataOffset")

Review comment:
   Are there any concerns with the MetadataDelta having a lot of records in 
it? I'm wondering if this could potentially dump out a lot of text into the 
debug log.
   
   Also, from a security perspective, is it okay to write the contents of _any_ 
metadata record to the debug log? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-09-13 Thread GitBox


hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r707673948



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
 try {
+  debug(s"Publishing delta $delta with highest offset 
$newHighestMetadataOffset")

Review comment:
   I found it useful when debugging the failures here, so I thought it 
would come in handy when debugging system test failures as well. I guess if 
there were any concerns from a security perspective, it would be about the 
`ConfigurationDelta`. Perhaps if we want to err on the safe side, we could 
print the keys only? That might be good enough since it would probably be clear 
from the test context what the change was. What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-09-13 Thread GitBox


mumrah commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r707674451



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
 try {
+  debug(s"Publishing delta $delta with highest offset 
$newHighestMetadataOffset")

Review comment:
   Yea, configs is what I was thinking of. Logging the keys only sounds 
like a good idea.
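   
   Something along these lines, purely to illustrate the keys-only idea (the `Map` of changed configs is a stand-in for whatever the delta exposes, not the real `MetadataDelta` API):
   
   ```java
   import java.util.Map;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   class ConfigDeltaLogging {
       private static final Logger log = LoggerFactory.getLogger(ConfigDeltaLogging.class);
   
       // Log only the names of the changed configs at DEBUG, never their values,
       // so sensitive settings cannot leak into the log.
       static void logConfigDelta(final Map<String, String> changedConfigs, final long highestOffset) {
           if (log.isDebugEnabled()) {
               log.debug("Publishing config delta with keys {} at highest offset {}",
                   changedConfigs.keySet(), highestOffset);
           }
       }
   }
   ```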




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-13 Thread GitBox


guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918582404


   I checked the failures of the new run and they are not related to the new 
tests. Merging to trunk now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-13 Thread GitBox


guozhangwang merged pull request #11283:
URL: https://github.com/apache/kafka/pull/11283


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-13 Thread GitBox


guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918584383


   cc @kkonstantine, this is a critical bug fix, hence I'm cherry-picking to 
3.0 as well. If RC2 is voted through, it will land in 3.0.1; otherwise, if 
we vote on another RC3, I think it'd be great to have in 3.0.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-13 Thread GitBox


guozhangwang commented on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-918598714


   > Hope that's clear.
   > I also updated in the PR description.
   > Thank you.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-13 Thread GitBox


guozhangwang merged pull request #11292:
URL: https://github.com/apache/kafka/pull/11292


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-13 Thread GitBox


guozhangwang commented on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-918599583


   Merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-13 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r707726453



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: 
mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?
+partitionStates.forKeyValue { (partition, partitionState) =>
+  if (traceLoggingEnabled)
+stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId 
$correlationId from controller $controllerId " +
+  s"epoch $controllerEpoch starting the become-follower transition for 
partition ${partition.topicPartition} with leader " +
+  s"${partitionState.leader}")
+  responseMap.put(partition.topicPartition, Errors.NONE)
+}
+
+val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set()
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {

Review comment:
   I wasn't sure if there was a case where the broker could be temporarily 
shut down? Though maybe it doesn't matter.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

2021-09-13 Thread GitBox


hachikuji commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r707728626



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
  Long logAppendTime) {
 int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
 if (buffer.remaining() < sizeOfBodyInBytes)
-return null;
+throw new InvalidRecordException("Invalid record size: expected " 
+ sizeOfBodyInBytes +
+" bytes in record payload, but instead the buffer has only " + 
buffer.remaining() +
+" remaining bytes.");

Review comment:
   Apologies for the delay here. I don't see a problem with the change. I 
believe that @ijuma is right that the fetch response may still return 
incomplete data, but I believe this is handled in `ByteBufferLogInputStream`. 
We stop batch iteration early if there is incomplete data, so we would never 
reach the `readFrom` here which is called for each record in the batch. It's 
worth noting also that the only caller of this method (in 
`DefaultRecordBatch.uncompressedIterator`) has the following logic:
   
   ```java
   try {
   return DefaultRecord.readFrom(buffer, baseOffset, 
firstTimestamp, baseSequence, logAppendTime);
   } catch (BufferUnderflowException e) {
   throw new InvalidRecordException("Incorrect declared 
batch size, premature EOF reached");
   }
   ```
   
   So it already handles underflows in a similar way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

2021-09-13 Thread GitBox


hachikuji commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r707728626



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
  Long logAppendTime) {
 int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
 if (buffer.remaining() < sizeOfBodyInBytes)
-return null;
+throw new InvalidRecordException("Invalid record size: expected " 
+ sizeOfBodyInBytes +
+" bytes in record payload, but instead the buffer has only " + 
buffer.remaining() +
+" remaining bytes.");

Review comment:
   Apologies for the delay here. I don't see a problem with the change. I 
believe that @ijuma is right that the fetch response may still return 
incomplete data, but I think this is handled in `ByteBufferLogInputStream`. We 
stop batch iteration early if there is incomplete data, so we would never reach 
the `readFrom` here which is called for each record in the batch. It's worth 
noting also that the only caller of this method (in 
`DefaultRecordBatch.uncompressedIterator`) has the following logic:
   
   ```java
   try {
 return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, 
baseSequence, logAppendTime);
   } catch (BufferUnderflowException e) {
 throw new InvalidRecordException("Incorrect declared batch size, premature 
EOF reached");
   }
   ```
   
   So it already handles underflows in a similar way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-13 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r707742261



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: 
mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?
+partitionStates.forKeyValue { (partition, partitionState) =>
+  if (traceLoggingEnabled)
+stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId 
$correlationId from controller $controllerId " +
+  s"epoch $controllerEpoch starting the become-follower transition for 
partition ${partition.topicPartition} with leader " +
+  s"${partitionState.leader}")
+  responseMap.put(partition.topicPartition, Errors.NONE)
+}
+
+val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set()
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+// Only change partition state when the leader is available
+partitionsToUpdateFollower += partition
+  } else {
+// The leader broker should always be present in the metadata 
cache.
+// If not, we should record the error message and abort the 
transition process for this partition
+stateChangeLogger.error(s"Received LeaderAndIsrRequest with 
correlation id $correlationId from " +
+  s"controller $controllerId epoch $controllerEpoch for partition 
${partition.topicPartition} " +
+  s"(last update controller epoch 
${partitionState.controllerEpoch}) " +
+  s"but cannot become follower since the new leader 
$newLeaderBrokerId is unavailable.")
+  }
+  }
+
+  if (isShuttingDown.get()) {
+if (traceLoggingEnabled) {
+  partitionsToUpdateFollower.foreach { partition =>
+stateChangeLogger.trace(s"Skipped the update topic ID step of the 
become-follower state " +
+  s"change with correlation id $correlationId from controller 
$controllerId epoch $controllerEpoch for " +
+  s"partition ${partition.topicPartition} with leader 
${partitionStates(partition).leader} " +
+  "since it is shutting down")
+  }
+}
+  } else {
+val partitionsToMakeFollowerWithLeaderAndOffset = 
partitionsToUpdateFollower.map { partition =>
+  val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => 
metadataCache.
+getAliveBrokerNode(leaderId, 
config.interBrokerListenerName)).getOrElse(Node.noNode())
+  val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), 
leaderNode.port())
+  val log = partition.localLogOrException
+  val fetchOffset = initialFetchOffset(log)
+  partition.topicPartition -> 
InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, 
fetchOffset)
+}.toMap
+
+
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

Review comment:
   There are a few ways to do this and I'll be interested to see which is 
the best. It is a little tricky to go from topic partition -> fetcher -> fetch 
state where we can update the topic ID. 
   
   One way is to use `find` like `fetchState` does. Another is to look up the 
fetcher in the map like `addFetcherForPartitions` does. Finally, we could try 
to just add topic IDs to all the partitions in the fetcher by iterating 
through, but that might not be very efficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

2021-09-13 Thread GitBox


ijuma commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r707781491



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
  Long logAppendTime) {
 int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
 if (buffer.remaining() < sizeOfBodyInBytes)
-return null;
+throw new InvalidRecordException("Invalid record size: expected " 
+ sizeOfBodyInBytes +
+" bytes in record payload, but instead the buffer has only " + 
buffer.remaining() +
+" remaining bytes.");

Review comment:
   Thanks for checking @hachikuji.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #11080: KAFKA-13149: fix NPE for record==null when handling a produce request

2021-09-13 Thread GitBox


hachikuji commented on pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#issuecomment-918658119


   @ccding I kicked off a new build since it has been a while since the PR was 
submitted. Assuming tests are ok, I will merge shortly. Thanks for your 
patience.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-13 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r707810706



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedTimestampedKeyAndJoinSideSerializerTest.java
##
@@ -24,49 +24,49 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThrows;
 
-public class KeyAndJoinSideSerializerTest {
+public class TimestampedTimestampedKeyAndJoinSideSerializerTest {

Review comment:
   Ack.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStore.java
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * A wrapper key-value store that serializes the record values bytes as a list.
+ * As a result put calls would be interpreted as a get-append-put to the 
underlying RocksDB store.

Review comment:
   Ack.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimeOrderedKeyValueBytesStore.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class ChangeLoggingTimeOrderedKeyValueBytesStore extends 
ChangeLoggingKeyValueBytesStore {
+
+ChangeLoggingTimeOrderedKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
+super(inner);
+}
+
+@Override
+public void put(final Bytes key,
+final byte[] value) {
+wrapped().put(key, value);
+// we need to log the new value, which is different from the put value;

Review comment:
   Ack

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimeOrderedKeyValueBytesStore.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class ChangeLogg

[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-13 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r707812546



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.junit.Assert.assertEquals;
+
+public class ListValueStoreTest {
+private static final String STORE_NAME = "rocksDB list value store";
+
+MockRecordCollector recordCollector;
+KeyValueStore<Integer, String> listStore;
+InternalMockProcessorContext context;
+
+final File baseDir = TestUtils.tempDirectory("test");
+
+@Before
+public void setup() {
+listStore = buildStore(Serdes.Integer(), Serdes.String());
+
+recordCollector = new MockRecordCollector();
+context = new InternalMockProcessorContext<>(
+baseDir,
+Serdes.String(),
+Serdes.Integer(),
+recordCollector,
+new ThreadCache(
+new LogContext("testCache"),
+0,
+new MockStreamsMetrics(new Metrics())));
+context.setTime(1L);
+
+listStore.init((StateStoreContext) context, listStore);
+}
+
+@After
+public void after() {
+listStore.close();
+}
+
+<K, V> KeyValueStore<K, V> buildStore(final Serde<K> keySerde,
+  final Serde<V> valueSerde) {
+return new ListValueStoreBuilder<>(
+new RocksDbKeyValueBytesStoreSupplier(STORE_NAME, false),
+keySerde,
+valueSerde,
+Time.SYSTEM)
+.build();
+}
+
+@Test
+public void shouldGetAll() {
+listStore.put(0, "zero");
+// should retain duplicates
+listStore.put(0, "zero again");
+listStore.put(1, "one");
+listStore.put(2, "two");
+
+final KeyValue<Integer, String> zero = KeyValue.pair(0, "zero");
+final KeyValue<Integer, String> zeroAgain = KeyValue.pair(0, "zero again");
+final KeyValue<Integer, String> one = KeyValue.pair(1, "one");
+final KeyValue<Integer, String> two = KeyValue.pair(2, "two");
+
+assertEquals(
+asList(zero, zeroAgain, one, two),
+toList(listStore.all())
+);
+}
+
+@Test
+public void shouldGetAllNonDeletedRecords() {
+// Add some records
+listStore.put(0, "zero");
+listStore.put(1, "one");
+listStore.put(1, "one again");
+listStore.put(2, "two");
+listStore.put(3, "three");
+listStore.put(4, "four");
+
+// Delete some records
+listStore.put(1, null);
+listStore.put(3, null);
+
+// Only non-deleted records should appear in the all() iterator
+final KeyValue<Integer, String> zero = KeyValue.pair(0, "zero");
+final KeyValue<Integer, String> two = KeyValue.pair(2, "two");
+final KeyValue<Integer, String> four = KeyValue.pair(4, "four");
+
+assertEquals(
+asList(zero, two, four),
+toList(listStore.all())
+);
+}
+
+@Test
+public void shouldGetAllReturnTimestampOrderedRecords() {
+// Add some records in different order
+listStore.put(4, "four");
+listStore.put(0, "zero");
+listStore.put(2, "two1");
+listStore.put(

[GitHub] [kafka] mjsax commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-13 Thread GitBox


mjsax commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r707817999



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.junit.Assert.assertEquals;
+
+public class ListValueStoreTest {
+private static final String STORE_NAME = "rocksDB list value store";
+
+MockRecordCollector recordCollector;
+KeyValueStore<Integer, String> listStore;
+InternalMockProcessorContext context;
+
+final File baseDir = TestUtils.tempDirectory("test");
+
+@Before
+public void setup() {
+listStore = buildStore(Serdes.Integer(), Serdes.String());
+
+recordCollector = new MockRecordCollector();
+context = new InternalMockProcessorContext<>(
+baseDir,
+Serdes.String(),
+Serdes.Integer(),
+recordCollector,
+new ThreadCache(
+new LogContext("testCache"),
+0,
+new MockStreamsMetrics(new Metrics())));
+context.setTime(1L);
+
+listStore.init((StateStoreContext) context, listStore);
+}
+
+@After
+public void after() {
+listStore.close();
+}
+
+<K, V> KeyValueStore<K, V> buildStore(final Serde<K> keySerde,
+  final Serde<V> valueSerde) {
+return new ListValueStoreBuilder<>(
+new RocksDbKeyValueBytesStoreSupplier(STORE_NAME, false),
+keySerde,
+valueSerde,
+Time.SYSTEM)
+.build();
+}
+
+@Test
+public void shouldGetAll() {
+listStore.put(0, "zero");
+// should retain duplicates
+listStore.put(0, "zero again");
+listStore.put(1, "one");
+listStore.put(2, "two");
+
+final KeyValue<Integer, String> zero = KeyValue.pair(0, "zero");
+final KeyValue<Integer, String> zeroAgain = KeyValue.pair(0, "zero again");
+final KeyValue<Integer, String> one = KeyValue.pair(1, "one");
+final KeyValue<Integer, String> two = KeyValue.pair(2, "two");
+
+assertEquals(
+asList(zero, zeroAgain, one, two),
+toList(listStore.all())

Review comment:
   Potential memory leak? `toList` does not close the iterator -- should we 
update `toList`? Or make sure we close the iterator using try-with-resources?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-13 Thread GitBox


mjsax commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r707817999



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.junit.Assert.assertEquals;
+
+public class ListValueStoreTest {
+private static final String STORE_NAME = "rocksDB list value store";
+
+MockRecordCollector recordCollector;
+KeyValueStore<Integer, String> listStore;
+InternalMockProcessorContext context;
+
+final File baseDir = TestUtils.tempDirectory("test");
+
+@Before
+public void setup() {
+listStore = buildStore(Serdes.Integer(), Serdes.String());
+
+recordCollector = new MockRecordCollector();
+context = new InternalMockProcessorContext<>(
+baseDir,
+Serdes.String(),
+Serdes.Integer(),
+recordCollector,
+new ThreadCache(
+new LogContext("testCache"),
+0,
+new MockStreamsMetrics(new Metrics())));
+context.setTime(1L);
+
+listStore.init((StateStoreContext) context, listStore);
+}
+
+@After
+public void after() {
+listStore.close();
+}
+
+<K, V> KeyValueStore<K, V> buildStore(final Serde<K> keySerde,
+  final Serde<V> valueSerde) {
+return new ListValueStoreBuilder<>(
+new RocksDbKeyValueBytesStoreSupplier(STORE_NAME, false),
+keySerde,
+valueSerde,
+Time.SYSTEM)
+.build();
+}
+
+@Test
+public void shouldGetAll() {
+listStore.put(0, "zero");
+// should retain duplicates
+listStore.put(0, "zero again");
+listStore.put(1, "one");
+listStore.put(2, "two");
+
+final KeyValue<Integer, String> zero = KeyValue.pair(0, "zero");
+final KeyValue<Integer, String> zeroAgain = KeyValue.pair(0, "zero again");
+final KeyValue<Integer, String> one = KeyValue.pair(1, "one");
+final KeyValue<Integer, String> two = KeyValue.pair(2, "two");
+
+assertEquals(
+asList(zero, zeroAgain, one, two),
+toList(listStore.all())

Review comment:
   Potential memory leak? `toList` does not close the iterator -- should we 
update `toList`? Or make sure we close the iterator using try-with-resources? 
(similar below -- not sure about existing tests using `toList`? -- might be 
worth to double-check)
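   
   For illustration, a variant of `toList` that drains and closes in one step (a sketch of the suggestion above; `KeyValueIterator` extends `Closeable`, so try-with-resources applies directly):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.state.KeyValueIterator;
   
   class IteratorUtil {
       // Drain the iterator into a list and always close it, even if iteration
       // throws, so the underlying store resources are released.
       static <K, V> List<KeyValue<K, V>> toListAndClose(final KeyValueIterator<K, V> iterator) {
           try (final KeyValueIterator<K, V> it = iterator) {
               final List<KeyValue<K, V>> results = new ArrayList<>();
               while (it.hasNext()) {
                   results.add(it.next());
               }
               return results;
           }
       }
   }
   ```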




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2021-09-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13295:
--

 Summary: Long restoration times for new tasks can lead to 
transaction timeouts
 Key: KAFKA-13295
 URL: https://issues.apache.org/jira/browse/KAFKA-13295
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.1.0


In some EOS applications with relatively long restoration times we've noticed a 
series of ProducerFencedExceptions occurring during/immediately after 
restoration. The broker logs were able to confirm these were due to 
transactions timing out.

In Streams, it turns out we automatically begin a new txn when calling {{send}} 
(if there isn’t already one in flight). A {{send}} often occurs outside a 
commit during active processing (e.g. writing to the changelog), leaving the txn 
open until the next commit. And if a StreamThread has been actively processing 
when a rebalance results in a new stateful task without revoking any existing 
tasks, the thread won’t actually commit this open txn before it goes back into 
the restoration phase while it builds up state for the new task. So the 
in-flight transaction is left open during restoration, during which the 
StreamThread only consumes from the changelog without committing, leaving it 
vulnerable to timing out when restoration times exceed the configured 
transaction.timeout.ms for the producer client.
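
One possible mitigation, sketched here purely for illustration (it does not fix 
the underlying issue, and the broker-side transaction.max.timeout.ms must permit 
the value), is to raise the embedded producer's transaction timeout above the 
expected restoration time:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// Raise the producer's transaction timeout above the expected restoration time.
props.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), 900_000);
{code}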



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2021-09-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414652#comment-17414652
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13295:


Note that https://issues.apache.org/jira/browse/KAFKA-13249, while not directly 
related, makes it more likely for an application to hit this bug, since it can 
result in Streams restoring additional records and thus longer restoration times.

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: eos
> Fix For: 3.1.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} often occurs 
> outside a commit during active processing (e.g. writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-13 Thread GitBox


ableegoldman commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-918706779


   @guozhangwang can you cherrypick this back to 2.8 at least? Maybe also 2.7 
if there aren't any conflicts (seems like it should be a smooth merge but 🤷‍♀️ )


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414732#comment-17414732
 ] 

NEERAJ VAIDYA commented on KAFKA-13292:
---

Thanks [~mjsax] for the reference to the KIP. It might not be possible for my 
application to be upgraded soon.

However, I still feel this is a defect, because I believe that stream 
processing should continue even after transactional.id.expiration.ms has 
elapsed, rather than bringing down the entire application.

I noticed from the logs that the application does try to get a new epoch and 
producer Id, but then still suspends and stops all threads. As you will observe 
below towards the end of the log messages, for task [0_2] the call to 
"*Invoking InitProducerId*" is made, but the flow still ends up in the 
UncaughtExceptionHandler.

I was of the understanding that once the producerId epoch is bumped, stream 
processing should resume.

However, it shuts down the entire application via an uncaught exception.

*FYI - I am using exactly_once processing and enable.idempotence=true*

 
{code:java}
2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-Stre
amThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Transiting to abortable 
error state due to org.apache.kafka.common.errors.InvalidPidMappingException: 
The producer attempted to use a producer id which is not currently assigned to 
its tra
nsactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR 
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamT
hread-1] task [0_2] Error encountered sending record to topic 
mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
following exception
 during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN
2021-09-10T12:21:59.741 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Shutting down
2021-09-10T12:21:59.743 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2021-09-10T12:21:59.744 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamTask -
{code}
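
Since the failure surfaces in the uncaught exception handler, one possible 
mitigation is a streams-level handler that replaces the failed thread instead 
of shutting the client down. This is only a sketch, and it assumes an upgrade 
to Kafka Streams 2.8+ (where KIP-671 added this handler); the topic names and 
bootstrap servers below are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mtx-caf"); // app id from the logs above
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // On a fatal thread error (e.g. a fenced or expired producer), replace
        // only the failed thread; the replacement thread creates a fresh
        // producer and goes through InitProducerId again, instead of the whole
        // client shutting down.
        streams.setUncaughtExceptionHandler(
            exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
        streams.start();
    }
}
{code}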

[jira] [Comment Edited] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414732#comment-17414732
 ] 

NEERAJ VAIDYA edited comment on KAFKA-13292 at 9/14/21, 5:49 AM:
-

Thanks [~mjsax] for the reference to the KIP. It might not be possible for my 
application to be upgraded soon.

However, I still feel this is a defect: I believe that stream processing 
should continue even after transactional.id.expiration.ms has elapsed, rather 
than bringing down the entire application.

I noticed from the logs that the application does try to get a new epoch and 
producer id, but then still suspends and stops all threads. As you will observe 
below, towards the end of the log messages (at timestamp 
2021-09-10T12:21:59.766), for task [0_2] the call to "*Invoking 
InitProducerId*" is made, but the flow still ends up in the 
UncaughtExceptionHandler.

I was of the understanding that once the producerId epoch is bumped, stream 
processing should resume.

However, it instead shuts down the entire application via an uncaught 
exception.

*FYI - I am using exactly_once processing and enable.idempotence=true*

 
{code:java}
2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Transiting to abortable 
error state due to org.apache.kafka.common.errors.InvalidPidMappingException: 
The producer attempted to use a producer id which is not currently assigned to 
its transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR 
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error encountered sending record to topic 
mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
following exception
 during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN
2021-09-10T12:21:59.741 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Shutting down
2021-09-10T12:21:59.743 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2021-09-10T12:21:59.744 
mtx-ca
{code}

[jira] [Reopened] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread NEERAJ VAIDYA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NEERAJ VAIDYA reopened KAFKA-13292:
---

As indicated in my previous comments.

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic that has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly 7 days after starting this application, I get the following 
> exception and the application shuts down, without processing any more 
> messages:
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>  
> After this, I can see that all 12 tasks (because there are 12 partitions for 
> all topics) get shut down and this brings down the whole application.
>  
> I understand that the transactional.id.expiration.ms = 7 days (default) will 
> likely cause the idle producer's transactional id to expire, but why does this 
> spec
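
For reference, the broker-side expiration that triggers this is configurable. 
A sketch only (the value is illustrative, not a recommendation); the default 
is 604800000 ms (7 days):

{code}
# server.properties (broker side)
# Keep idle transactional ids alive for 14 days instead of the 7-day default,
# so rarely-written tasks do not hit InvalidPidMappingException as quickly.
transactional.id.expiration.ms=1209600000
{code}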

[GitHub] [kafka] dongjinleekr opened a new pull request #11324: KAFKA-13294: Upgrade Netty to 4.1.68 for CVE fixes

2021-09-13 Thread GitBox


dongjinleekr opened a new pull request #11324:
URL: https://github.com/apache/kafka/pull/11324


   `netty-codec` `4.1.62.Final` has the following security vulnerabilities, which in turn affect `netty-transport-native-epoll`, which Apache Kafka depends on.
   
   - [CVE-2021-37136](https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv)
   - [CVE-2021-37137](https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363)
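   
   The fix itself is a one-line version bump in the build. A sketch of the kind of change involved, assuming the Netty version is pinned in `gradle/dependencies.gradle` (the exact key and location are assumptions, not taken from this PR's diff):
   
   ```groovy
   // gradle/dependencies.gradle (sketch): bump the pinned Netty version so that
   // netty-codec and netty-transport-native-epoll pick up the CVE fixes.
   versions += [
     netty: "4.1.68.Final", // previously 4.1.62.Final
   ]
   ```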
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #10678: TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc

2021-09-13 Thread GitBox


dongjinleekr commented on pull request #10678:
URL: https://github.com/apache/kafka/pull/10678#issuecomment-918854190


   Rebased onto the latest trunk. cc/ @cadonna


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13296) Verify old assignment within StreamsPartitionAssignor

2021-09-13 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-13296:
---

 Summary: Verify old assignment within StreamsPartitionAssignor
 Key: KAFKA-13296
 URL: https://issues.apache.org/jira/browse/KAFKA-13296
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


`StreamsPartitionAssignor` is responsible for assigning partitions and tasks 
to all StreamThreads within an application.

While it ensures that a single partition/task is not assigned to two threads, 
there is limited verification of its input. In particular, we had one incident 
in which a zombie thread/consumer did not clean up its own internal state 
correctly due to KAFKA-12983. This unclean zombie state meant that the _old 
assignment_ reported to `StreamsPartitionAssignor` contained the same partition 
for two consumers. As a result, both threads/consumers later revoked the same 
partition and the zombie thread could commit its unclean work (even though it 
should have been fenced), leading to duplicate output under EOS_v2.

We should consider adding a check to `StreamsPartitionAssignor` that the _old 
assignment_ is valid, i.e., that no partition is missing and no partition is 
assigned to two consumers. If the check fails, we should log the invalid _old 
assignment_ and send an error code back to all consumers indicating that they 
should shut down "unclean" (i.e., without flushing and without committing any 
offsets or transactions).
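
A hypothetical shape for such a check (names and wiring are assumptions, not 
the actual `StreamsPartitionAssignor` code); this minimal sketch only flags 
partitions claimed by more than one member in the old assignment:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public final class OldAssignmentValidator {

    /**
     * @param oldAssignment member id -> partitions the member reported as
     *                      previously owned
     * @return partitions claimed by two or more members, with their claimants
     */
    public static Map<TopicPartition, List<String>> findDoubleClaimed(
            final Map<String, List<TopicPartition>> oldAssignment) {
        final Map<TopicPartition, List<String>> claimants = new HashMap<>();
        for (final Map.Entry<String, List<TopicPartition>> member : oldAssignment.entrySet()) {
            for (final TopicPartition tp : member.getValue()) {
                claimants.computeIfAbsent(tp, ignored -> new ArrayList<>()).add(member.getKey());
            }
        }
        // Keep only conflicting entries; an empty result means no partition was
        // double-claimed. (Missing partitions would need a separate check
        // against the full set of subscribed partitions.)
        claimants.values().removeIf(members -> members.size() < 2);
        return claimants;
    }
}
{code}

If the returned map is non-empty, the assignor could log it and send the 
proposed "shut down unclean" error code back to all members.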



--
This message was sent by Atlassian Jira
(v8.3.4#803005)