[jira] [Created] (KAFKA-13518) Update gson and netty-codec in 3.0.0
Pavel Kuznetsov created KAFKA-13518: --- Summary: Update gson and netty-codec in 3.0.0 Key: KAFKA-13518 URL: https://issues.apache.org/jira/browse/KAFKA-13518 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.0.0 Reporter: Pavel Kuznetsov *Describe the bug* I checked the kafka_2.13-3.0.0.tgz distribution with WhiteSource and found that some libraries have vulnerabilities. Here they are: * gson-2.8.6.jar has the WS-2021-0419 vulnerability. The way to fix it is to upgrade to com.google.code.gson:gson:2.8.9 * netty-codec-4.1.65.Final.jar has the CVE-2021-37136 and CVE-2021-37137 vulnerabilities. The way to fix it is to upgrade to io.netty:netty-codec:4.1.68.Final *To Reproduce* Download kafka_2.13-3.0.0.tgz and locate the jars listed above. Check that these jars, at the listed versions, are mentioned in the corresponding vulnerability descriptions. *Expected behavior* * gson upgraded to 2.8.9 or higher * netty-codec upgraded to 4.1.68.Final or higher *Actual behavior* * gson is 2.8.6 * netty-codec is 4.1.65.Final -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13519) Same JAAS configuration used for all producers
Sergey Lemekhov created KAFKA-13519: --- Summary: Same JAAS configuration used for all producers Key: KAFKA-13519 URL: https://issues.apache.org/jira/browse/KAFKA-13519 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.0.0 Reporter: Sergey Lemekhov Attachments: AzureAuthCallbackHandler.java, OAuthBearerTokenImpl.java h3. Problem Sending messages to more than one Kafka cluster is impossible when using instances of {{org.apache.kafka.clients.producer.KafkaProducer}} from the {{kafka-clients}} Java library with {{SASL_JAAS_CONFIG}} authentication configured. Only one {{org.apache.kafka.common.security.authenticator.LoginManager}} is created for all of the clusters (the {{org.apache.kafka.common.security.authenticator.LoginManager#DYNAMIC_INSTANCES}} map contains only one entry). h3. How to reproduce Create two {{KafkaProducer}} instances with the following configuration (the producers should use different Kafka clusters and have different {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} settings): {code:java} Properties properties = new Properties(); properties.put("security.protocol", "SASL_SSL"); properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, AzureAuthCallbackHandler.class); // custom class for handling callbacks; in my case it is Azure Event Hubs with Kafka API support properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"); // here custom configuration is set for callback handler settings properties.put(AzureAuthCallbackHandler.AUTHORITY_CONFIG, "https://login.microsoftonline.com/" + tenantId); // azure tenant id properties.put(AzureAuthCallbackHandler.APP_ID_CONFIG, appId); // azure oauth 2.0 app id properties.put(AzureAuthCallbackHandler.APP_SECRET_CONFIG, appSecret); // azure oauth 2.0 app secret {code} Here {{AzureAuthCallbackHandler}} is a custom class which takes care of acquiring tokens from Azure. It is configured to fetch tokens for the host that is used as the {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting, and the class instance should be different for each created {{KafkaProducer}}. However, it is created only once by the client library (for the single {{LoginManager}}) and used for all producers. When both producers are used for sending messages, this leads to a lot of: {code:java} [Producer clientId=my-client-id] Error while fetching metadata with correlation id 164 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION} {code} and finally to: {code:java} org.apache.kafka.common.errors.TimeoutException: Topic my-topic not present in metadata after 6 ms. {code} The second producer tries to fetch metadata from the cluster configured for the first producer and can't find the target topic there. h3. Workaround Add a unique jaas config option for each Kafka cluster: {code:java} String options = "cluster=" + clusterName; // clusterName should be unique for each created KafkaProducer instance properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + options + ";"); {code} The LoginManager map uses the {{SASL_JAAS_CONFIG}} string as part of the key, so adding a meaningless option to the string makes the client library create a different {{LoginManager}} for each {{KafkaProducer}} instance, and the problem disappears.
This is done in the {{org.apache.kafka.common.security.authenticator.LoginManager#acquireLoginManager}} method: a {{LoginMetadata}} instance is created, configured with the {{SASL_JAAS_CONFIG}} value, and used as the key in the {{LoginManager.DYNAMIC_INSTANCES}} map. h3. Suggested solution Each {{KafkaProducer}} instance should have individual, isolated authentication handling objects linked to it, regardless of similarities in configuration. The {{SASL_LOGIN_CALLBACK_HANDLER_CLASS}} class should be instantiated for each producer individually (since its {{org.apache.kafka.common.security.auth.AuthenticateCallbackHandler#configure}} method is invoked with the producer's configuration, which can differ from one producer to another). h3. Additional details I've attached the callback handler and token implementation for reference. -- This message was sent by Atlassian Jira (v8.20.1#820001)
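For illustration, a minimal sketch of the workaround described in the issue above, assuming the custom AzureAuthCallbackHandler class from the attachments (it is not part of kafka-clients); the "cluster=..." option is meaningless to the login module but makes the cached LoginManager key unique per cluster:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class PerClusterJaasExample {
    // AzureAuthCallbackHandler is the custom class from the attachments, not part of kafka-clients.
    static Properties propsForCluster(String bootstrapServers, String clusterName) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put("security.protocol", "SASL_SSL");
        p.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
        p.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, AzureAuthCallbackHandler.class);
        // "cluster=..." is meaningless to the login module, but it makes the jaas config
        // string (and therefore the LoginManager cache key) unique per cluster.
        p.put(SaslConfigs.SASL_JAAS_CONFIG,
              "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required cluster="
                  + clusterName + ";");
        return p;
    }

    public static void main(String[] args) {
        try (KafkaProducer<String, String> producerA =
                 new KafkaProducer<>(propsForCluster("cluster-a.example:9093", "cluster-a"));
             KafkaProducer<String, String> producerB =
                 new KafkaProducer<>(propsForCluster("cluster-b.example:9093", "cluster-b"))) {
            // Each producer now gets its own LoginManager and its own callback handler instance.
        }
    }
}
{code}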
[jira] [Created] (KAFKA-13520) Quickstart does not work at topic creation step
Robin Moffatt created KAFKA-13520: - Summary: Quickstart does not work at topic creation step Key: KAFKA-13520 URL: https://issues.apache.org/jira/browse/KAFKA-13520 Project: Kafka Issue Type: Bug Components: website Reporter: Robin Moffatt Step 3 fails {code:java} $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 Missing required argument "[partitions]" {code} Also needs `replication-factor` Correct statement is: {code:java} $ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic quickstart-events {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka-site] bbejeck merged pull request #387: Fixes KAFKA-13520
bbejeck merged pull request #387: URL: https://github.com/apache/kafka-site/pull/387 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka-site] bbejeck commented on pull request #387: Fixes KAFKA-13520
bbejeck commented on pull request #387: URL: https://github.com/apache/kafka-site/pull/387#issuecomment-988878505 thanks for the fix @rmoff! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13521) Suppress changelog schema version breaks migration
Hector Geraldino created KAFKA-13521: Summary: Suppress changelog schema version breaks migration Key: KAFKA-13521 URL: https://issues.apache.org/jira/browse/KAFKA-13521 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.5.1, 2.5.0, 2.4.0 Reporter: Hector Geraldino Hi, We recently updated the kafka-streams library in one of our apps from v2.5.0 to v2.5.1. This upgrade changes the header format of the state store for suppress changelog topics (see https://issues.apache.org/jira/browse/KAFKA-10173 and https://github.com/apache/kafka/pull/8905). What we noticed was that introducing a new version of the binary schema header breaks older clients. I.e. applications running on v2.5.1 can parse the v3, v2, v1 and v0 headers, while the ones running on 2.5.0 (and, I assume, previous versions) cannot read headers in the v3 format. The logged exception is: {code:java} java.lang.IllegalArgumentException: Restoring apparently invalid changelog record: ConsumerRecord(topic = msgequator-kfns-msgequator-msgequator-suppress-buffer-store-changelog, partition = 8, leaderEpoch = 405, offset = 711400430, CreateTime = 1638828473341, serialized key size = 32, serialized value size = 90, headers = RecordHeaders(headers = [RecordHeader(key = v, value = [3])], isReadOnly = false), key = [B@5cf0e540, value = [B@40abc004) at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:372) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[msgequator-1.59.3.jar:1.59.3] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[msgequator-1.59.3.jar:1.59.3] {code} There's obviously no clear solution for this other than stopping/starting all instances at once. A rolling bounce that takes some time to complete (in our case, days) will break instances that haven't been upgraded yet, after a rebalance causes older clients to pick up the newly encoded changelog partition(s). I don't know if the solution is to add a flag on the client side that lists the supported protocol versions (so it behaves like Kafka consumers when picking the rebalance protocol - cooperative or eager), or if it just needs to be explicitly stated in the migration guide that a full stop/start migration is required in cases where the protocol version changes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
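For context, a simplified sketch (not the actual InMemoryTimeOrderedKeyValueBuffer code) of the version dispatch that fails here: each changelog value carries a single-byte "v" header, and a client that does not recognize that version has no way to decode the record except to throw:
{code:java}
// Simplified, illustrative dispatch only; the real restore code differs in detail.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class SuppressChangelogVersionSketch {
    static void restore(ConsumerRecord<byte[], byte[]> record) {
        Header versionHeader = record.headers().lastHeader("v");
        byte version = versionHeader == null ? 0 : versionHeader.value()[0];
        switch (version) {
            case 0:
            case 1:
            case 2:
                // formats this (older) client knows how to decode
                break;
            default:
                // a record written in a newer format, e.g. the v3 header written by an
                // upgraded 2.5.1 instance, lands here on a 2.5.0 instance
                throw new IllegalArgumentException(
                    "Restoring apparently invalid changelog record: " + record);
        }
    }
}
{code}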
Re: [DISCUSS] KIP-805: Add range and scan query support in IQ v2
Hey Matthias, Thank you for looking into the KIP! We are adding raw versions of typed queries, like `RawRangeQuery`, because it simplifies internal query handling, since the bytes stores only support raw queries. A typed RangeQuery is handled by the `MeteredStore`, which creates a new `RawRangeQuery` to pass down to the wrapped stores. When it gets the result back, it deserializes the data and creates a typed query result to return to the user. So, the store's key serde is used to translate typed `RangeQueries` into `RawRangeQueries`, and its value serde is used to translate the result of the query on the way back. This allows users to provide their own queries even if the MeteredStore has no knowledge of them. I hope this answers your question. Let me know if you have any other questions. Best, Vicky On Tue, Dec 7, 2021 at 12:46 AM Matthias J. Sax wrote: > Thanks for the KIP. Overall, make sense. > > One question: What is the purpose to `RawRangeQuery`? Seems not very > user friendly. > > -Matthias > > > On 11/30/21 12:48 PM, Vasiliki Papavasileiou wrote: > > Thank you John! Yes, that was a typo from copying and I fixed it. > > > > Since there have been no more comments, I will start the vote. > > > > Best, > > Vicky > > > > On Tue, Nov 30, 2021 at 5:22 AM John Roesler > wrote: > > > >> Thanks for the KIP, Vicky! > >> > >> This KIP will help fill in the parity gap between IQ and > >> IQv2. > >> > >> One thing I noticed, which looks like just a typo is that > >> the value type of the proposed RangeQuery should probably be > >> KeyValueIterator, right? > >> > >> Otherwise, it looks good to me! > >> > >> Thanks, > >> -John > >> > >> On Mon, 2021-11-29 at 12:20 +, Vasiliki Papavasileiou > >> wrote: > >>> Hello everyone, > >>> > >>> I would like to start the discussion for KIP-805: Add range and scan > >> query > >>> support in IQ v2 > >>> > >>> The KIP can be found here: > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2 > >>> > >>> Any suggestions are more than welcome. > >>> > >>> Many thanks, > >>> Vicky > >> > >> > > >
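As a self-contained illustration of the translation pattern described above (these are illustrative stand-ins, not the KIP's actual store or query classes), the following toy shows a typed range query being serialized with the key serde, executed against a bytes-only store, and deserialized with the value serde on the way back:
{code:java}
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;

public class TypedToRawQueryExample {
    // "Bytes store": only understands raw keys and values, like the inner stores in Streams.
    static final TreeMap<Bytes, byte[]> BYTES_STORE = new TreeMap<>();

    static <K, V> Map<K, V> rangeQuery(K lower, K upper, Serde<K> keySerde, Serde<V> valueSerde) {
        // Typed bounds -> raw bounds (what the metered layer does with the key serde).
        Bytes rawLower = Bytes.wrap(keySerde.serializer().serialize(null, lower));
        Bytes rawUpper = Bytes.wrap(keySerde.serializer().serialize(null, upper));

        // The raw query runs against the bytes store.
        Map<Bytes, byte[]> raw = BYTES_STORE.subMap(rawLower, true, rawUpper, true);

        // Raw results -> typed results (what the metered layer does with the value serde on the way back).
        return raw.entrySet().stream().collect(Collectors.toMap(
            e -> keySerde.deserializer().deserialize(null, e.getKey().get()),
            e -> valueSerde.deserializer().deserialize(null, e.getValue())));
    }

    public static void main(String[] args) {
        Serde<String> stringSerde = Serdes.String();
        BYTES_STORE.put(Bytes.wrap("a".getBytes()), "1".getBytes());
        BYTES_STORE.put(Bytes.wrap("b".getBytes()), "2".getBytes());
        System.out.println(rangeQuery("a", "b", stringSerde, stringSerde)); // {a=1, b=2}
    }
}
{code}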
Re: [DISCUSS] KIP-782: Expandable batch size in producer
Hi Jun, 10. My understanding is that MemoryRecords would under the covers be allocated in chunks, so logically it still would be one MemoryRecords object, it's just instead of allocating one large chunk upfront, smaller chunks are allocated as needed to grow the batch and linked into a list. 11. The reason for 2 sizes is to avoid change of behavior when triggering batch send with large linger.ms. Currently, a batch send is triggered once the batch reaches 16KB by default, if we just raise the default to 256KB, then the batch send will be delayed. Using a separate value would allow keeping the current behavior when sending the batch out, but provide better throughput with high latency + high bandwidth channels. -Artem On Tue, Dec 7, 2021 at 5:29 PM Jun Rao wrote: > Hi, Luke, > > Thanks for the KIP. A few comments below. > > 10. Accumulating small batches could improve memory usage. Will that > introduce extra copying when generating a produce request? Currently, a > produce request takes a single MemoryRecords per partition. > 11. Do we need to introduce a new config batch.max.size? Could we just > increase the default of batch.size? We probably need to have KIP-794 > < > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner > > > resolved > before increasing the default batch size since the larger the batch size, > the worse the problem in KIP-794. > 12. As for max.request.size, currently it's used for both the max record > size and the max request size, which is unintuitive. Perhaps we could > introduce a new config max.record.size that defaults to 1MB. We could then > increase max.request.size to sth like 10MB. > > Thanks, > > Jun > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits > wrote: > > > Hi Luke, > > > > I don't mind increasing the max.request.size to a higher number, e.g. 2MB > > could be good. I think we should also run some benchmarks to see the > > effects of different sizes. > > > > I agree that changing round robin to random solves an independent > existing > > issue, however the logic in this KIP exacerbates the issue, so there is > > some dependency. > > > > -Artem > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen wrote: > > > > > Hi Artem, > > > Yes, I agree if we go with random selection instead of round-robin > > > selection, the latency issue will be more fair. That is, if there are > 10 > > > partitions, the 10th partition will always be the last choice in each > > round > > > in current design, but with random selection, the chance to be selected > > is > > > more fair. > > > > > > However, I think that's kind of out of scope with this KIP. This is an > > > existing issue, and it might need further discussion to decide if this > > > change is necessary. > > > > > > I agree the default 32KB for "batch.max.size" might be not huge > > improvement > > > compared with 256KB. I'm thinking, maybe default to "64KB" for > > > "batch.max.size", and make the documentation clear that if the > > > "batch.max.size" > > > is increased, there might be chances that the "ready" partitions need > to > > > wait for next request to send to broker, because of the > > "max.request.size" > > > (default 1MB) limitation. "max.request.size" can also be considered to > > > increase to avoid this issue. What do you think? > > > > > > Thank you. > > > Luke > > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits > > > wrote: > > > > > > > > maybe I can firstly decrease the "batch.max.size" to 32KB > > > > > > > > I think 32KB is too small. 
With 5 in-flight and 100ms latency we can > > > > produce 1.6MB/s per partition. With 256KB we can produce 12.8MB/s > per > > > > partition. We should probably set up some testing and see if 256KB > has > > > > problems. > > > > > > > > To illustrate latency dynamics, let's consider a simplified model: 1 > > > > in-flight request per broker, produce latency 125ms, 256KB max > request > > > > size, 16 partitions assigned to the same broker, every second 128KB > is > > > > produced to each partition (total production rate is 2MB/sec). > > > > > > > > If the batch size is 16KB, then the pattern would be the following: > > > > > > > > 0ms - produce 128KB into each partition > > > > 0ms - take 16KB from each partition send (total 256KB) > > > > 125ms - complete first 16KB from each partition, send next 16KB > > > > 250ms - complete second 16KB, send next 16KB > > > > ... > > > > 1000ms - complete 8th 16KB from each partition > > > > > > > > from this model it's easy to see that there are 256KB that are sent > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in > > > > 875ms. > > > > > > > > If the batch size is 256KB, then the pattern would be the following: > > > > > > > > 0ms - produce 128KB into each partition > > > > 0ms - take 128KB each from first 2 partitions and send (total 256KB) > > > > 125ms - complete 2 first partitions, send data from next 2 partitions > > > > ... > > > > 1000ms -
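To make the arithmetic behind the quoted throughput figures explicit, a small illustrative calculation (treating 1 MB as 1000 KB, as the figures above do):
{code:java}
// Illustrative arithmetic only: per-partition throughput ceiling when at most
// maxInFlight batches of batchSizeKB kilobytes can be outstanding per round trip.
public class BatchThroughputModel {
    static double throughputMBperSec(int batchSizeKB, int maxInFlight, double latencySeconds) {
        return batchSizeKB * maxInFlight / latencySeconds / 1000.0;
    }

    public static void main(String[] args) {
        // 5 in-flight requests and a 100 ms round trip, as in the example above.
        System.out.printf("32KB  batches: %.1f MB/s per partition%n", throughputMBperSec(32, 5, 0.1));  // 1.6
        System.out.printf("256KB batches: %.1f MB/s per partition%n", throughputMBperSec(256, 5, 0.1)); // 12.8
    }
}
{code}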
Re: [VOTE] KIP-792: Add "generation" field into consumer protocol
Hi Luke, Thanks for the KIP. One thing I'd like to double check is that, since the ConsumerProtocolSubscription is not auto generated from the json file, we need to make sure the old-versioned leader would be able to ignore the new field during an upgrade e.g. without crashing. Other than that, the KIP lgtm. Guozhang On Tue, Dec 7, 2021 at 6:16 PM Luke Chen wrote: > Hi Colin, > > I'm not quite sure if I understand your thoughts correctly. > If I was wrong, please let me know. > > Also, I'm not quite sure how I could lock this feature to a new IBP > version. > I saw "KIP-584: Versioning scheme for features" is still under development. > Not sure if I need to lock the IBP version, how should I do? > > Thank you. > Luke > > On Tue, Dec 7, 2021 at 9:41 PM Luke Chen wrote: > > > Hi Colin, > > > > Thanks for your comments. I've updated the KIP to mention about the KIP > > won't affect current broker side behavior. > > > > > One scenario that we need to consider is what happens during a rolling > > upgrade. If the coordinator moves back and forth between brokers with > > different IBPs, it seems that the same epoch numbers could be reused for > a > > group, if things are done in the obvious manner (old IBP = don't read or > > write epoch, new IBP = do) > > > > I think this KIP doesn't care about the group epoch number at all. The > > subscription metadata is passed from each member to group coordinator, > and > > then the group coordinator pass all of them back to the consumer lead. So > > even if the epoch number is reused in a group, it should be fine. On the > > other hand, the group coordinator will have no idea if the join group > > request sent from consumer containing the new subscription "generation" > > field or not, because group coordinator won't deserialize the metadata. > > > > I've added also added them into the KIP. > > > > Thank you. > > Luke > > > > On Mon, Dec 6, 2021 at 10:39 AM Colin McCabe wrote: > > > >> Hi Luke, > >> > >> Thanks for the explanation. > >> > >> I don't see any description of how the broker decides to use the new > >> version of ConsumerProtocolSubscription or not. This probably needs to > be > >> locked to a new IBP version. > >> > >> One scenario that we need to consider is what happens during a rolling > >> upgrade. If the coordinator moves back and forth between brokers with > >> different IBPs, it seems that the same epoch numbers could be reused > for a > >> group, if things are done in the obvious manner (old IBP = don't read or > >> write epoch, new IBP = do). > >> > >> best, > >> Colin > >> > >> > >> On Fri, Dec 3, 2021, at 18:46, Luke Chen wrote: > >> > Hi Colin, > >> > Thanks for your comment. > >> > > >> >> How are we going to avoid the situation where the broker restarts, > and > >> > the same generation number is reused? > >> > > >> > Actually, this KIP doesn't have anything to do with the brokers. The > >> > "generation" field I added, is in the subscription metadata, which > will > >> not > >> > be deserialized by brokers. The metadata is only deserialized by > >> consumer > >> > lead. And for the consumer lead, the only thing the lead cared about, > is > >> > the highest generation of the ownedPartitions among all the consumers. > >> With > >> > the highest generation of the ownedPartitions, the consumer lead can > >> > distribute the partitions as sticky as possible, and most importantly, > >> > without errors. 
> >> > > >> > That is, after this KIP, if the broker restarts, and the same > generation > >> > number is reused, it won't break current rebalance behavior. But it'll > >> help > >> > the consumer lead do the sticky assignments correctly. > >> > > >> > Thank you. > >> > Luke > >> > > >> > On Fri, Dec 3, 2021 at 6:30 AM Colin McCabe > wrote: > >> > > >> >> How are we going to avoid the situation where the broker restarts, > and > >> the > >> >> same generation number is reused? > >> >> > >> >> best, > >> >> Colin > >> >> > >> >> On Tue, Nov 30, 2021, at 16:36, Luke Chen wrote: > >> >> > Hi all, > >> >> > > >> >> > I'd like to start the vote for KIP-792: Add "generation" field into > >> >> > consumer protocol. > >> >> > > >> >> > The goal of this KIP is to allow the assignor/consumer coordinator > to > >> >> have > >> >> > a way to identify the out-of-date members/assignments, to avoid > >> rebalance > >> >> > stuck issues in current protocol. > >> >> > > >> >> > Detailed description can be found here: > >> >> > > >> >> > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614 > >> >> > > >> >> > Any feedback is welcome. > >> >> > > >> >> > Thank you. > >> >> > Luke > >> >> > >> > > > -- -- Guozhang
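As a rough illustration of the idea described above, a hypothetical sketch (these are not real consumer-protocol or assignor classes) of a leader that only trusts ownedPartitions reported at the highest generation it has seen:
{code:java}
import java.util.*;
import org.apache.kafka.common.TopicPartition;

// Hypothetical holder for what a member reports; the "generation" field is the one
// proposed by KIP-792, but this class itself is illustrative only.
class MemberSubscription {
    final String memberId;
    final int generation;
    final List<TopicPartition> ownedPartitions;
    MemberSubscription(String memberId, int generation, List<TopicPartition> ownedPartitions) {
        this.memberId = memberId;
        this.generation = generation;
        this.ownedPartitions = ownedPartitions;
    }
}

class StickyOwnershipFilter {
    // Only ownedPartitions reported at the highest generation are trusted for sticky
    // assignment; stale ownership from older generations is treated as owning nothing.
    static Map<String, List<TopicPartition>> trustedOwnership(Collection<MemberSubscription> members) {
        int highestGeneration = members.stream().mapToInt(m -> m.generation).max().orElse(-1);
        Map<String, List<TopicPartition>> trusted = new HashMap<>();
        for (MemberSubscription m : members) {
            trusted.put(m.memberId, m.generation == highestGeneration ? m.ownedPartitions : List.of());
        }
        return trusted;
    }
}
{code}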
Re: [DISCUSS] KIP-796: Interactive Query v2
Thanks for the clarification, it looks good to me now. On Wed, Nov 17, 2021 at 9:21 PM John Roesler wrote: > Ah, sorry, Guozhang, > > It seems I was a bit too eager with starting the vote thread. > > 13: I think that makes perfect sense. I've updated the KIP. > > 14: Oof, I can't believe I overlooked those newer > exceptions. Some of them will become exceptions in IQv2, > whereas others will become individual partition QueryResult > failures. Here is an accounting of what will become of those > proposed exceptions: > > * StreamsNotStartedException: thrown when stream thread > state is CREATED, the user can retry until it reaches RUNNING. > > * StreamsRebalancingException: thrown when stream thread is > not running and stream state is REBALANCING. This exception > is no longer applicable. Regardless of the rebalancing > state of the store's task, the state will either be up to > the requested bound or not. > > * StateStoreMigratedException: thrown when state store > already closed and stream state is RUNNING. This is a per- > partition failure, so it now maps to the > FailureReason.NOT_PRESENT failure. > > > * StateStoreNotAvailableException: thrown when state store > closed and stream state is PENDING_SHUTDOWN / NOT_RUNNING / > ERROR. I (subjectively) felt the name was ambiguous with > respect to the prior condition in which a store partition is > not locally available. This is replaced with the thrown > exception, StreamsStoppedException (the JavaDoc states > that it is thrown when Streams is in any terminal state). > > * UnknownStateStoreException: thrown when passing an unknown > state store. This is still a thrown exception. > > * InvalidStateStorePartitionException: thrown when user > requested partition is not available on the stream instance. > If the partition actually does exist, then we will now > return a per-partition FailureReason.NOT_PRESENT. If the > requested partition is actually not present in the topology > at all, then we will return the per-partition > FailureReason.DOES_NOT_EXIST. > > Sorry for the oversight. The KIP has been updated. > > Thanks, > -John > > On Wed, 2021-11-17 at 15:48 -0800, Guozhang Wang wrote: > > Thanks John. > > > > I made another pass on the KIP and overall it already looks pretty good. > I > > just have a couple more minor comments: > > > > 13: What do you think about just removing the following function in > > QueryResult > > > > // returns a failed query result because caller requested a "latest" > > bound, but the task was > > // not active and running. > > public static QueryResult notActive(String currentState); > > > > Instead just use `notUpToBound` for the case when `latest` bound is > > requested but the node is not the active replica. My main motivation is > > trying to abstract away the notion of active/standby from the public APIs > > itself, and hence capturing both this case as well as just a > > normal "position bound not achieved" in the same return signal, until > later > > when we think it is indeed needed to separate them with different > returns. > > > > 14: Regarding the possible exceptions being thrown from `query`, it seems > > more exception types are possible from KIP-216: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors > , > > should we include all in the javadocs? > > > > > > Guozhang > > > > > > > > On Wed, Nov 17, 2021 at 3:25 PM John Roesler > wrote: > > > > > Thanks for the reply, Guozhang!
> > > > > > I have updated the KIP to tie up the remaining points that > > > we have discussed. I really appreciate your time in refining > > > the proposal. I included a quick summary of the final state > > > of our discussion points below. > > > > > > Since it seems like this discussion thread is pretty > > > convergent, I'll go ahead and start the voting thread soon. > > > > > > Thanks again! > > > -John > > > > > > P.S.: the final state of our discussion points: > > > > > > 1. I removed serdesForStore from the proposal (and moved it > > > to Rejected Alternatives) > > > > > > 2. Thanks for that reference. I had overlooked that > > > implementation. I'd note that the ListValuesStore is > > > currently only used in the KStream API, which doesn't > > > support queries at all. Due to its interface, it could > > > theoretically be used to materialize a KTable, though it has > > > no supplier provided in the typical Stores factory class. > > > > > > Regardless, I think that it would still be a similar story > > > to the Segmented store. The ListValues store would simply > > > choose to terminate the query on its own and not delegate to > > > any of the wrapped KeyValue stores. It wouldn't matter that > > > the wrapped stores have a query-handling facility of their > > > own, if the wrapping store doesn't choose to delegate, the > > > wrapped store will not try to execute any queries. > > > > > > Specifically regarding the key transformation that these > > > "forma
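As a rough caller-side sketch of the per-partition failure model being described, using the names from the KIP discussion (which may differ from the final API) and assuming a running KafkaStreams instance named kafkaStreams:
{code:java}
// Sketch only: class and method names follow the KIP-796 discussion.
StateQueryRequest<Integer> request =
    StateQueryRequest.inStore("my-store").withQuery(KeyQuery.<String, Integer>withKey("some-key"));
StateQueryResult<Integer> result = kafkaStreams.query(request);

result.getPartitionResults().forEach((partition, queryResult) -> {
    if (queryResult.isSuccess()) {
        Integer value = queryResult.getResult();
        // use the value
    } else if (queryResult.getFailureReason() == FailureReason.NOT_PRESENT) {
        // this instance does not host that partition (anymore): retry on another instance
    } else if (queryResult.getFailureReason() == FailureReason.NOT_UP_TO_BOUND) {
        // the store has not reached the requested position bound yet: retry or relax the bound
    }
});
{code}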
Re: [DISCUSS] KIP-801: Implement an Authorizer that stores metadata in __cluster_metadata
Hi Colin, Thanks for the KIP. 1. Can you talk about how the set of ACLs needed to authorize controllers and brokers will get bootstrapped? I am particularly interested in how we are going to configure a new cluster so that the controller nodes can authorize each other's requests to establish a quorum. After a quorum is established, I am interested in how the user would make sure that new brokers will get authorized against the controllers for requests like "register broker" and "fetch". thanks, -Jose
[jira] [Created] (KAFKA-13522) IQv2: Implement position tracking and bounding in API
John Roesler created KAFKA-13522: Summary: IQv2: Implement position tracking and bounding in API Key: KAFKA-13522 URL: https://issues.apache.org/jira/browse/KAFKA-13522 Project: Kafka Issue Type: Sub-task Reporter: John Roesler -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13491) Implement IQv2 Framework
[ https://issues.apache.org/jira/browse/KAFKA-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13491. -- Resolution: Fixed > Implement IQv2 Framework > > > Key: KAFKA-13491 > URL: https://issues.apache.org/jira/browse/KAFKA-13491 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > See https://cwiki.apache.org/confluence/x/85OqCw -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13506) IQv2: Transmit position to standbys
[ https://issues.apache.org/jira/browse/KAFKA-13506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13506. -- Resolution: Fixed > IQv2: Transmit position to standbys > --- > > Key: KAFKA-13506 > URL: https://issues.apache.org/jira/browse/KAFKA-13506 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: Vicky Papavasileiou >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13523) Implement IQv2 support in global stores
John Roesler created KAFKA-13523: Summary: Implement IQv2 support in global stores Key: KAFKA-13523 URL: https://issues.apache.org/jira/browse/KAFKA-13523 Project: Kafka Issue Type: Sub-task Reporter: John Roesler Global stores pose one significant problem for IQv2: when they start up, they skip the regular ingest pipeline and instead use the "restoration" pipeline to read up until the current end offset. Then, they switch over to the regular ingest pipeline. IQv2 position tracking expects to track the position of each record from the input topic through the ingest pipeline and then get the position headers through the restoration pipeline via the changelog topic. The fact that global stores "restore" the input topic instead of ingesting it violates our expectations. It has also caused other problems, so we may want to consider switching the global store processing to use the normal paradigm rather than adding special-case logic to track positions in global stores. Note: there are two primary reasons that global stores behave this way: # We can write in batches during restoration, so the I/O may be more efficient # The global thread does not transition to RUNNING state until it reaches the (current) end of the input topic, which blocks other threads from joining against it, thereby improving the time synchronization of global KTable joins. If we want to propose changing the bootstrapping pipeline for global threads, we should have some kind of answer to these concerns. -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [DISCUSS] KIP-782: Expandable batch size in producer
Hi, Artem, Thanks for the reply. 11. Got it. To me, batch.size is really used for throughput and not for latency guarantees. There is no guarantee when 16KB will be accumulated. So, if users want any latency guarantee, they will need to specify linger.ms accordingly. Then, batch.size can just be used to tune for throughput. 20. Could we also describe the unit of compression? Is it batch.initial.size, batch.size or batch.max.size? Thanks, Jun On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits wrote: > Hi Jun, > > 10. My understanding is that MemoryRecords would under the covers be > allocated in chunks, so logically it still would be one MemoryRecords > object, it's just instead of allocating one large chunk upfront, smaller > chunks are allocated as needed to grow the batch and linked into a list. > > 11. The reason for 2 sizes is to avoid change of behavior when triggering > batch send with large linger.ms. Currently, a batch send is triggered > once > the batch reaches 16KB by default, if we just raise the default to 256KB, > then the batch send will be delayed. Using a separate value would allow > keeping the current behavior when sending the batch out, but provide better > throughput with high latency + high bandwidth channels. > > -Artem > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao wrote: > > > Hi, Luke, > > > > Thanks for the KIP. A few comments below. > > > > 10. Accumulating small batches could improve memory usage. Will that > > introduce extra copying when generating a produce request? Currently, a > > produce request takes a single MemoryRecords per partition. > > 11. Do we need to introduce a new config batch.max.size? Could we just > > increase the default of batch.size? We probably need to have KIP-794 > > < > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner > > > > > resolved > > before increasing the default batch size since the larger the batch size, > > the worse the problem in KIP-794. > > 12. As for max.request.size, currently it's used for both the max record > > size and the max request size, which is unintuitive. Perhaps we could > > introduce a new config max.record.size that defaults to 1MB. We could > then > > increase max.request.size to sth like 10MB. > > > > Thanks, > > > > Jun > > > > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits > > wrote: > > > > > Hi Luke, > > > > > > I don't mind increasing the max.request.size to a higher number, e.g. > 2MB > > > could be good. I think we should also run some benchmarks to see the > > > effects of different sizes. > > > > > > I agree that changing round robin to random solves an independent > > existing > > > issue, however the logic in this KIP exacerbates the issue, so there is > > > some dependency. > > > > > > -Artem > > > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen wrote: > > > > > > > Hi Artem, > > > > Yes, I agree if we go with random selection instead of round-robin > > > > selection, the latency issue will be more fair. That is, if there are > > 10 > > > > partitions, the 10th partition will always be the last choice in each > > > round > > > > in current design, but with random selection, the chance to be > selected > > > is > > > > more fair. > > > > > > > > However, I think that's kind of out of scope with this KIP. This is > an > > > > existing issue, and it might need further discussion to decide if > this > > > > change is necessary. 
> > > > > > > > I agree the default 32KB for "batch.max.size" might be not huge > > > improvement > > > > compared with 256KB. I'm thinking, maybe default to "64KB" for > > > > "batch.max.size", and make the documentation clear that if the > > > > "batch.max.size" > > > > is increased, there might be chances that the "ready" partitions need > > to > > > > wait for next request to send to broker, because of the > > > "max.request.size" > > > > (default 1MB) limitation. "max.request.size" can also be considered > to > > > > increase to avoid this issue. What do you think? > > > > > > > > Thank you. > > > > Luke > > > > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits > > > > wrote: > > > > > > > > > > maybe I can firstly decrease the "batch.max.size" to 32KB > > > > > > > > > > I think 32KB is too small. With 5 in-flight and 100ms latency we > can > > > > > produce 1.6MB/s per partition. With 256KB we can produce 12.8MB/s > > per > > > > > partition. We should probably set up some testing and see if 256KB > > has > > > > > problems. > > > > > > > > > > To illustrate latency dynamics, let's consider a simplified model: > 1 > > > > > in-flight request per broker, produce latency 125ms, 256KB max > > request > > > > > size, 16 partitions assigned to the same broker, every second 128KB > > is > > > > > produced to each partition (total production rate is 2MB/sec). > > > > > > > > > > If the batch size is 16KB, then the pattern would be the following: > > > > > > > > > > 0ms - produce 128KB into each partition > > > > > 0
[jira] [Created] (KAFKA-13524) IQv2: Add option to query from caches
John Roesler created KAFKA-13524: Summary: IQv2: Add option to query from caches Key: KAFKA-13524 URL: https://issues.apache.org/jira/browse/KAFKA-13524 Project: Kafka Issue Type: Sub-task Reporter: John Roesler -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13525) IQv2: Implement KeyQuery from the KIP
John Roesler created KAFKA-13525: Summary: IQv2: Implement KeyQuery from the KIP Key: KAFKA-13525 URL: https://issues.apache.org/jira/browse/KAFKA-13525 Project: Kafka Issue Type: Sub-task Reporter: John Roesler -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
John Roesler created KAFKA-13526: Summary: IQv2: Consider more generic logic for mapping between binary and typed queries Key: KAFKA-13526 URL: https://issues.apache.org/jira/browse/KAFKA-13526 Project: Kafka Issue Type: Sub-task Reporter: John Roesler Right now, typed queries (like KeyQuery) need to be specially handled and translated to their binary counterparts (like RawKeyQuery). This happens in the Metered store layers, where the serdes are known. It is necessary because lower store layers are only able to handle binary data (because they don't know the serdes). This situation is not ideal, since the Metered store layers will grow to host quite a bit of query handling and translation logic, because the relationship between typed queries and binary counterparts is not obvious, and because we can only automatically translate known query types. User-supplied queries and stores will have to work things out using their a-priori knowledge of the serdes. One suggestion (from [~mjsax] ) is to come up with some kind of generic "query mapping" API, which the Metered stores would use to translate back and forth between typed and raw keys and values. Users would be able to supply their own mappings along with their custom queries. Another option would be to have the Metered stores attach the serdes to the query on the way down and then to the result on the way up. Then, the serdes would be available in the bytes store (as part of the request) and to the users when they get their results (as part of the response). Other options may also surface once we start playing with ideas. -- This message was sent by Atlassian Jira (v8.20.1#820001)
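For illustration only, one possible shape of the "query mapping" option mentioned above; this interface is hypothetical and does not exist in Kafka Streams:
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;

// Hypothetical sketch of a user-suppliable mapping between typed and raw queries.
public interface QueryMapping<K, V> {
    // Translate a typed query into the raw (bytes-level) query understood by inner stores.
    Query<?> toRawQuery(Query<?> typedQuery, Serde<K> keySerde, Serde<V> valueSerde);

    // Translate a raw result back into the typed result returned to the user.
    <R> QueryResult<R> toTypedResult(QueryResult<?> rawResult, Serde<K> keySerde, Serde<V> valueSerde);
}
{code}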
Re: [DISCUSS] KIP-782: Expandable batch size in producer
Hi Jun, 11. That was my initial thinking as well, but in a discussion some people pointed out the change of behavior in some scenarios. E.g. if someone for some reason really wants batches to be at least 16KB and sets large linger.ms, and most of the time the batches are filled quickly enough and they observe a certain latency. Then they upgrade their client with a default size 256KB and the latency increases. This could be seen as a regression. It could be fixed by just reducing linger.ms to specify the expected latency, but still could be seen as a disruption by some users. The other reason to have 2 sizes is to avoid allocating large buffers upfront. -Artem On Wed, Dec 8, 2021 at 3:07 PM Jun Rao wrote: > Hi, Artem, > > Thanks for the reply. > > 11. Got it. To me, batch.size is really used for throughput and not for > latency guarantees. There is no guarantee when 16KB will be accumulated. > So, if users want any latency guarantee, they will need to specify > linger.ms accordingly. > Then, batch.size can just be used to tune for throughput. > > 20. Could we also describe the unit of compression? Is > it batch.initial.size, batch.size or batch.max.size? > > Thanks, > > Jun > > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits > wrote: > > > Hi Jun, > > > > 10. My understanding is that MemoryRecords would under the covers be > > allocated in chunks, so logically it still would be one MemoryRecords > > object, it's just instead of allocating one large chunk upfront, smaller > > chunks are allocated as needed to grow the batch and linked into a list. > > > > 11. The reason for 2 sizes is to avoid change of behavior when triggering > > batch send with large linger.ms. Currently, a batch send is triggered > > once > > the batch reaches 16KB by default, if we just raise the default to 256KB, > > then the batch send will be delayed. Using a separate value would allow > > keeping the current behavior when sending the batch out, but provide > better > > throughput with high latency + high bandwidth channels. > > > > -Artem > > > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao wrote: > > > > > Hi, Luke, > > > > > > Thanks for the KIP. A few comments below. > > > > > > 10. Accumulating small batches could improve memory usage. Will that > > > introduce extra copying when generating a produce request? Currently, a > > > produce request takes a single MemoryRecords per partition. > > > 11. Do we need to introduce a new config batch.max.size? Could we just > > > increase the default of batch.size? We probably need to have KIP-794 > > > < > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner > > > > > > > resolved > > > before increasing the default batch size since the larger the batch > size, > > > the worse the problem in KIP-794. > > > 12. As for max.request.size, currently it's used for both the max > record > > > size and the max request size, which is unintuitive. Perhaps we could > > > introduce a new config max.record.size that defaults to 1MB. We could > > then > > > increase max.request.size to sth like 10MB. > > > > > > Thanks, > > > > > > Jun > > > > > > > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits > > > wrote: > > > > > > > Hi Luke, > > > > > > > > I don't mind increasing the max.request.size to a higher number, e.g. > > 2MB > > > > could be good. I think we should also run some benchmarks to see the > > > > effects of different sizes. 
> > > > > > > > I agree that changing round robin to random solves an independent > > > existing > > > > issue, however the logic in this KIP exacerbates the issue, so there > is > > > > some dependency. > > > > > > > > -Artem > > > > > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen > wrote: > > > > > > > > > Hi Artem, > > > > > Yes, I agree if we go with random selection instead of round-robin > > > > > selection, the latency issue will be more fair. That is, if there > are > > > 10 > > > > > partitions, the 10th partition will always be the last choice in > each > > > > round > > > > > in current design, but with random selection, the chance to be > > selected > > > > is > > > > > more fair. > > > > > > > > > > However, I think that's kind of out of scope with this KIP. This is > > an > > > > > existing issue, and it might need further discussion to decide if > > this > > > > > change is necessary. > > > > > > > > > > I agree the default 32KB for "batch.max.size" might be not huge > > > > improvement > > > > > compared with 256KB. I'm thinking, maybe default to "64KB" for > > > > > "batch.max.size", and make the documentation clear that if the > > > > > "batch.max.size" > > > > > is increased, there might be chances that the "ready" partitions > need > > > to > > > > > wait for next request to send to broker, because of the > > > > "max.request.size" > > > > > (default 1MB) limitation. "max.request.size" can also be considered > > to > > > > > increase to avoid this issue. What do you think? > > > > > >
Re: [DISCUSS] KIP-805: Add range and scan query support in IQ v2
Thanks for the details! I also chatted with John about it, and he filed https://issues.apache.org/jira/browse/KAFKA-13526 to incorporate some feedback as follow up work. IMHO, the hard coded query translation is not ideal and should be plugable. But for a v1 of IQv2 (pun intended) the hardcoded translation seems to be good enough. -Matthias On 12/8/21 9:37 AM, Vasiliki Papavasileiou wrote: Hey Matthias, Thank you for looking into the KIP! We are adding raw versions of typed queries, like `RawRangeQuery` because it simplifies internal query handling since the bytes stores only support raw queries. A typed RangeQuery is handled by the `MeteredStore` which creates a new `RawRangeQuery` to pass down to the wrapped stores. When it gets the result back, it deserializes the data and creates a typed query result to return to the user. So, the store's key serde are used to translate typed `RangeQueries` into `RawRangeQueries` and it's value serde are used to translate the result of the query on the way back. This allows users to provide their own queries even if the MeteredStore has no knowledge of them. I hope this answers your question. Let me know if you have any other questions. Best, Vicky On Tue, Dec 7, 2021 at 12:46 AM Matthias J. Sax wrote: Thanks for the KIP. Overall, make sense. One question: What is the purpose to `RawRangeQuery`? Seems not very user friendly. -Matthias On 11/30/21 12:48 PM, Vasiliki Papavasileiou wrote: Thank you John! Yes, that was a typo from copying and I fixed it. Since there have been no more comments, I will start the vote. Best, Vicky On Tue, Nov 30, 2021 at 5:22 AM John Roesler wrote: Thanks for the KIP, Vicky! This KIP will help fill in the parity gap between IQ and IQv2. One thing I noticed, which looks like just a typo is that the value type of the proposed RangeQuery should probably be KeyValueIterator, right? Otherwise, it looks good to me! Thanks, -John On Mon, 2021-11-29 at 12:20 +, Vasiliki Papavasileiou wrote: Hello everyone, I would like to start the discussion for KIP-805: Add range and scan query support in IQ v2 The KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2 Any suggestions are more than welcome. Many thanks, Vicky
Re: [VOTE] KIP-805: Add range and scan query support in IQ v2
Thanks for the KIP. +1 (binding) On 12/5/21 7:03 PM, Luke Chen wrote: Hi Vasiliki, Thanks for the KIP! It makes sense to have the range and scan query in IQv2, as in IQv1. +1 (non-binding) Thank you. Luke On Thu, Dec 2, 2021 at 5:41 AM John Roesler wrote: Thanks for the KIP, Vicky! I’m +1 (binding) -John On Tue, Nov 30, 2021, at 14:51, Vasiliki Papavasileiou wrote: Hello everyone, I would like to start a vote for KIP-805 that adds range and scan KeyValue queries in IQ2. The KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2 Cheers! Vicky
[jira] [Resolved] (KAFKA-12980) Allow consumers to return from poll when position advances due to aborted transactions
[ https://issues.apache.org/jira/browse/KAFKA-12980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12980. - Fix Version/s: 3.2.0 Resolution: Fixed > Allow consumers to return from poll when position advances due to aborted > transactions > -- > > Key: KAFKA-12980 > URL: https://issues.apache.org/jira/browse/KAFKA-12980 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 3.2.0 > > > When {{Consumer::poll}} is invoked on a topic with an open transaction, and > then that transaction is aborted, {{poll}} does not return unless there are > other records available in that topic after the aborted transaction. > Instead, {{poll}} could return in this case, even when no records are > available. > This facilitates reads to the end of a topic where the end offsets of a topic > are listed and then a consumer for that topic is polled until its > [position|https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#position(org.apache.kafka.common.TopicPartition)] > is at or beyond each of those offsets (for example, [Connect does > this|https://github.com/apache/kafka/blob/fce771579c3e20f20949c4c7e0a5e3a16c57c7f0/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L322-L345] > when reading to the end of any of its internal topics). > We could update the existing language in the docs for {{Consumer::poll}} from > {quote}This method returns immediately if there are records available. > {quote} > to > {quote}This method returns immediately if there are records available or if > the position advances past control records. > {quote} > > A workaround for existing users who would like to see this is to use short > poll intervals and manually check the consumer's position in between each > poll, but this is fairly tedious and may lead to excess CPU and network > utilization depending on the latency requirements for knowing when the end of > the topic has been reached. -- This message was sent by Atlassian Jira (v8.20.1#820001)
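For reference, the "read to the end" pattern referred to in the description looks roughly like the sketch below (simplified from the linked KafkaBasedLog logic; it assumes the consumer is already assigned to the partitions, and record and error handling are omitted):
{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class ReadToEndExample {
    static void readToEnd(Consumer<byte[], byte[]> consumer, Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        boolean behind = true;
        while (behind) {
            // Before this change, poll() could block for its full timeout even though the
            // position had already advanced past an aborted transaction's control records.
            consumer.poll(Duration.ofMillis(100));
            behind = endOffsets.entrySet().stream()
                .anyMatch(e -> consumer.position(e.getKey()) < e.getValue());
        }
    }
}
{code}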