[jira] [Created] (KAFKA-13518) Update gson and netty-codec in 3.0.0

2021-12-08 Thread Pavel Kuznetsov (Jira)
Pavel Kuznetsov created KAFKA-13518:
---

 Summary: Update gson and netty-codec in 3.0.0
 Key: KAFKA-13518
 URL: https://issues.apache.org/jira/browse/KAFKA-13518
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.0.0
Reporter: Pavel Kuznetsov


*Describe the bug*
I checked the kafka_2.13-3.0.0.tgz distribution with WhiteSource and found that 
some of the bundled libraries have known vulnerabilities.
Here they are:
* gson-2.8.6.jar has the WS-2021-0419 vulnerability. The fix is to upgrade to 
com.google.code.gson:gson:2.8.9
* netty-codec-4.1.65.Final.jar has the CVE-2021-37136 and CVE-2021-37137 
vulnerabilities. The fix is to upgrade to io.netty:netty-codec:4.1.68.Final

*To Reproduce*
Download kafka_2.13-3.0.0.tgz and locate the jars listed above.
Check that these jars, at these versions, are named in the corresponding 
vulnerability descriptions.

*Expected behavior*

* gson upgraded to 2.8.9 or higher
* netty-codec upgraded to 4.1.68.Final or higher

*Actual behavior*

* gson is 2.8.6
* netty-codec is 4.1.65.Final





[jira] [Created] (KAFKA-13519) Same JAAS configuration used for all producers

2021-12-08 Thread Sergey Lemekhov (Jira)
Sergey Lemekhov created KAFKA-13519:
---

 Summary: Same JAAS configuration used for all producers
 Key: KAFKA-13519
 URL: https://issues.apache.org/jira/browse/KAFKA-13519
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.0.0
Reporter: Sergey Lemekhov
 Attachments: AzureAuthCallbackHandler.java, OAuthBearerTokenImpl.java

h3. Problem

Sending messages to more than one Kafka cluster is impossible when using 
instances of {{org.apache.kafka.clients.producer.KafkaProducer}} from the 
{{kafka-clients}} Java library with {{SASL_JAAS_CONFIG}} authentication 
configured.
Only one {{org.apache.kafka.common.security.authenticator.LoginManager}} is 
created for all of the clusters (the 
{{org.apache.kafka.common.security.authenticator.LoginManager#DYNAMIC_INSTANCES}}
 map contains only one entry).
h3. How to reproduce

Create two {{KafkaProducer}} instances with the following configuration 
(producers should use different kafka clusters and have different 
{{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting):
{code:java}
Properties properties = new Properties();
properties.put("security.protocol", "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, 
AzureAuthCallbackHandler.class); // custom class for handling callbacks; in my 
case it is Azure Event Hubs with Kafka API support
properties.put(SaslConfigs.SASL_JAAS_CONFIG, 
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");

// here custom configuration is set for the callback handler
properties.put(AzureAuthCallbackHandler.AUTHORITY_CONFIG, 
"https://login.microsoftonline.com/" + tenantId); // Azure tenant id
properties.put(AzureAuthCallbackHandler.APP_ID_CONFIG, appId); // Azure OAuth 
2.0 app id
properties.put(AzureAuthCallbackHandler.APP_SECRET_CONFIG, appSecret); // Azure 
OAuth 2.0 app secret
{code}
Here {{AzureAuthCallbackHandler}} is a custom class which takes care of 
acquiring tokens from Azure. It is configured to fetch tokens from the same 
host that is used as the {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting, 
so its instances should be different for each created {{KafkaProducer}}. 
However, it is created only once by the client library (for the single 
{{LoginManager}}) and shared by all producers.

When both producers are used to send messages, this leads to a lot of:
{code:java}
[Producer clientId=my-client-id] Error while fetching metadata with correlation 
id 164 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION}
{code}
and finally to:
{code:java}
org.apache.kafka.common.errors.TimeoutException: Topic my-topic not present in 
metadata after 6 ms.
{code}
The second producer tries to fetch metadata from the cluster configured for the 
first producer and can't find the target topic there.
h3. Workaround

Add a unique JAAS config option for each Kafka cluster:
{code:java}
String options = "cluster=" + clusterName; //clusterName should be unique for 
each created KafkaProducer instance
properties.put(SaslConfigs.SASL_JAAS_CONFIG, 
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " 
+ options + ";");
{code}
The LoginManager map uses the {{SASL_JAAS_CONFIG}} string as part of its key, 
so adding a meaningless option to the string makes the client library create a 
distinct {{LoginManager}} for each {{KafkaProducer}} instance, and the problem 
disappears.
This is done in the 
{{org.apache.kafka.common.security.authenticator.LoginManager#acquireLoginManager}}
 method: a {{LoginMetadata}} instance is created from the {{SASL_JAAS_CONFIG}} 
value and used as the key in the {{LoginManager.DYNAMIC_INSTANCES}} map.
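
To illustrate why this works, here is a simplified sketch of the caching 
pattern (not the actual Kafka source):
{code:java}
import java.util.HashMap;
import java.util.Map;

public class LoginManagerCacheSketch {
    // Stand-in for LoginManager.DYNAMIC_INSTANCES: the key is derived from
    // the SASL_JAAS_CONFIG string, so byte-identical configs collide.
    private static final Map<String, Object> DYNAMIC_INSTANCES = new HashMap<>();

    static Object acquireLoginManager(String saslJaasConfig) {
        return DYNAMIC_INSTANCES.computeIfAbsent(saslJaasConfig, k -> new Object());
    }

    public static void main(String[] args) {
        String shared = "OAuthBearerLoginModule required;";
        String tagged = "OAuthBearerLoginModule required cluster=clusterB;";
        // Same config string -> the same shared instance:
        System.out.println(acquireLoginManager(shared) == acquireLoginManager(shared)); // true
        // A meaningless extra option -> a distinct instance:
        System.out.println(acquireLoginManager(shared) == acquireLoginManager(tagged)); // false
    }
}
{code}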
h3. Suggested solution

Each {{KafkaProducer}} instance should have its own isolated authentication 
objects, regardless of any similarity in configuration. In particular, the 
{{SASL_LOGIN_CALLBACK_HANDLER_CLASS}} class should be instantiated for each 
producer individually, since its 
{{org.apache.kafka.common.security.auth.AuthenticateCallbackHandler#configure}} 
method is invoked with the producer's configuration, which can differ from one 
producer to another.

h3. Additional details
I've attached the callback handler and token implementation for reference.





[jira] [Created] (KAFKA-13520) Quickstart does not work at topic creation step

2021-12-08 Thread Robin Moffatt (Jira)
Robin Moffatt created KAFKA-13520:
-

 Summary: Quickstart does not work at topic creation step
 Key: KAFKA-13520
 URL: https://issues.apache.org/jira/browse/KAFKA-13520
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Robin Moffatt


Step 3 of the quickstart fails:

{code:java}
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 
localhost:9092

Missing required argument "[partitions]" {code}
The command also needs `--replication-factor`.

The correct command is:
{code:java}
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 1 
--replication-factor 1 --topic quickstart-events {code}





[GitHub] [kafka-site] bbejeck merged pull request #387: Fixes KAFKA-13520

2021-12-08 Thread GitBox


bbejeck merged pull request #387:
URL: https://github.com/apache/kafka-site/pull/387


   






[GitHub] [kafka-site] bbejeck commented on pull request #387: Fixes KAFKA-13520

2021-12-08 Thread GitBox


bbejeck commented on pull request #387:
URL: https://github.com/apache/kafka-site/pull/387#issuecomment-988878505


thanks for the fix @rmoff!






[jira] [Created] (KAFKA-13521) Suppress changelog schema version breaks migration

2021-12-08 Thread Hector Geraldino (Jira)
Hector Geraldino created KAFKA-13521:


 Summary: Suppress changelog schema version breaks migration
 Key: KAFKA-13521
 URL: https://issues.apache.org/jira/browse/KAFKA-13521
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.5.1, 2.5.0, 2.4.0
Reporter: Hector Geraldino


Hi,

We recently updated the kafka-streams library in one of our apps from v2.5.0 to 
v2.5.1. This upgrade changes the header format of the state store for suppress 
changelog topics (see https://issues.apache.org/jira/browse/KAFKA-10173 and 
https://github.com/apache/kafka/pull/8905).

What we noticed is that introducing a new version of the binary schema header 
breaks older clients. That is, applications running on v2.5.1 can parse the v3, 
v2, v1, and v0 headers, while the ones running on 2.5.0 (and, I assume, previous 
versions) cannot read headers in the v3 format.

The logged exception is:

 
{code:java}
java.lang.IllegalArgumentException: Restoring apparently invalid changelog 
record: ConsumerRecord(topic = 
msgequator-kfns-msgequator-msgequator-suppress-buffer-store-changelog, 
partition = 8, leaderEpoch = 405, offset = 711400430, CreateTime = 
1638828473341, serialized key size = 32, serialized value size = 90, headers = 
RecordHeaders(headers = [RecordHeader(key = v, value = [3])], isReadOnly = 
false), key = [B@5cf0e540, value = [B@40abc004)
at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:372)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
 ~[msgequator-1.59.3.jar:1.59.3]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
 ~[msgequator-1.59.3.jar:1.59.3] {code}
 

 

There's obviously no clean solution for this other than stopping and starting 
all instances at once. A rolling bounce that takes some time to complete (in our 
case, days) will break instances that haven't been upgraded yet, once a 
rebalance causes those older clients to pick up the newly encoded changelog 
partition(s).
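
The failure mode boils down to a header-version gate of roughly this shape (a 
hypothetical sketch, not the actual InMemoryTimeOrderedKeyValueBuffer code):
{code:java}
public class HeaderVersionGateSketch {
    // A 2.5.0-era client knows header versions up to 2; 2.5.1 added v3.
    private static final byte HIGHEST_SUPPORTED_VERSION = 2;

    static void restoreRecord(byte headerVersion) {
        if (headerVersion > HIGHEST_SUPPORTED_VERSION) {
            // The path the stack trace above lands in on not-yet-upgraded clients:
            throw new IllegalArgumentException(
                "Restoring apparently invalid changelog record: header version " + headerVersion);
        }
        // ... otherwise decode the record according to headerVersion ...
    }
}
{code}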

 

I don't know whether the right fix is a client-side flag that lists the 
supported protocol versions (so it behaves like Kafka consumers when picking the 
rebalance protocol - cooperative or eager), or whether the migration guide just 
needs to state explicitly that a full stop/start migration is required in cases 
where the protocol version changes.





Re: [DISCUSS] KIP-805: Add range and scan query support in IQ v2

2021-12-08 Thread Vasiliki Papavasileiou
Hey Matthias,

Thank you for looking into the KIP!

We are adding raw versions of typed queries, like `RawRangeQuery`, because
it simplifies internal query handling, since the bytes stores only support
raw queries. A typed RangeQuery is handled by the `MeteredStore`, which
creates a new `RawRangeQuery` to pass down to the wrapped stores. When it
gets the result back, it deserializes the data and creates a typed query
result to return to the user. So, the store's key serde is used to
translate a typed `RangeQuery` into a `RawRangeQuery`, and its value serde
is used to translate the result of the query on the way back. This allows
users to provide their own queries even if the MeteredStore has no
knowledge of them.
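
For concreteness, the translation path looks roughly like this (a sketch whose
names only approximate the KIP, not the final API; `inner` stands for the
wrapped bytes store):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;

class MeteredStoreSketch<K, V> {
    interface BytesStore {
        List<KeyValue<byte[], byte[]>> rawRange(byte[] lower, byte[] upper);
    }

    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final BytesStore inner;

    MeteredStoreSketch(Serde<K> keySerde, Serde<V> valueSerde, BytesStore inner) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.inner = inner;
    }

    List<KeyValue<K, V>> range(K lower, K upper) {
        // Typed RangeQuery -> RawRangeQuery: serialize the bounds with the key serde.
        byte[] rawLower = keySerde.serializer().serialize(null, lower);
        byte[] rawUpper = keySerde.serializer().serialize(null, upper);
        List<KeyValue<K, V>> typed = new ArrayList<>();
        // On the way back, deserialize each raw entry into a typed result.
        for (KeyValue<byte[], byte[]> kv : inner.rawRange(rawLower, rawUpper)) {
            typed.add(new KeyValue<>(
                keySerde.deserializer().deserialize(null, kv.key),
                valueSerde.deserializer().deserialize(null, kv.value)));
        }
        return typed;
    }
}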

I hope this answers your question. Let me know if you have any other
questions.

Best,
Vicky


On Tue, Dec 7, 2021 at 12:46 AM Matthias J. Sax  wrote:

> Thanks for the KIP. Overall, makes sense.
>
> One question: What is the purpose of `RawRangeQuery`? Seems not very
> user friendly.
>
> -Matthias
>
>
> On 11/30/21 12:48 PM, Vasiliki Papavasileiou wrote:
> > Thank you John! Yes, that was a typo from copying and I fixed it.
> >
> > Since there have been no more comments, I will start the vote.
> >
> > Best,
> > Vicky
> >
> > On Tue, Nov 30, 2021 at 5:22 AM John Roesler 
> wrote:
> >
> >> Thanks for the KIP, Vicky!
> >>
> >> This KIP will help fill in the parity gap between IQ and
> >> IQv2.
> >>
> >> One thing I noticed, which looks like just a typo is that
> >> the value type of the proposed RangeQuery should probably be
> >> KeyValueIterator, right?
> >>
> >> Otherwise, it looks good to me!
> >>
> >> Thanks,
> >> -John
> >>
> >> On Mon, 2021-11-29 at 12:20 +, Vasiliki Papavasileiou
> >> wrote:
> >>> Hello everyone,
> >>>
> >>> I would like to start the discussion for KIP-805: Add range and scan
> >> query
> >>> support in IQ v2
> >>>
> >>> The KIP can be found here:
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2
> >>>
> >>> Any suggestions are more than welcome.
> >>>
> >>> Many thanks,
> >>> Vicky
> >>
> >>
> >
>


Re: [DISCUSS] KIP-782: Expandable batch size in producer

2021-12-08 Thread Artem Livshits
Hi Jun,

10. My understanding is that MemoryRecords would, under the covers, be
allocated in chunks, so logically it would still be one MemoryRecords
object; it's just that instead of allocating one large chunk upfront, smaller
chunks are allocated as needed to grow the batch and linked into a list.
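
A sketch of that idea (illustrative only, not the actual MemoryRecords code):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// One logical batch backed by a list of small chunks: memory is allocated
// only as records arrive, instead of as one large upfront buffer.
class ChunkedBatchSketch {
    private static final int CHUNK_SIZE = 16 * 1024;
    private final List<ByteBuffer> chunks = new ArrayList<>();

    void append(byte[] record) {
        ByteBuffer last = chunks.isEmpty() ? null : chunks.get(chunks.size() - 1);
        if (last == null || last.remaining() < record.length) {
            // Grow by linking another chunk rather than reallocating and copying.
            last = ByteBuffer.allocate(Math.max(CHUNK_SIZE, record.length));
            chunks.add(last);
        }
        last.put(record);
    }

    int sizeInBytes() {
        int total = 0;
        for (ByteBuffer chunk : chunks) total += chunk.position();
        return total;
    }
}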

11. The reason for two sizes is to avoid a change of behavior when triggering a
batch send with a large linger.ms.  Currently, a batch send is triggered once
the batch reaches 16KB by default; if we just raised the default to 256KB,
then the batch send would be delayed.  Using a separate value would allow
keeping the current behavior when sending the batch out, but provide better
throughput on high-latency, high-bandwidth channels.
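
In config terms (batch.max.size is the name proposed in the KIP; the values
are illustrative):

import java.util.Properties;

Properties props = new Properties();
props.put("batch.size", 16384);       // send trigger: the current 16KB default
props.put("batch.max.size", 262144);  // proposed cap a batch may grow to (256KB)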

-Artem

On Tue, Dec 7, 2021 at 5:29 PM Jun Rao  wrote:

> Hi, Luke,
>
> Thanks for the KIP.  A few comments below.
>
> 10. Accumulating small batches could improve memory usage. Will that
> introduce extra copying when generating a produce request? Currently, a
> produce request takes a single MemoryRecords per partition.
> 11. Do we need to introduce a new config batch.max.size? Could we just
> increase the default of batch.size? We probably need to have KIP-794
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> >
> resolved
> before increasing the default batch size since the larger the batch size,
> the worse the problem in KIP-794.
> 12. As for max.request.size, currently it's used for both the max record
> size and the max request size, which is unintuitive. Perhaps we could
> introduce a new config max.record.size that defaults to 1MB. We could then
> increase max.request.size to sth like 10MB.
>
> Thanks,
>
> Jun
>
>
> On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
>  wrote:
>
> > Hi Luke,
> >
> > I don't mind increasing the max.request.size to a higher number, e.g. 2MB
> > could be good.  I think we should also run some benchmarks to see the
> > effects of different sizes.
> >
> > I agree that changing round robin to random solves an independent
> existing
> > issue, however the logic in this KIP exacerbates the issue, so there is
> > some dependency.
> >
> > -Artem
> >
> > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen  wrote:
> >
> > > Hi Artem,
> > > Yes, I agree if we go with random selection instead of round-robin
> > > selection, the latency issue will be more fair. That is, if there are
> 10
> > > partitions, the 10th partition will always be the last choice in each
> > round
> > > in current design, but with random selection, the chance to be selected
> > is
> > > more fair.
> > >
> > > However, I think that's kind of out of scope with this KIP. This is an
> > > existing issue, and it might need further discussion to decide if this
> > > change is necessary.
> > >
> > > I agree the default 32KB for "batch.max.size" might be not huge
> > improvement
> > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > "batch.max.size", and make the documentation clear that if the
> > > "batch.max.size"
> > > is increased, there might be chances that the "ready" partitions need
> to
> > > wait for next request to send to broker, because of the
> > "max.request.size"
> > > (default 1MB) limitation. "max.request.size" can also be considered to
> > > increase to avoid this issue. What do you think?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > >  wrote:
> > >
> > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > >
> > > > I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> > > > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s
> per
> > > > partition.  We should probably set up some testing and see if 256KB
> has
> > > > problems.
> > > >
> > > > To illustrate latency dynamics, let's consider a simplified model: 1
> > > > in-flight request per broker, produce latency 125ms, 256KB max
> request
> > > > size, 16 partitions assigned to the same broker, every second 128KB
> is
> > > > produced to each partition (total production rate is 2MB/sec).
> > > >
> > > > If the batch size is 16KB, then the pattern would be the following:
> > > >
> > > > 0ms - produce 128KB into each partition
> > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > 125ms - complete first 16KB from each partition, send next 16KB
> > > > 250ms - complete second 16KB, send next 16KB
> > > > ...
> > > > 1000ms - complete 8th 16KB from each partition
> > > >
> > > > from this model it's easy to see that there are 256KB that are sent
> > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > > 875ms.
> > > >
> > > > If the batch size is 256KB, then the pattern would be the following:
> > > >
> > > > 0ms - produce 128KB into each partition
> > > > 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> > > > 125ms - complete 2 first partitions, send data from next 2 partitions
> > > > ...
> > > > 1000ms - 

Re: [VOTE] KIP-792: Add "generation" field into consumer protocol

2021-12-08 Thread Guozhang Wang
Hi Luke,

Thanks for the KIP.

One thing I'd like to double check is that, since the
ConsumerProtocolSubscription is not auto-generated from the JSON file, we
need to make sure an old-versioned leader would be able to ignore the new
field during an upgrade, e.g. without crashing. Other than that, the KIP
lgtm.
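
Roughly, the concern is that the old leader's parsing must behave like this
(a hypothetical wire layout, not the real ConsumerProtocolSubscription schema):

import java.nio.ByteBuffer;

// An old-versioned leader reads only the fields it knows and leaves any
// trailing bytes (e.g., the new "generation" field) unconsumed, rather
// than treating them as an error.
static String parseKnownFields(ByteBuffer buffer) {
    short version = buffer.getShort();       // schema version (unused in this sketch)
    byte[] known = new byte[buffer.getInt()];
    buffer.get(known);                       // fields this version understands
    // Do NOT assert buffer.remaining() == 0: newer members may append
    // fields that this version should simply skip over.
    return new String(known);
}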

Guozhang

On Tue, Dec 7, 2021 at 6:16 PM Luke Chen  wrote:

> Hi Colin,
>
> I'm not quite sure I understand your thoughts correctly.
> If I've misunderstood, please let me know.
>
> Also, I'm not quite sure how I could lock this feature to a new IBP
> version.
> I saw "KIP-584: Versioning scheme for features" is still under development.
> If I do need to lock the IBP version, how should I do it?
>
> Thank you.
> Luke
>
> On Tue, Dec 7, 2021 at 9:41 PM Luke Chen  wrote:
>
> > Hi Colin,
> >
> > Thanks for your comments. I've updated the KIP to mention about the KIP
> > won't affect current broker side behavior.
> >
> > > One scenario that we need to consider is what happens during a rolling
> > upgrade. If the coordinator moves back and forth between brokers with
> > different IBPs, it seems that the same epoch numbers could be reused for
> a
> > group, if things are done in the obvious manner (old IBP = don't read or
> > write epoch, new IBP = do)
> >
> > I think this KIP doesn't care about the group epoch number at all. The
> > subscription metadata is passed from each member to the group coordinator,
> > and then the group coordinator passes all of it back to the consumer lead.
> > So even if the epoch number is reused in a group, it should be fine. On the
> > other hand, the group coordinator will have no idea whether the join-group
> > request sent from a consumer contains the new subscription "generation"
> > field or not, because the group coordinator won't deserialize the metadata.
> >
> > I've also added this to the KIP.
> >
> > Thank you.
> > Luke
> >
> > On Mon, Dec 6, 2021 at 10:39 AM Colin McCabe  wrote:
> >
> >> Hi Luke,
> >>
> >> Thanks for the explanation.
> >>
> >> I don't see any description of how the broker decides to use the new
> >> version of ConsumerProtocolSubscription or not. This probably needs to
> be
> >> locked to a new IBP version.
> >>
> >> One scenario that we need to consider is what happens during a rolling
> >> upgrade. If the coordinator moves back and forth between brokers with
> >> different IBPs, it seems that the same epoch numbers could be reused
> for a
> >> group, if things are done in the obvious manner (old IBP = don't read or
> >> write epoch, new IBP = do).
> >>
> >> best,
> >> Colin
> >>
> >>
> >> On Fri, Dec 3, 2021, at 18:46, Luke Chen wrote:
> >> > Hi Colin,
> >> > Thanks for your comment.
> >> >
> >> >> How are we going to avoid the situation where the broker restarts,
> and
> >> > the same generation number is reused?
> >> >
> >> > Actually, this KIP doesn't have anything to do with the brokers. The
> >> > "generation" field I added, is in the subscription metadata, which
> will
> >> not
> >> > be deserialized by brokers. The metadata is only deserialized by
> >> consumer
> >> > lead. And for the consumer lead, the only thing the lead cared about,
> is
> >> > the highest generation of the ownedPartitions among all the consumers.
> >> With
> >> > the highest generation of the ownedPartitions, the consumer lead can
> >> > distribute the partitions as sticky as possible, and most importantly,
> >> > without errors.
> >> >
> >> > That is, after this KIP, if the broker restarts, and the same
> generation
> >> > number is reused, it won't break current rebalance behavior. But it'll
> >> help
> >> > the consumer lead do the sticky assignments correctly.
> >> >
> >> > Thank you.
> >> > Luke
> >> >
> >> > On Fri, Dec 3, 2021 at 6:30 AM Colin McCabe 
> wrote:
> >> >
> >> >> How are we going to avoid the situation where the broker restarts,
> and
> >> the
> >> >> same generation number is reused?
> >> >>
> >> >> best,
> >> >> Colin
> >> >>
> >> >> On Tue, Nov 30, 2021, at 16:36, Luke Chen wrote:
> >> >> > Hi all,
> >> >> >
> >> >> > I'd like to start the vote for KIP-792: Add "generation" field into
> >> >> > consumer protocol.
> >> >> >
> >> >> > The goal of this KIP is to allow the assignor/consumer coordinator
> to
> >> >> have
> >> >> > a way to identify the out-of-date members/assignments, to avoid
> >> rebalance
> >> >> > stuck issues in current protocol.
> >> >> >
> >> >> > Detailed description can be found here:
> >> >> >
> >> >>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614
> >> >> >
> >> >> > Any feedback is welcome.
> >> >> >
> >> >> > Thank you.
> >> >> > Luke
> >> >>
> >>
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-796: Interactive Query v2

2021-12-08 Thread Guozhang Wang
Thanks for the clarification, it looks good to me now.

On Wed, Nov 17, 2021 at 9:21 PM John Roesler  wrote:

> Ah, sorry, Guozhang,
>
> It seems I was a bit too eager with starting the vote thread.
>
> 13: I think that makes perfect sense. I've updated the KIP.
>
> 14: Oof, I can't believe I overlooked those newer
> exceptions. Some of them will become exceptions in IQv2,
> whereas others will become individual partition QueryResult
> failures. Here is an accounting of what will become of those
> proposed exceptions:
>
> * StreamsNotStartedException: thrown when the stream thread
> state is CREATED; the user can retry until it reaches RUNNING.
>
> * StreamsRebalancingException: thrown when the stream thread is
> not running and the stream state is REBALANCING. This exception
> is no longer applicable. Regardless of the rebalancing
> state of the store's task, the state will either be up to
> the requested bound or not.
>
> * StateStoreMigratedException: thrown when the state store is
> already closed and the stream state is RUNNING. This is a per-
> partition failure, so it now maps to the
> FailureReason.NOT_PRESENT failure.
>
>
> * StateStoreNotAvailableException: thrown when the state store is
> closed and the stream state is PENDING_SHUTDOWN / NOT_RUNNING /
> ERROR. I (subjectively) felt the name was ambiguous with
> respect to the prior condition in which a store partition is
> not locally available. This is replaced with a new thrown
> exception, StreamsStoppedException (the JavaDoc states
> that it is thrown when Streams is in any terminal state).
>
> * UnknownStateStoreException: thrown when passing an unknown
> state store. This is still a thrown exception.
>
> * InvalidStateStorePartitionException: thrown when the user-
> requested partition is not available on the stream instance.
> If the partition actually does exist, then we will now
> return a per-partition FailureReason.NOT_PRESENT. If the
> requested partition is actually not present in the topology
> at all, then we will return the per-partition
> FailureReason.DOES_NOT_EXIST.
>
> Sorry for the oversight. The KIP has been updated.
>
> Thanks,
> -John
>
> On Wed, 2021-11-17 at 15:48 -0800, Guozhang Wang wrote:
> > Thanks John.
> >
> > I made another pass on the KIP and overall it already looks pretty good. I
> > just have a couple more minor comments:
> >
> > 13: What do you think about just removing the following function in
> > QueryResult
> >
> >   // returns a failed query result because caller requested a "latest"
> > bound, but the task was
> >   // not active and running.
> >   public static  QueryResult notActive(String currentState);
> >
> > Instead just use `notUpToBound` for the case when `latest` bound is
> > requested but the node is not the active replica. My main motivation is
> > trying to abstract away the notion of active/standby from the public APIs
> > itself, and hence capturing both this case as well as just a
> > normal "position bound not achieved" in the same return signal, until
> later
> > when we think it is indeed needed to separate them with different
> returns.
> >
> > 14: Regarding the possible exceptions being thrown from `query`, it seems
> > more exception types are possible from KIP-216:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
> ,
> > should we include all in the javadocs?
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, Nov 17, 2021 at 3:25 PM John Roesler 
> wrote:
> >
> > > Thanks for the reply, Guozhang!
> > >
> > > I have updated the KIP to tie up the remaining points that
> > > we have discussed. I really appreciate your time in refining
> > > the proposal. I included a quick summary of the final state
> > > of our discussion points below.
> > >
> > > Since it seems like this discussion thread is pretty
> > > convergent, I'll go ahead and start the voting thread soon.
> > >
> > > Thanks again!
> > > -John
> > >
> > > P.S.: the final state of our discussion points:
> > >
> > > 1. I removed serdesForStore from the proposal (and moved it
> > > to Rejected Alternatives)
> > >
> > > 2. Thanks for that reference. I had overlooked that
> > > implementation. I'd note that the ListValuesStore is
> > > currently only used in the KStream API, which doesn't
> > > support queries at all. Due to its interface, it could
> > > theoretically be used to materialize a KTable, though it has
> > > no supplier provided in the typical Stores factory class.
> > >
> > > Regardless, I think that it would still be a similar story
> > > to the Segmented store. The ListValues store would simply
> > > choose to terminate the query on its own and not delegate to
> > > any of the wrapped KeyValue stores. It wouldn't matter that
> > > the wrapped stores have a query-handling facility of their
> > > own, if the wrapping store doesn't choose to delegate, the
> > > wrapped store will not try to execute any queries.
> > >
> > > Specifically regarding the key transformation that these
> > > "forma

Re: [DISCUSS] KIP-801: Implement an Authorizer that stores metadata in __cluster_metadata

2021-12-08 Thread José Armando García Sancio
Hi Colin,

Thanks for the KIP.

1. Can you talk about how the set of ACLs needed to authorize
controllers and brokers will get bootstrapped? I am particularly
interested in how we are going to configure a new cluster so that the
controller nodes can authorize each other's requests to establish a
quorum. After a quorum is established, I am interested in how the user
would make sure that new brokers will get authorized against the
controllers for requests like "register broker" and "fetch".

thanks,
-Jose


[jira] [Created] (KAFKA-13522) IQv2: Implement position tracking and bounding in API

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13522:


 Summary: IQv2: Implement position tracking and bounding in API
 Key: KAFKA-13522
 URL: https://issues.apache.org/jira/browse/KAFKA-13522
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Resolved] (KAFKA-13491) Implement IQv2 Framework

2021-12-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13491.
--
Resolution: Fixed

> Implement IQv2 Framework
> 
>
> Key: KAFKA-13491
> URL: https://issues.apache.org/jira/browse/KAFKA-13491
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> See https://cwiki.apache.org/confluence/x/85OqCw





[jira] [Resolved] (KAFKA-13506) IQv2: Transmit position to standbys

2021-12-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13506.
--
Resolution: Fixed

> IQv2: Transmit position to standbys
> ---
>
> Key: KAFKA-13506
> URL: https://issues.apache.org/jira/browse/KAFKA-13506
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>






[jira] [Created] (KAFKA-13523) Implement IQv2 support in global stores

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13523:


 Summary: Implement IQv2 support in global stores
 Key: KAFKA-13523
 URL: https://issues.apache.org/jira/browse/KAFKA-13523
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Global stores pose one significant problem for IQv2: when they start up, they 
skip the regular ingest pipeline and instead use the "restoration" pipeline to 
read up until the current end offset. Then, they switch over to the regular 
ingest pipeline.

IQv2 position tracking expects to track the position of each record from the 
input topic through the ingest pipeline and then get the position headers 
through the restoration pipeline via the changelog topic. The fact that global 
stores "restore" the input topic instead of ingesting it violates our 
expectations.

It has also caused other problems, so we may want to consider switching the 
global store processing to use the normal paradigm rather than adding 
special-case logic to track positions in global stores.

 

Note: there are two primary reasons that global stores behave this way:
 # We can write in batches during restoration, so the I/O may be more efficient
 # The global thread does not transition to RUNNING state until it reaches the 
(current) end of the input topic, which blocks other threads from joining 
against it, thereby improving the time synchronization of global KTable joins.

If we want to propose changing the bootstrapping pipeline for global threads, 
we should have some kind of answer to these concerns.





Re: [DISCUSS] KIP-782: Expandable batch size in producer

2021-12-08 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

11. Got it. To me, batch.size is really used for throughput and not for
latency guarantees. There is no guarantee when 16KB will be accumulated.
So, if users want any latency guarantee, they will need to specify
linger.ms accordingly.
Then, batch.size can just be used to tune for throughput.
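
For example (illustrative values):

import java.util.Properties;

Properties props = new Properties();
props.put("linger.ms", 5);      // bounds the added latency
props.put("batch.size", 65536); // tuned purely for throughput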

20. Could we also describe the unit of compression? Is
it batch.initial.size, batch.size or batch.max.size?

Thanks,

Jun

On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
 wrote:

> Hi Jun,
>
> 10. My understanding is that MemoryRecords would under the covers be
> allocated in chunks, so logically it still would be one MemoryRecords
> object, it's just instead of allocating one large chunk upfront, smaller
> chunks are allocated as needed to grow the batch and linked into a list.
>
> 11. The reason for 2 sizes is to avoid change of behavior when triggering
> batch send with large linger.ms.  Currently, a batch send is triggered
> once
> the batch reaches 16KB by default, if we just raise the default to 256KB,
> then the batch send will be delayed.  Using a separate value would allow
> keeping the current behavior when sending the batch out, but provide better
> throughput with high latency + high bandwidth channels.
>
> -Artem
>
> On Tue, Dec 7, 2021 at 5:29 PM Jun Rao  wrote:
>
> > Hi, Luke,
> >
> > Thanks for the KIP.  A few comments below.
> >
> > 10. Accumulating small batches could improve memory usage. Will that
> > introduce extra copying when generating a produce request? Currently, a
> > produce request takes a single MemoryRecords per partition.
> > 11. Do we need to introduce a new config batch.max.size? Could we just
> > increase the default of batch.size? We probably need to have KIP-794
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > >
> > resolved
> > before increasing the default batch size since the larger the batch size,
> > the worse the problem in KIP-794.
> > 12. As for max.request.size, currently it's used for both the max record
> > size and the max request size, which is unintuitive. Perhaps we could
> > introduce a new config max.record.size that defaults to 1MB. We could
> then
> > increase max.request.size to sth like 10MB.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> >  wrote:
> >
> > > Hi Luke,
> > >
> > > I don't mind increasing the max.request.size to a higher number, e.g.
> 2MB
> > > could be good.  I think we should also run some benchmarks to see the
> > > effects of different sizes.
> > >
> > > I agree that changing round robin to random solves an independent
> > existing
> > > issue, however the logic in this KIP exacerbates the issue, so there is
> > > some dependency.
> > >
> > > -Artem
> > >
> > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen  wrote:
> > >
> > > > Hi Artem,
> > > > Yes, I agree if we go with random selection instead of round-robin
> > > > selection, the latency issue will be more fair. That is, if there are
> > 10
> > > > partitions, the 10th partition will always be the last choice in each
> > > round
> > > > in current design, but with random selection, the chance to be
> selected
> > > is
> > > > more fair.
> > > >
> > > > However, I think that's kind of out of scope with this KIP. This is
> an
> > > > existing issue, and it might need further discussion to decide if
> this
> > > > change is necessary.
> > > >
> > > > I agree the default 32KB for "batch.max.size" might be not huge
> > > improvement
> > > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > > "batch.max.size", and make the documentation clear that if the
> > > > "batch.max.size"
> > > > is increased, there might be chances that the "ready" partitions need
> > to
> > > > wait for next request to send to broker, because of the
> > > "max.request.size"
> > > > (default 1MB) limitation. "max.request.size" can also be considered
> to
> > > > increase to avoid this issue. What do you think?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > >  wrote:
> > > >
> > > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > > >
> > > > > I think 32KB is too small.  With 5 in-flight and 100ms latency we
> can
> > > > > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s
> > per
> > > > > partition.  We should probably set up some testing and see if 256KB
> > has
> > > > > problems.
> > > > >
> > > > > To illustrate latency dynamics, let's consider a simplified model:
> 1
> > > > > in-flight request per broker, produce latency 125ms, 256KB max
> > request
> > > > > size, 16 partitions assigned to the same broker, every second 128KB
> > is
> > > > > produced to each partition (total production rate is 2MB/sec).
> > > > >
> > > > > If the batch size is 16KB, then the pattern would be the following:
> > > > >
> > > > > 0ms - produce 128KB into each partition
> > > > > 0

[jira] [Created] (KAFKA-13524) IQv2: Add option to query from caches

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13524:


 Summary: IQv2: Add option to query from caches
 Key: KAFKA-13524
 URL: https://issues.apache.org/jira/browse/KAFKA-13524
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-13525) IQv2: Implement KeyQuery from the KIP

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13525:


 Summary: IQv2: Implement KeyQuery from the KIP
 Key: KAFKA-13525
 URL: https://issues.apache.org/jira/browse/KAFKA-13525
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13526:


 Summary: IQv2: Consider more generic logic for mapping between 
binary and typed queries
 Key: KAFKA-13526
 URL: https://issues.apache.org/jira/browse/KAFKA-13526
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Right now, typed queries (like KeyQuery) need to be specially handled and 
translated to their binary counterparts (like RawKeyQuery). This happens in the 
Metered store layers, where the serdes are known. It is necessary because lower 
store layers are only able to handle binary data (because they don't know the 
serdes).

This situation is not ideal, since the Metered store layers will grow to host 
quite a bit of query handling and translation logic, because the relationship 
between typed queries and binary counterparts is not obvious, and because we 
can only automatically translate known query types. User-supplied queries and 
stores will have to work things out using their a-priori knowledge of the 
serdes.

 

One suggestion (from [~mjsax]) is to come up with some kind of generic "query 
mapping" API, which the Metered stores would use to translate back and forth 
between typed and raw keys and values. Users would be able to supply their own 
mappings along with their custom queries.
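
One possible shape for such an API (purely illustrative, not a concrete 
proposal):
{code:java}
import org.apache.kafka.common.serialization.Serde;

// Users would register one mapping per custom query type; the Metered layer
// would apply it to translate queries down and results back up.
public interface QueryMapping<Q, RawQ, R, RawR> {
    RawQ rawQuery(Q typedQuery, Serde<?> keySerde, Serde<?> valueSerde);
    R typedResult(RawR rawResult, Serde<?> keySerde, Serde<?> valueSerde);
}
{code}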

Another option would be to have the Metered stores attach the serdes to the 
query on the way down and then to the result on the way up. Then, the serdes 
would be available in the bytes store (as part of the request) and to the users 
when they get their results (as part of the response).

Other options may also surface once we start playing with ideas.





Re: [DISCUSS] KIP-782: Expandable batch size in producer

2021-12-08 Thread Artem Livshits
Hi Jun,

11. That was my initial thinking as well, but in a discussion some people
pointed out a change of behavior in some scenarios.  E.g. someone for
some reason really wants batches to be at least 16KB, sets a large
linger.ms, and most of the time the batches are filled quickly enough that
they observe a certain latency.  Then they upgrade their client with a
default size of 256KB and the latency increases.  This could be seen as a
regression.  It could be fixed by just reducing linger.ms to match the
expected latency, but it could still be seen as a disruption by some users.
The other reason to have two sizes is to avoid allocating large buffers
upfront.

-Artem

On Wed, Dec 8, 2021 at 3:07 PM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 11. Got it. To me, batch.size is really used for throughput and not for
> latency guarantees. There is no guarantee when 16KB will be accumulated.
> So, if users want any latency guarantee, they will need to specify
> linger.ms accordingly.
> Then, batch.size can just be used to tune for throughput.
>
> 20. Could we also describe the unit of compression? Is
> it batch.initial.size, batch.size or batch.max.size?
>
> Thanks,
>
> Jun
>
> On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > 10. My understanding is that MemoryRecords would under the covers be
> > allocated in chunks, so logically it still would be one MemoryRecords
> > object, it's just instead of allocating one large chunk upfront, smaller
> > chunks are allocated as needed to grow the batch and linked into a list.
> >
> > 11. The reason for 2 sizes is to avoid change of behavior when triggering
> > batch send with large linger.ms.  Currently, a batch send is triggered
> > once
> > the batch reaches 16KB by default, if we just raise the default to 256KB,
> > then the batch send will be delayed.  Using a separate value would allow
> > keeping the current behavior when sending the batch out, but provide
> better
> > throughput with high latency + high bandwidth channels.
> >
> > -Artem
> >
> > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao  wrote:
> >
> > > Hi, Luke,
> > >
> > > Thanks for the KIP.  A few comments below.
> > >
> > > 10. Accumulating small batches could improve memory usage. Will that
> > > introduce extra copying when generating a produce request? Currently, a
> > > produce request takes a single MemoryRecords per partition.
> > > 11. Do we need to introduce a new config batch.max.size? Could we just
> > > increase the default of batch.size? We probably need to have KIP-794
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > >
> > > resolved
> > > before increasing the default batch size since the larger the batch
> size,
> > > the worse the problem in KIP-794.
> > > 12. As for max.request.size, currently it's used for both the max
> record
> > > size and the max request size, which is unintuitive. Perhaps we could
> > > introduce a new config max.record.size that defaults to 1MB. We could
> > then
> > > increase max.request.size to sth like 10MB.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > >  wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > I don't mind increasing the max.request.size to a higher number, e.g.
> > 2MB
> > > > could be good.  I think we should also run some benchmarks to see the
> > > > effects of different sizes.
> > > >
> > > > I agree that changing round robin to random solves an independent
> > > existing
> > > > issue, however the logic in this KIP exacerbates the issue, so there
> is
> > > > some dependency.
> > > >
> > > > -Artem
> > > >
> > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen 
> wrote:
> > > >
> > > > > Hi Artem,
> > > > > Yes, I agree if we go with random selection instead of round-robin
> > > > > selection, the latency issue will be more fair. That is, if there
> are
> > > 10
> > > > > partitions, the 10th partition will always be the last choice in
> each
> > > > round
> > > > > in current design, but with random selection, the chance to be
> > selected
> > > > is
> > > > > more fair.
> > > > >
> > > > > However, I think that's kind of out of scope with this KIP. This is
> > an
> > > > > existing issue, and it might need further discussion to decide if
> > this
> > > > > change is necessary.
> > > > >
> > > > > I agree the default 32KB for "batch.max.size" might be not huge
> > > > improvement
> > > > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > > > "batch.max.size", and make the documentation clear that if the
> > > > > "batch.max.size"
> > > > > is increased, there might be chances that the "ready" partitions
> need
> > > to
> > > > > wait for next request to send to broker, because of the
> > > > "max.request.size"
> > > > > (default 1MB) limitation. "max.request.size" can also be considered
> > to
> > > > > increase to avoid this issue. What do you think?
> > > > >
>

Re: [DISCUSS] KIP-805: Add range and scan query support in IQ v2

2021-12-08 Thread Matthias J. Sax

Thanks for the details!

I also chatted with John about it, and he filed 
https://issues.apache.org/jira/browse/KAFKA-13526 to incorporate some 
feedback as follow-up work.


IMHO, the hard-coded query translation is not ideal and should be 
pluggable. But for a v1 of IQv2 (pun intended), the hard-coded translation 
seems to be good enough.



-Matthias

On 12/8/21 9:37 AM, Vasiliki Papavasileiou wrote:

Hey Matthias,

Thank you for looking into the KIP!

We are adding raw versions of typed queries, like `RawRangeQuery` because
it simplifies internal query handling since the bytes stores only support
raw queries. A typed RangeQuery is handled by the `MeteredStore` which
creates a new `RawRangeQuery` to pass down to the wrapped stores. When it
gets the result back, it deserializes the data and creates a typed query
result to return to the user. So, the store's key serde is used to
translate a typed `RangeQuery` into a `RawRangeQuery`, and its value serde
is used to translate the result of the query on the way back. This allows
users to provide their own queries even if the MeteredStore has no
knowledge of them.

I hope this answers your question. Let me know if you have any other
questions.

Best,
Vicky


On Tue, Dec 7, 2021 at 12:46 AM Matthias J. Sax  wrote:


Thanks for the KIP. Overall, makes sense.

One question: What is the purpose of `RawRangeQuery`? Seems not very
user friendly.

-Matthias


On 11/30/21 12:48 PM, Vasiliki Papavasileiou wrote:

Thank you John! Yes, that was a typo from copying and I fixed it.

Since there have been no more comments, I will start the vote.

Best,
Vicky

On Tue, Nov 30, 2021 at 5:22 AM John Roesler 

wrote:



Thanks for the KIP, Vicky!

This KIP will help fill in the parity gap between IQ and
IQv2.

One thing I noticed, which looks like just a typo is that
the value type of the proposed RangeQuery should probably be
KeyValueIterator, right?

Otherwise, it looks good to me!

Thanks,
-John

On Mon, 2021-11-29 at 12:20 +, Vasiliki Papavasileiou
wrote:

Hello everyone,

I would like to start the discussion for KIP-805: Add range and scan

query

support in IQ v2

The KIP can be found here:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2


Any suggestions are more than welcome.

Many thanks,
Vicky











Re: [VOTE] KIP-805: Add range and scan query support in IQ v2

2021-12-08 Thread Matthias J. Sax

Thanks for the KIP.

+1 (binding)

On 12/5/21 7:03 PM, Luke Chen wrote:

Hi Vasiliki,

Thanks for the KIP!
It makes sense to have the range and scan query in IQv2, as in IQv1.

+1 (non-binding)

Thank you.
Luke

On Thu, Dec 2, 2021 at 5:41 AM John Roesler  wrote:


Thanks for the KIP, Vicky!

I’m +1 (binding)

-John

On Tue, Nov 30, 2021, at 14:51, Vasiliki Papavasileiou wrote:

Hello everyone,

I would like to start a vote for KIP-805 that adds range and scan

KeyValue

queries in IQ2.

The KIP can be found here:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2


Cheers!
Vicky






[jira] [Resolved] (KAFKA-12980) Allow consumers to return from poll when position advances due to aborted transactions

2021-12-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12980.
-
Fix Version/s: 3.2.0
   Resolution: Fixed

> Allow consumers to return from poll when position advances due to aborted 
> transactions
> --
>
> Key: KAFKA-12980
> URL: https://issues.apache.org/jira/browse/KAFKA-12980
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.2.0
>
>
> When {{Consumer::poll}} is invoked on a topic with an open transaction, and 
> then that transaction is aborted, {{poll}} does not return unless there are 
> other records available in that topic after the aborted transaction.
> Instead, {{poll}} could return in this case, even when no records are 
> available.
> This facilitates reads to the end of a topic where the end offsets of a topic 
> are listed and then a consumer for that topic is polled until its 
> [position|https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#position(org.apache.kafka.common.TopicPartition)]
>  is at or beyond each of those offsets (for example, [Connect does 
> this|https://github.com/apache/kafka/blob/fce771579c3e20f20949c4c7e0a5e3a16c57c7f0/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L322-L345]
>  when reading to the end of any of its internal topics).
> We could update the existing language in the docs for {{Consumer::poll}} from
> {quote}This method returns immediately if there are records available.
> {quote}
> to
> {quote}This method returns immediately if there are records available or if 
> the position advances past control records.
> {quote}
>  
> A workaround for existing users who would like to see this is to use short 
> poll intervals and manually check the consumer's position in between each 
> poll, but this is fairly tedious and may lead to excess CPU and network 
> utilization depending on the latency requirements for knowing when the end of 
> the topic has been reached.
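
A sketch of that read-to-end pattern (assuming a plain KafkaConsumer; 
illustrative only):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Poll until the consumer's position reaches the end offset of every
// assigned partition. With the proposed change, poll() would return even
// when only aborted-transaction control records advanced the position.
static void readToEnd(Consumer<?, ?> consumer) {
    Map<TopicPartition, Long> remaining =
        new HashMap<>(consumer.endOffsets(consumer.assignment()));
    while (!remaining.isEmpty()) {
        consumer.poll(Duration.ofMillis(100)); // may legitimately return no records
        remaining.entrySet().removeIf(e -> consumer.position(e.getKey()) >= e.getValue());
    }
}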


