[VOTE] KIP-886 Add Client Producer and Consumer Builders

2022-11-11 Thread Dan S
Hello all,

I think that adding builders for the producer and the consumer in the Kafka
client would make it much easier for developers to instantiate new
producers and consumers, especially if they are using an IDE with
IntelliSense and use the IDE to navigate to the documentation, which
could be added to the builder's withXYZ methods.
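
To make the idea concrete, here is a minimal sketch of what such a builder might look like. It is not the API proposed in the KIP: the class name `ProducerConfigBuilder` and its `with...` methods are hypothetical. To stay self-contained it only assembles a configuration map, using the existing config keys `bootstrap.servers`, `acks` and `linger.ms`, rather than constructing a real producer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical builder sketch. The class and method names are illustrative
 * assumptions, not the API proposed in KIP-886.
 */
final class ProducerConfigBuilder {
    private final Map<String, Object> configs = new HashMap<>();

    /** Sets "bootstrap.servers": the host:port list used for the initial connection. */
    ProducerConfigBuilder withBootstrapServers(String servers) {
        configs.put("bootstrap.servers", servers);
        return this;
    }

    /** Sets "acks": how many acknowledgments the producer waits for ("0", "1" or "all"). */
    ProducerConfigBuilder withAcks(String acks) {
        configs.put("acks", acks);
        return this;
    }

    /** Sets "linger.ms": how long to wait for more records before sending a batch. */
    ProducerConfigBuilder withLingerMs(long lingerMs) {
        configs.put("linger.ms", lingerMs);
        return this;
    }

    /** Returns an immutable copy of the collected settings. */
    Map<String, Object> build() {
        if (!configs.containsKey("bootstrap.servers")) {
            throw new IllegalStateException("bootstrap.servers is required");
        }
        return Collections.unmodifiableMap(new HashMap<>(configs));
    }
}
```

With something along these lines, each setting is discoverable through auto-completion and its Javadoc, instead of being a string key that has to be looked up.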

Please let me know if you have any comments, questions, or suggestions in
the discussion thread, or vote here!

https://cwiki.apache.org/confluence/display/KAFKA/KIP-886%3A+Add+Client+Producer+and+Consumer+Builders

Thanks,

Dan


[jira] [Created] (KAFKA-14383) CorruptRecordException when reading data from log segment will not cause log offline

2022-11-11 Thread shen (Jira)
shen created KAFKA-14383:


 Summary: CorruptRecordException when reading data from log segment 
will not cause log offline
 Key: KAFKA-14383
 URL: https://issues.apache.org/jira/browse/KAFKA-14383
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.8.1
Reporter: shen


In our production environment, a disk failure caused data corruption. When 
consumers and followers read from the partition leader, a CorruptRecordException 
is thrown:
{code:java}
Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 0 
is less than the minimum record overhead
{code}
The call stack looks much like this:
{code:java}
Breakpoint reached
    at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:62)
    at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40)
    at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
    at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
    at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
    at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
    at org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:286)
    at kafka.log.LogSegment.translateOffset(LogSegment.scala:254)
    at kafka.log.LogSegment.read(LogSegment.scala:277)
    at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1161)
    at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1116)
    at kafka.log.Log.maybeHandleIOException(Log.scala:1839) <--- only handles IOException
    at kafka.log.Log.read(Log.scala:1116)
    at kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:926)
    at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:989)
    at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:988)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:988)
    at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:815)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:828)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:680)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:107)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
    at java.lang.Thread.run(Thread.java:748)
{code}
 

CorruptRecordException extends RetriableException. When the broker reads from a 
local log segment, however, data corruption usually cannot be fixed by retrying.

I think local file corruption should take the log offline, but currently only 
an IOException can take the log offline, in Log#maybeHandleIOException.

So even with 3 replicas, consumers can never continue consuming once data 
corruption happens on the leader.
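
The behavior described above can be illustrated with a simplified sketch. This is not the broker's actual (Scala) code: `LogReadSketch`, its `offline` flag and `CorruptRecordSketchException` are hypothetical stand-ins. It only shows why an unchecked corrupt-record error passes straight through a handler that catches IOException alone.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Simplified illustration only; not the broker's actual code. */
final class LogReadSketch {
    /** Stand-in for CorruptRecordException, which is an unchecked exception. */
    static final class CorruptRecordSketchException extends RuntimeException {
        CorruptRecordSketchException(String message) {
            super(message);
        }
    }

    /** A read operation that may fail with an IOException. */
    interface Read<T> {
        T run() throws IOException;
    }

    private boolean offline = false;

    boolean isOffline() {
        return offline;
    }

    /** Mirrors the reported behavior: only an IOException marks the log offline. */
    <T> T maybeHandleIOException(Read<T> read) {
        try {
            return read.run();
        } catch (IOException e) {
            offline = true; // the log directory would be taken offline here
            throw new UncheckedIOException(e);
        }
        // Any RuntimeException (such as a corrupt-record error) is not caught
        // above, so it reaches the caller and leaves `offline` unchanged.
    }
}
```

In this sketch a corrupt-record error leaves `isOffline()` false, so every retry reads the same corrupt bytes again.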





Re: [DISCUSS] KIP-885: Expose Broker's Name and Version to Clients Skip to end of metadata

2022-11-11 Thread David Jacot
Hi Travis,

Thanks for the KIP. That seems to be a useful addition. I have a few
concerns/comments:

01: Managed Kafka services do not necessarily run a specific version or may
not want to expose it. I suppose that they could keep an empty string.

02: I am a bit concerned by clients that could misuse this information.
For instance, one may be tempted to rely on the version to decide whether a
feature is enabled or not. The api versions should remain the source of
truth but we cannot enforce it with the proposed approach. That may be
acceptable though.

03: Updating the API does not seem to be enough. How would tools access the
information? Tools rarely use the API directly. They would rather use the
admin client for instance.

Best,
David

On Thu, Nov 10, 2022 at 19:10, Travis Bischel wrote:

> Hi all,
>
> I've written a KIP to expose the BrokerSoftwareName and
> BrokerSoftwareVersion to clients:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-885%3A+Expose+Broker%27s+Name+and+Version+to+Clients
>
> If we agree this is useful, it would be great to have this in by 3.4.
>
> Thank you,
> - Travis
>


Re: Starting out with Kafka

2022-11-11 Thread John Roesler
Hello Vinay,

One thing I’ve noticed recently is that I have to click the “build” button in 
intellij before I can use the “run” or “debug” buttons. I’m not sure why. 

Welcome to the community!
-John

On Fri, Nov 11, 2022, at 02:47, deng ziming wrote:
> Hello, Vinay
> Kafka uses the Gradle wrapper (gradlew) as its build tool and Java/Scala as its
> programming languages. You can first run `./gradlew unitTest` in a terminal to
> build it, then reload the project in the Gradle window. Sometimes I also change
> the default build tool from IDEA to Gradle in Preference/Build/build tools/Gradle:
>
> PastedGraphic-1.tiff
>
> --
> Ziming
>
>> On Nov 11, 2022, at 13:30, vinay deshpande  wrote:
>> 
>> Hi All,
>> I have a basic question: I tried importing the Kafka source code into IntelliJ,
>> but there are a bunch of imports that the IDE cannot find, like these:
>> 
>> import kafka.api.ApiVersion;
>> import kafka.log.CleanerConfig;
>> import kafka.log.LogConfig;
>> import kafka.log.LogManager;
>> 
>> 
>> TIA.
>> 
>> Thanks,
>> Vinay


Re: [DISCUSS] KIP-885: Expose Broker's Name and Version to Clients Skip to end of metadata

2022-11-11 Thread Magnus Edenhill
Hi Travis and thanks for the KIP, two comments below:


On Fri, Nov 11, 2022 at 13:37, David Jacot wrote:

> 02: I am a bit concerned by clients that could misuse this information.
> For instance, one may be tempted to rely on the version to decide whether a
> feature is enabled or not. The api versions should remain the source of
> truth but we cannot enforce it with the proposed approach. That may be
> acceptable though.
>

We proposed including this in the original ApiVersionRequest KIP-35 (waaay
back), but it was rejected for exactly this reason: that it may (will) be
misused by clients.



I would also like to question the actual end-user use of this information.
The existing ApiVersionsReq already provides, on a detailed level, what
features and functionality the broker supports; this information is richer
than a single version string.

The KIP says "End users can quickly check from a client if the cluster is
up to date" and "Clients can also use the broker version in log lines so
that end users can quickly see if a version looks about right or if
something is seriously broken.":

In my mind that's not typically the end user's role or responsibility, but
that of the Kafka cluster operator, who'll already know the version being
deployed.


Regards,
Magnus



> On Thu, Nov 10, 2022 at 19:10, Travis Bischel wrote:
>
> > Hi all,
> >
> > I've written a KIP to expose the BrokerSoftwareName and
> > BrokerSoftwareVersion to clients:
> >
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-885%3A+Expose+Broker%27s+Name+and+Version+to+Clients
> >
> > If we agree this is useful, it would be great to have this in by 3.4.
> >
> > Thank you,
> > - Travis
> >
>


Streams: clarification needed, checkpoint vs. position files

2022-11-11 Thread Nick Telford
Hi everyone,

I'm trying to understand how StateStores work internally for some changes
that I plan to propose, and I'd like some clarification around checkpoint
files and position files.

It appears as though position files are relatively new, and were created as
part of the IQv2 initiative, as a means to track the position of the local
state store so that reads could be bound by particular positions?

Checkpoint files look much older, and are managed by the Task itself
(actually, ProcessorStateManager). It looks like this is used exclusively
for determining a) whether to restore a store, and b) which offsets to
restore from?

If I've understood the above correctly, is there any scope to potentially
replace checkpoint files with StateStore#position()?

Regards,

Nick


Re: [DISCUSS] KIP-844: Transactional State Stores

2022-11-11 Thread Nick Telford
Hi everyone,

Sorry to dredge this up again. I've had a chance to start doing some
testing with the WIP Pull Request, and it appears as though the secondary
store solution performs rather poorly.

In our testing, we had a non-transactional state store that would restore
(from scratch), at a rate of nearly 1,000,000 records/second. When we
switched it to a transactional store, it restored at a rate of less than
40,000 records/second.

I suspect the key issues here are having to copy the data out of the
temporary store and into the main store on-commit, and to a lesser extent,
the extra memory copies during writes.

I think it's worth re-visiting the WriteBatchWithIndex solution, as it's
clear from the RocksDB post[1] on the subject that it's the recommended way
to achieve transactionality.

The only issue you identified with this solution was that uncommitted
writes are required to entirely fit in-memory, and RocksDB recommends they
don't exceed 3-4MiB. If we do some back-of-the-envelope calculations, I
think we'll find that this will be a non-issue for all but the most extreme
cases, and for those, I think I have a fairly simple solution.

Firstly, when EOS is enabled, the default commit.interval.ms is set to
100ms, which provides fairly short intervals that uncommitted writes need
to be buffered in-memory. If we assume a worst case of 1024 byte records
(and for most cases, they should be much smaller), then 4MiB would hold
~4096 records, which with 100ms commit intervals is a throughput of
approximately 40,960 records/second. This seems quite reasonable.
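
The back-of-the-envelope figure above can be checked with a few lines of Java. The class and method names are just for illustration. The inputs are the assumptions stated in the paragraph: a 4MiB buffer, 1024-byte records and a 100ms commit interval.

```java
/** Back-of-the-envelope check of the numbers quoted above. */
final class UncommittedBufferEstimate {
    /** Records that fit in the buffer per commit, scaled to commits per second. */
    static long recordsPerSecond(long bufferBytes, long recordBytes, long commitIntervalMs) {
        long recordsPerCommit = bufferBytes / recordBytes;
        // Multiply before dividing so intervals that do not divide 1000 lose less precision.
        return recordsPerCommit * 1000L / commitIntervalMs;
    }

    public static void main(String[] args) {
        long fourMiB = 4L * 1024 * 1024;
        System.out.println(recordsPerSecond(fourMiB, 1024, 100)); // prints 40960
    }
}
```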

For use cases that wouldn't reasonably fit in-memory, my suggestion is that
we have a mechanism that tracks the number/size of uncommitted records in
stores, and prematurely commits the Task when this size exceeds a
configured threshold.
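
A minimal sketch of such a mechanism might look like the following. Everything here is hypothetical: the class name, the byte-based accounting and the way the threshold is supplied are assumptions for illustration, not part of the KIP or the WIP Pull Request.

```java
/** Hypothetical tracker; names and the byte-based threshold are assumptions. */
final class UncommittedBytesTracker {
    private final long maxUncommittedBytes;
    private long uncommittedBytes = 0;

    UncommittedBytesTracker(long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    /** Records a buffered write; returns true when the Task should commit early. */
    boolean recordWrite(int keyBytes, int valueBytes) {
        uncommittedBytes += (long) keyBytes + valueBytes;
        return uncommittedBytes > maxUncommittedBytes;
    }

    /** Called after a commit, once the buffered writes have been applied to the store. */
    void onCommit() {
        uncommittedBytes = 0;
    }

    long uncommittedBytes() {
        return uncommittedBytes;
    }
}
```

The caller would check the return value of `recordWrite` after each put and request a commit of the Task when it is true, rather than waiting for the next commit interval.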

Thanks for your time, and let me know what you think!
--
Nick

1: https://rocksdb.org/blog/2015/02/27/write-batch-with-index.html

On Thu, 6 Oct 2022 at 19:31, Alexander Sorokoumov
 wrote:

> Hey Nick,
>
> It is going to be option c. Existing state is considered to be committed
> and there will be an additional RocksDB for uncommitted writes.
>
> I am out of office until October 24. I will update KIP and make sure that
> we have an upgrade test for that after coming back from vacation.
>
> Best,
> Alex
>
> On Thu, Oct 6, 2022 at 5:06 PM Nick Telford 
> wrote:
>
> > Hi everyone,
> >
> > I realise this has already been voted on and accepted, but it occurred to
> > me today that the KIP doesn't define the migration/upgrade path for
> > existing non-transactional StateStores that *become* transactional, i.e.
> > by adding the transactional boolean to the StateStore constructor.
> >
> > What would be the result, when such a change is made to a Topology,
> > without explicitly wiping the application state?
> > a) An error.
> > b) Local state is wiped.
> > c) Existing RocksDB database is used as committed writes and new RocksDB
> > database is created for uncommitted writes.
> > d) Something else?
> >
> > Regards,
> >
> > Nick
> >
> > On Thu, 1 Sept 2022 at 12:16, Alexander Sorokoumov
> >  wrote:
> >
> > > Hey Guozhang,
> > >
> > > Sounds good. I annotated all added StateStore methods (commit, recover,
> > > transactional) with @Evolving.
> > >
> > > Best,
> > > Alex
> > >
> > >
> > >
> > > On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang wrote:
> > >
> > > > Hello Alex,
> > > >
> > > > Thanks for the detailed replies, I think that makes sense, and in the
> > > > long run we would need some public indicators from StateStore to
> > > > determine if checkpoints can really be used to indicate clean snapshots.
> > > >
> > > > As for the @Evolving label, I think we can still keep it but for a
> > > > different reason, since as we add more state management functionalities
> > > > in the near future we may need to revisit the public APIs again and
> > > > hence keeping it as @Evolving would allow us to modify if necessary, in
> > > > an easier path than deprecate -> delete after several minor releases.
> > > >
> > > > Besides that, I have no further comments about the KIP.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Aug 26, 2022 at 1:51 AM Alexander Sorokoumov
> > > >  wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > >
> > > > > I think that we will have to keep StateStore#transactional() because
> > > > > post-commit checkpointing of non-txn state stores will break the
> > > > > guarantees we want in
> > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint for
> > > > > correct recovery. Let's consider checkpoint-recovery behavior under
> > > > > EOS that we want to support:
> > > > >
> > > > > 1. Non-txn state stores should checkpoint on graceful shutdown and
> > > > > restore from that checkpoint.
> > > > >
> > > > > 2. Non-txn state stores should delete local data during recovery
> > > > > after a crash failure.
> > > > >
> > > > > 3. T

RE: Re: [DISCUSS] KIP-885: Expose Broker's Name and Version to Clients Skip to end of metadata

2022-11-11 Thread Travis Bischel
Thanks for the replies David and Magnus

David:

02: From a client implementation perspective, it is easier to gate features 
based on the max version numbers returned per request, rather than any textual 
representation of a string. I’m not really envisioning a client implementation 
trying to match on an undefined string, especially if it’s documented as just 
metadata information.
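
As a rough sketch of what gating on version numbers looks like in a client: the class below is hypothetical and is not taken from franz-go or any other client. It only records the highest version a broker advertised per API key and answers whether a required version is available.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of gating a feature on the per-request max version a broker advertises. */
final class ApiVersionGate {
    // API key -> highest request version the broker reported in its ApiVersions response.
    private final Map<Short, Short> maxVersionByApiKey = new HashMap<>();

    void recordBrokerSupport(short apiKey, short maxVersion) {
        maxVersionByApiKey.put(apiKey, maxVersion);
    }

    /** True if the broker supports at least requiredVersion of the given request. */
    boolean supports(short apiKey, short requiredVersion) {
        Short max = maxVersionByApiKey.get(apiKey);
        return max != null && max >= requiredVersion;
    }
}
```

A client would fill this from the ApiVersions response and then ask, for example, `gate.supports((short) 1, (short) 12)` before using a Fetch (API key 1) feature; the version number in that call is made up for illustration. No string parsing is involved at any point.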

03: Interesting, I may be one of the few that does query the version directly. 
Perhaps this can be some new information that is instead added to request 60, 
ClusterMetadata? The con with ClusterMetadata is that I’m interested in this 
information on a per-broker basis. We could add these fields per each broker in 
the Brokers field, though.

Magnus:

As far as I can see, only my franz-go client offers this ability to “guess” the 
version of a broker — and it’s historically done so through ApiVersions, which 
was the only way to do this. This feature was used in three projects by two 
people: my kcl project, and the formerly-known-as Kowl and Kminion projects. 

After reading through most of the discussion thread on KIP-35, it seems that 
the conversation about using a broker version string / cluster aggregate 
version was specifically related to having the client choose how to behave 
(i.e., choose what versions of requests to use). The discussion was not around 
having the broker version as a piece of information that a client can use in 
log lines or for end-user presentation purposes.

It seems a bit of a misdirected worry that a client implementor may 
accidentally use an unstructured string field for versioning purposes, 
especially when another field (ApiKeys) exists for versioning purposes and is 
widely known. Implementing a Kafka client is quite complex, and there are so 
many other areas where an implementor can go wrong that I’m not sure we should 
be worried about an unstructured and documented metadata field.

"the existing ApiVersionsReq  … this information is richer than a single 
version string"

Agreed, this is true for clients. However, it’s completely useless visually for 
end users.

The reason Kminion used the version guess was twofold: to emit a log line on 
startup that the process was talking to Kafka v#.#, and to emit a const gauge 
metric for Prometheus where people could monitor, external to Kafka, what 
version each broker was running.

Kowl uses the version guess to display the Kafka version the process is talking 
to immediately when you go to the Brokers panel. I envision that this same UI 
display can be added to Conduktor, even Confluent, etc.

kcl uses the version guess as an extremely quick debugging utility: software 
engineers (not cluster administrators) might not always know what version of 
Kafka they are talking to, but they are trying to use a client. I often receive 
questions about “why isn’t this xyz thing working”, I ask for their cluster 
version with kcl, and then we can jump into diagnosing the problem much quicker.

I think, if we focus on the persona of a cluster administrator, it’s not 
obvious what the need for this KIP is. For me, focusing on the perspective of 
an end-user of a client makes the need a bit clearer. It is not the 
responsibility of an end-user to manage the cluster version, but it is the 
responsibility of an end-user to know which version of a cluster they are 
talking to so that they know which fields / requests / behaviors are supported 
in a client.

All that said: I think this information is worth it and unlikely to be misused. 
IMO, ApiVersions is the main place to include this information, but another 
alternative is ClusterMetadata. Adding these fields to ClusterMetadata might be 
a bit awkward and may return stale information sometimes during a rolling 
upgrade.

Curious to know your thoughts, and again thank you for the consideration and 
replies,
- Travis

On 2022/11/11 13:07:37 Magnus Edenhill wrote:
> Hi Travis and thanks for the KIP, two comments below:
> 
> 
> On Fri, Nov 11, 2022 at 13:37, David Jacot wrote:
> 
> > 02: I am a bit concerned by clients that could misuse this information.
> > For instance, one may be tempted to rely on the version to decide whether a
> > feature is enabled or not. The api versions should remain the source of
> > truth but we cannot enforce it with the proposed approach. That may be
> > acceptable though.
> >
> 
> We proposed including this in the original ApiVersionRequest KIP-35 (waaay
> back), but it was rejected
> for exactly this reason; that it may(will) be misused by clients.
> 
> 
> 
> I would also like to question the actual end-user use of this information,
> the existing ApiVersionsReq
> already provides - on a detailed level - what features and functionality
> the broker supports -
> this information is richer than a single version string.
> 
> The KIP says "End users can quickly check from a client if the cluster is
> up to date" and
> "Clients can also use the broker version in log lines so that end users c

RE: RE: Re: [DISCUSS] KIP-885: Expose Broker's Name and Version to Clients Skip to end of metadata

2022-11-11 Thread Travis Bischel
Two quick mistakes to clarify:

When I say ClusterMetadata, I mean request 60, DescribeCluster.

Also, the email subject of this entire thread should be "[DISCUSS] KIP-885: 
Expose Broker's Name and Version to Clients”. I must have either accidentally 
pasted the “Skip to end of metadata”, or did not delete something.

Cheers,
-Travis

On 2022/11/11 16:45:12 Travis Bischel wrote:
> Thanks for the replies David and Magnus
> 
> David:
> 
> 02: From a client implementation perspective, it is easier to gate features 
> based on the max version numbers returned per request, rather than any 
> textual representation of a string. I’m not really envisioning a client 
> implementation trying to match on an undefined string, especially if it’s 
> documented as just metadata information.
> 
> 03: Interesting, I may be one of the few that does query the version 
> directly. Perhaps this can be some new information that is instead added to 
> request 60, ClusterMetadata? The con with ClusterMetadata is that I’m 
> interested in this information on a per-broker basis. We could add these 
> fields per each broker in the Brokers field, though.
> 
> Magnus:
> 
> As far as I can see, only my franz-go client offers this ability to “guess” 
> the version of a broker — and it’s historically done so through ApiVersions, 
> which was the only way to do this. This feature was used in three projects by 
> two people: my kcl project, and the formerly-known-as Kowl and Kminion 
> projects. 
> 
> After reading through most of the discussion thread on KIP-35, it seems that 
> the conversation about using a broker version string / cluster aggregate 
> version was specifically related to having the client choose how to behave 
> (i.e., choose what versions of requests to use). The discussion was not 
> around having the broker version as a piece of information that a client can 
> use in log lines or for end-user presentation purposes.
> 
> It seems a bit of a misdirected worry that a client implementor may 
> accidentally use an unstructured string field for versioning purposes, 
> especially when another field (ApiKeys) exists for versioning purposes and is 
> widely known. Implementing a Kafka client is quite complex and there are so 
> many other areas an implementor can go wrong, I’m not sure that we should be 
> worried about an unstructured and documented metadata field.
> 
> "the existing ApiVersionsReq  … this information is richer than a single 
> version string"
> 
> Agreed, this is true for clients. However, it’s completely useless visually for 
> end users.
> 
> The reason Kminion used the version guess was twofold: to emit a log line on 
> startup that the process was talking to Kafka v#.#, and to emit a const gauge 
> metric for Prometheus where people could monitor external to Kafka what 
> version each broker was running.
> 
> Kowl uses the version guess to display the Kafka version the process is 
> talking to immediately when you go to the Brokers panel. I envision that this 
> same UI display can be added to Conduktor, even Confluent, etc.
> 
> kcl uses the version guess as an extremely quick debugging utility: software 
> engineers (not cluster administrators) might not always know what version of 
> Kafka they are talking to, but they are trying to use a client. I often 
> receive questions about “why isn’t this xyz thing working”, I ask for their 
> cluster version with kcl, and then we can jump into diagnosing the problem 
> much quicker.
> 
> I think, if we focus on the persona of a cluster administrator, it’s not 
> obvious what the need for this KIP is. For me, focusing on the perspective of 
> an end-user of a client makes the need a bit clearer. It is not the 
> responsibility of an end-user to manage the cluster version, but it is the 
> responsibility of an end-user to know which version of a cluster they are 
> talking to so that they know which fields / requests / behaviors are 
> supported in a client.
> 
> All that said: I think this information is worth it and unlikely to be 
> misused. IMO, ApiVersions is the main place to include this information, but 
> another alternative is ClusterMetadata. Adding these fields to 
> ClusterMetadata might be a bit awkward and may return stale information 
> sometimes during a rolling upgrade.
> 
> Curious to know your thoughts, and again thank you for the consideration and 
> replies,
> - Travis
> 
> On 2022/11/11 13:07:37 Magnus Edenhill wrote:
> > Hi Travis and thanks for the KIP, two comments below:
> > 
> > 
> > On Fri, Nov 11, 2022 at 13:37, David Jacot wrote:
> > 
> > > 02: I am a bit concerned by clients that could misuse this information.
> > > For instance, one may be tempted to rely on the version to decide whether
> > > a feature is enabled or not. The api versions should remain the source of
> > > truth but we cannot enforce it with the proposed approach. That may be
> > > acceptable though.
> > >
> > 
> > We proposed including this in the original

Re: [VOTE] KIP-866 ZooKeeper to KRaft Migration

2022-11-11 Thread David Arthur
Thanks, Colin.

> never start an upgrade without first verifying the quorum configuration on 
> the ZK-based brokers

I agree that this is a pretty big benefit. I could imagine debugging
and fixing connection problems mid-migration would be a big pain.
Especially if you had some brokers correctly configured, and others
not.

Adding a heartbeat raises some questions about what to do if a broker
goes into a bad state, or stops heartbeating, during a migration.
However, I think the same is true for a registration based approach,
so maybe it's not an increase in net complexity.

I've replaced the ZK registration section with a new RPC and brief
description. Please take a look.

Thanks!
David

On Wed, Nov 9, 2022 at 5:46 PM Colin McCabe  wrote:
>
> Hi David,
>
> Thanks for the response. Replies inline.
>
> On Wed, Nov 9, 2022, at 08:17, David Arthur wrote:
> > Colin
> >
> >>  Maybe zk.metadata.migration.enable ?
> >
> > Done. I went with "zookeeper.metadata.migration.enable" since our
> > other ZK configs start with "zookeeper.*"
> >
> >> Similarly, for MigrationRecord: can we rename this to 
> >> ZkMigrationStateRecord? Then change MigrationState -> ZkMigrationState.
> >
> > Sure
> >
> >> With ZkMigrationStateRecord, one thing to keep in mind here is that we 
> >> will eventually compact all the metadata logs into a snapshot. That 
> >> snapshot will then have to keep alive the memory of the old migration. So 
> >> it is not really a matter of replaying the old metadata logs (probably) 
> >> but a matter of checking to see what the ZkMigrationState is, which I 
> >> suppose could be Optional. If it's not Optional.empty, 
> >> we already migrated / are migrating.
> >
> > Yea, makes sense.
> >
> >> For the /migration ZNode, is "last_update_time_ms" necessary? I thought ZK 
> >> already tracked this information in the mzxid of the znode?
> >
> > Yes, Jun pointed this out previously, I missed this update in the KIP.
> > Fixed now.
> >
> >> It is true that technically it is only needed in UMR, but I would still 
> >> suggest including KRaftControllerId in LeaderAndIsrRequest because it will 
> >> make debugging much easier.
> >>
> >> I would suggest not implementing the topic deletion state machine, but 
> >> just deleting topics eagerly when in migration mode. We can implement this 
> >> behavior change by keying off of whether KRaftControllerId is present in 
> >> LeaderAndIsrRequest. On broker startup, we'll be sent a full 
> >> LeaderAndIsrRequest and can delete stray partitions whose IDs are not as 
> >> expected (again, this behavior change would only be for migration mode)
> >
> > Sounds good to me. Since this is somewhat of an implementation detail,
> > do you think we need this included in the KIP?
>
> Yeah, maybe we don't need to go into the delete behavior here. But I think 
> the KIP should specify that we have KRaftControllerId in both UMR and 
> LeaderAndIsrRequest. That will allow us to implement this behavior 
> conditionally on zk-based brokers when in dual write mode.
>
> >
> >> For existing KRaft controllers, will 
> >> kafka.controller:type=KafkaController,name=MigrationState show up as 4 
> >> (MigrationFinalized)? I assume this is true, but it would be good to spell 
> >> it out. Sorry if this is answered somewhere else.
> >
> > We discussed using 0 (None) as the value to report for original,
> > un-migrated KRaft clusters. 4 (MigrationFinalized) would be for
> > clusters which underwent a migration. I have some description of this
> > in the table under "Migration Overview"
> >
>
> I don't feel that strongly about this, but wouldn't it be a good idea for 
> MigrationState to have a different value for ZK-based clusters and 
> KRaft-based clusters? If you have a bunch of clusters and you take an 
> aggregate of this metric, it would be good to get a report of three numbers:
> 1. unupgraded ZK
> 2. in progress upgrades
> 3. kraft
>
> I guess we could get that from examining some other metrics too, though. Not 
> sure, what do you think?
>
> >> As you point out, the ZK brokers being upgraded will need to contact the 
> >> KRaft quorum in order to forward requests to there, once we are in 
> >> migration mode. This raises a question: rather than changing the broker 
> >> registration, can we have those brokers send an RPC to the kraft 
> >> controller quorum instead? This would serve to confirm that they can reach 
> >> the quorum. Then the quorum could wait for all of the brokers to check in 
> >> this way before starting the migration (It would know all the brokers by 
> >> looking at /brokers)
> >
> > One of the motivations I had for putting the migration details in the
> > broker registration is that it removes ordering constraints between
> > the brokers and controllers when starting the migration. If we set the
> > brokers in migration mode before the KRaft quorum is available, they
> > will just carry on until the KRaft controller takes over the
> > controller leadership and starts sending UMR/LIS

Re: [DISCUSS] KIP-875: First-class offsets support in Kafka Connect

2022-11-11 Thread Chris Egerton
Hi Mickael,

Thanks for your feedback. This has been on my TODO list as well :)

1. That's fair! Support for altering offsets is easy enough to design, so
I've added it to the KIP. The reset-after-delete feature, on the other
hand, is actually pretty tricky to design; I've updated the rationale in
the KIP for delaying it and clarified that it's not just a matter of
implementation but also design work. If you or anyone else can think of a
clean, simple way to implement it, I'm happy to add it to this KIP, but
otherwise I'd prefer not to tie it to the approval and release of the
features already proposed in the KIP.

2. Yeah, it's a little awkward. In my head I've justified the ugliness of
the implementation with the smooth user-facing experience; falling back
seamlessly on the PAUSED state without even logging an error message is a
lot better than I'd initially hoped for when I was designing this feature.

I've also added an implementation plan to the KIP, which calls out the
different parts that can be worked on independently so that others (hi Yash
🙂) can also tackle parts of this if they'd like.

Finally, I've removed the "type" field from the response body format for
offset read requests. This way, users can copy+paste the response from that
endpoint into a request to alter a connector's offsets without having to
remove the "type" field first. An alternative was to keep the "type" field
and add it to the request body format for altering offsets, but this didn't
seem to make enough sense for cases not involving the aforementioned
copy+paste process.

Cheers,

Chris

On Wed, Nov 9, 2022 at 9:57 AM Mickael Maison 
wrote:

> Hi Chris,
>
> Thanks for the KIP, you're picking something that has been in my todo
> list for a while ;)
>
> It looks good overall, I just have a couple of questions:
> 1) I consider both features listed in Future Work pretty important. In
> both cases you mention the reason for not addressing them now is
> because of the implementation. If the design is simple and if we have
> volunteers to implement them, I wonder if we could include them in
> this KIP. So you would not have to implement everything but we would
> have a single KIP and vote.
>
> 2) Regarding the backward compatibility for the stopped state. The
> "state.v2" field is a bit unfortunate but I can't think of a better
> solution. The other alternative would be to not do anything but I
> think the graceful degradation you propose is a bit better.
>
> Thanks,
> Mickael
>
>
>
>
>
> On Tue, Nov 8, 2022 at 5:58 PM Chris Egerton 
> wrote:
> >
> > Hi Yash,
> >
> > Good question! This is actually a subtle source of asymmetry in the
> > current proposal. Requests to delete a consumer group with active
> > members will fail, so if there are zombie sink tasks that are still
> > communicating with Kafka, offset reset requests for that connector will
> > also fail. It is possible to use an admin client to remove all active
> > members from the group and then delete the group. However, this solution
> > isn't as complete as the zombie fencing that we can perform for
> > exactly-once source tasks, since removing consumers from a group doesn't
> > prevent them from immediately rejoining the group, which would either
> > cause the group deletion request to fail (if they rejoin before the
> > group is deleted), or recreate the group (if they rejoin after the group
> > is deleted).
> >
> > For ease of implementation, I'd prefer to leave the asymmetry in the API
> > for now and fail fast and clearly if there are still consumers active in
> > the sink connector's group. We can try to detect this case and provide a
> > helpful error message to the user explaining why the offset reset request
> > has failed and some steps they can take to try to resolve things (wait
> > for slow task shutdown to complete, restart zombie workers and/or workers
> > with blocked tasks on them). In the future we can possibly even revisit
> > KIP-611 [1] or something like it to provide better insight into zombie
> > tasks on a worker so that it's easier to find which tasks have been
> > abandoned but are still running.
> >
> > Let me know what you think; this is an important point to call out and if
> > we can reach some consensus on how to handle sink connector offset resets
> > w/r/t zombie tasks, I'll update the KIP with the details.
> >
> > [1] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Nov 8, 2022 at 8:00 AM Yash Mayya  wrote:
> >
> > > Hi Chris,
> > >
> > > Thanks for the response and the explanations; I think you've meticulously
> > > answered pretty much all the questions I had!
> > >
> > >
> > > > if something goes wrong while resetting offsets, there's no
> > > > immediate impact--the connector will still be in the STOPPED
> > > >  state. The REST response for requests to reset the offsets
> > > > will clearly call out that

Re: Starting out with Kafka

2022-11-11 Thread vinay deshpande
Hi,
I tried both of the suggestions given in the previous mails. All the
unit tests passed except for testMuteOnOOM(), but the issue with the IDE
persists.
I even tried invalidating the cache a few times, deleted the .bin/ and
.idea/ folders, and built again, but there are still quite a few imports
that aren't being resolved (especially kafka.server, kafka.zk and kafka.log).
I'm attaching the result of the unit test here.
[image: Screen Shot 2022-11-11 at 10.45.48 AM.png]
TIA.

On Fri, Nov 11, 2022 at 5:04 AM John Roesler  wrote:

> Hello Vinay,
>
> One thing I’ve noticed recently is that I have to click the “build” button
> in intellij before I can use the “run” or “debug” buttons. I’m not sure
> why.
>
> Welcome to the community!
> -John
>
> On Fri, Nov 11, 2022, at 02:47, deng ziming wrote:
> > Hello, Vinay
> > Kafka uses gradlew as its build tool and Java/Scala as its programming
> > languages. You can first run `./gradlew unitTest` in a terminal to build
> > it, and then reload it in the Gradle window. Sometimes I also change the
> > default build tool from IDEA to Gradle in
> > Preference/Build/build tools/Gradle:
> >
> > PastedGraphic-1.tiff
> >
> > --
> > Ziming
> >
> >> On Nov 11, 2022, at 13:30, vinay deshpande wrote:
> >>
> >> Hi All,
> >> I have a basic question: I tried importing the Kafka source code into
> >> IntelliJ, but there are a bunch of imports that the IDE cannot find,
> >> like these:
> >>
> >> import kafka.api.ApiVersion;
> >> import kafka.log.CleanerConfig;
> >> import kafka.log.LogConfig;
> >> import kafka.log.LogManager;
> >>
> >>
> >> TIA.
> >>
> >> Thanks,
> >> Vinay
>


Re: Starting out with Kafka

2022-11-11 Thread John Roesler
Hmm,

I assume you did, but just to be sure, did you compile from the terminal before 
trying to build in idea? There are some generated Java classes that might be 
getting missed.

The compile step is probably something like “compileJava compileScala 
compileTestJava compileTestScala”, off the top of my head. 

I hope this helps!
-John

Ps: I don’t think image attachments work with the mailing list. Maybe you can 
use a gist?

On Fri, Nov 11, 2022, at 12:51, vinay deshpande wrote:
> Hi,
> I tried both the suggestions given in the previous mail threads, all 
> the unit tests passed except for testMuteOnOOM(). But the issue with 
> IDE persists.
> I even tried invalidating the cache a few times, deleted the .bin/ and 
> .idea/ folder and built again, but there are quite a few imports that 
> aren't being resolved (especially kafka.server, kafka.zk and kafka.log).
> I'm attaching the result of the unit test here.
> Screen Shot 2022-11-11 at 10.45.48 AM.png
> TIA.
>
> On Fri, Nov 11, 2022 at 5:04 AM John Roesler  wrote:
>> Hello Vinay,
>> 
>> One thing I’ve noticed recently is that I have to click the “build” button 
>> in intellij before I can use the “run” or “debug” buttons. I’m not sure why. 
>> 
>> Welcome to the community!
>> -John
>> 
>> On Fri, Nov 11, 2022, at 02:47, deng ziming wrote:
>> > Hello, Vinay
>> > Kafka uses gradlew as its build tool and Java/Scala as its programming
>> > languages. You can first run `./gradlew unitTest` in a terminal to build
>> > it, and then reload it in the Gradle window. Sometimes I also change the
>> > default build tool from IDEA to Gradle in
>> > Preference/Build/build tools/Gradle:
>> >
>> > PastedGraphic-1.tiff
>> >
>> > --
>> > Ziming
>> >
>> >> On Nov 11, 2022, at 13:30, vinay deshpande  wrote:
>> >> 
>> >> Hi All,
>> >> I have a basic question: I tried importing the Kafka source code into IntelliJ,
>> >> but there are a bunch of imports that the IDE cannot find, like these:
>> >> 
>> >> import kafka.api.ApiVersion;
>> >> import kafka.log.CleanerConfig;
>> >> import kafka.log.LogConfig;
>> >> import kafka.log.LogManager;
>> >> 
>> >> 
>> >> TIA.
>> >> 
>> >> Thanks,
>> >> Vinay


Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-11 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Thanks, Greg, for taking the time to review not just the KIP but also the PR. 

1. You made very valid points regarding the behavior of the destroy() callback 
for connectors that don't follow the happy path. After thinking about it, I 
decided to tweak the implementation a bit and have the destroy() method be 
called during the worker shutdown: this means it will share the same guarantees 
the connector#stop() method has. An alternative implementation can be to have 
an overloaded connector#stop(boolean deleted) method that signals a connector 
that it is being stopped due to deletion, but I think that having a separate 
destroy() method provides clearer semantics.

I'll make sure to amend the KIP with these details.
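
To make the two alternatives above concrete, here is a minimal sketch. All of
the types below (SketchConnector, OverloadedStopConnector, RecordingConnector)
and the shutDown helper are hypothetical stand-ins invented for illustration;
they are not the real Connect classes, and the API the KIP ends up with may
differ. The sketch also assumes that destroy() runs after stop(), which this
thread does not specify.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only: none of these types are part of Kafka Connect.
class DestroyCallbackSketch {

    // Option 1: a separate destroy() callback with a no-op default, so that
    // existing connectors keep compiling and behaving as before.
    interface SketchConnector {
        void stop();
        default void destroy() { }
    }

    // Option 2 (the alternative mentioned above): a single overloaded stop
    // method that is told whether the connector is being deleted.
    interface OverloadedStopConnector {
        void stop(boolean deleted);
    }

    // Stand-in for the framework side of option 1. Assumption: stop() always
    // runs on shutdown, and destroy() runs afterwards only on deletion.
    static void shutDown(SketchConnector connector, boolean deleted) {
        connector.stop();
        if (deleted) {
            connector.destroy();
        }
    }

    // Example connector that records which callbacks it received.
    static class RecordingConnector implements SketchConnector {
        final List<String> calls = new ArrayList<>();
        @Override public void stop() { calls.add("stop"); }
        @Override public void destroy() { calls.add("destroy"); }
    }

    public static void main(String[] args) {
        RecordingConnector restarted = new RecordingConnector();
        shutDown(restarted, false);
        System.out.println("not deleted: " + restarted.calls);

        RecordingConnector removed = new RecordingConnector();
        shutDown(removed, true);
        System.out.println("deleted:     " + removed.calls);
    }
}
```

With a separate callback, a connector that does not care about deletion needs
no changes at all, whereas the overloaded stop(boolean) variant pushes the
"was I deleted?" branch into every implementation.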

3. Without going too deep on the types of operations that can be performed by a 
connector when it's being deleted, I can imagine the 
org.apache.kafka.connect.source.SourceConnector base class having a default 
implementation that deletes the connector's offsets automatically (controlled 
by a property); this is in the context of KIP-875 (first-class offsets support 
in Kafka Connect). Similar behaviors can be introduced for the SinkConnector; 
however, I'm not sure if this KIP is the right place to discuss all the 
possibilities, or if we should keep it more narrowly focused on providing a 
callback mechanism for when connectors are deleted, and on what the expectations 
are around this newly introduced method. What do you think?


From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

Hi Hector,

Thanks for the KIP!

This is certainly missing functionality from the native Connect framework,
and we should try to make it possible to inform connectors about this part
of their lifecycle.
However, as with most functionality that was left out of the initial
implementation of the framework, the details are more challenging to work
out.

1. What happens when the destroy call throws an error, how does the
framework respond?

This is unspecified in the KIP, and it appears that your proposed changes
could cause the herder to fail.
From the perspective of operators & connector developers, what is a
reasonable expectation to have for failure of a destroy?
I could see operators wanting both a graceful-delete to make use of this
new feature, and a force-delete for when the graceful-delete fails.
A connector developer could choose to swallow all errors encountered, or
fail-fast to indicate to the operator that there is an issue with the
graceful-delete flow.
If the alternative is crashing the herder, connector developers may choose
to hide serious errors, which is undesirable.
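
As a rough illustration of the graceful-delete versus force-delete idea, here
is a minimal sketch. The Outcome enum, the delete method, and the force flag
are all hypothetical names invented for this example; nothing like them exists
in Connect today. The point is only that the callback's exception is caught at
the call site, so it is reported to the operator instead of crashing the caller.

```java
// Hypothetical sketch only: "graceful" and "force" delete are ideas from this
// thread, not an existing Kafka Connect API.
class DeleteModesSketch {

    enum Outcome { DELETED, DELETED_DESPITE_CLEANUP_FAILURE, REJECTED }

    // Runs the connector's cleanup callback and decides what to do if it fails.
    // The exception is caught here so that it cannot propagate into the caller,
    // which stands in for the herder.
    static Outcome delete(Runnable destroyCallback, boolean force) {
        try {
            destroyCallback.run();
            return Outcome.DELETED;
        } catch (RuntimeException e) {
            // A graceful delete fails fast so the operator sees the problem;
            // a force delete proceeds even though cleanup did not finish.
            return force ? Outcome.DELETED_DESPITE_CLEANUP_FAILURE : Outcome.REJECTED;
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("cleanup failed"); };
        System.out.println(delete(() -> { }, false));
        System.out.println(delete(failing, false));
        System.out.println(delete(failing, true));
    }
}
```

Under this shape, a connector developer has no incentive to swallow serious
errors, because throwing from the callback no longer takes the framework down.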

2. What happens when the destroy() call takes a long time to complete, or
is interrupted?

It appears that your implementation serially destroy()s each appropriate
connector, and may prevent the herder thread from making progress while the
operation is ongoing.
We have previously had to patch Connect to perform all connector and task
operations on a background thread, because some connector method
implementations can stall indefinitely.
Connect also has the notion of "cancelling" a connector/task if a graceful
shutdown timeout operation takes too long. Perhaps some of that design or
machinery may be useful to protect this method call as well.

More specific to the destroy() call itself, what happens when a connector
completes part of a destroy operation and then cannot complete the
remainder, either due to timing out or a worker crashing?
What is the contract with the connector developer about this method? Is the
destroy() only started exactly once during the lifetime of the connector,
or may it be retried?
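
One way to protect the calling thread from a stalled callback is to run it on
a separate thread and bound the wait. The sketch below uses only
java.util.concurrent; the class, the Result enum, and runWithTimeout are
hypothetical names for illustration and do not reflect how Connect actually
implements cancellation.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch only: this is not how Kafka Connect is implemented.
class BoundedDestroySketch {

    enum Result { COMPLETED, FAILED, TIMED_OUT, INTERRUPTED }

    // Runs the callback on a separate thread and waits at most timeoutMs for
    // it, so a stalled callback cannot block the calling thread indefinitely.
    static Result runWithTimeout(Runnable destroyCallback, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "destroy-callback");
            t.setDaemon(true); // an abandoned callback must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(destroyCallback);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Result.COMPLETED;
        } catch (ExecutionException e) {
            return Result.FAILED; // the callback threw an exception
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the callback; it may ignore the interrupt
            return Result.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return Result.INTERRUPTED;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> { }, 1000));
        System.out.println(runWithTimeout(() -> {
            try {
                Thread.sleep(10_000); // simulates a callback that stalls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 100));
    }
}
```

Note that a timeout here only stops the wait. It says nothing about how much of
the cleanup already happened, which is exactly why the contract question above
(run once, or possibly retried?) still needs an answer.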

3. What should be considered a reasonable custom implementation of the
destroy() call? What resources should it clean up by default?

I think we can broadly categorize the state a connector mutates into the
following:
* Framework-managed state (e.g. source offsets, consumer offsets)
* Implementation detail state (e.g. debezium db history topic, audit
tables, temporary accounts)
* Third party system data (e.g. the actual data being written by a sink
connector)
* Third party system metadata (e.g. tables in a database, delivery
receipts, permissions)

I think it's apparent that the framework-managed state cannot/should not be
interacted with by the destroy() call. However, the framework could be
changed to clean up these resources at the same time that destroy() is
called. Is that out-of-scope of this proposal, and better handled by manual
intervention?
From the text of the KIP, I think it explicitly includes the Implementation
detail state, which should not be depended on externally and should be safe
to clean up during a destroy(). I think this is completely reasonable.
Are the third-party data and metadata out-of-scope for this proposal? Can
we officially recommend against it, or should we accommodate users and
connector dev

Re: Starting out with Kafka

2022-11-11 Thread vinay deshpande
Hi John,
Thanks a ton. Things are looking good now.
Even though Scala was present on my machine, the ./gradlew compile was
unable to find the Scala classes. After installing the Scala plugin in the
IDE and then recompiling from the terminal, the project is building fine now.

Thanks John and Ziming for helping out.

Thanks,
Vinay

On Fri, Nov 11, 2022 at 11:01 AM John Roesler  wrote:

> Hmm,
>
> I assume you did, but just to be sure, did you compile from the terminal
> before trying to build in idea? There are some generated Java classes that
> might be getting missed.
>
> The compile step is probably something like “compileJava compileScala
> compileTestJava compileTestScala”, off the top of my head.
>
> I hope this helps!
> -John
>
> Ps: I don’t think image attachments work with the mailing list. Maybe you
> can use a gist?
>
> On Fri, Nov 11, 2022, at 12:51, vinay deshpande wrote:
> > Hi,
> > I tried both the suggestions given in the previous mail threads, all
> > the unit tests passed except for testMuteOnOOM(). But the issue with
> > IDE persists.
> > I even tried invalidating the cache a few times, deleted the .bin/ and
> > .idea/ folder and built again, but there are quite a few imports that
> > aren't being resolved (especially kafka.server, kafka.zk and kafka.log).
> > I'm attaching the result of the unit test here.
> > Screen Shot 2022-11-11 at 10.45.48 AM.png
> > TIA.
> >
> > On Fri, Nov 11, 2022 at 5:04 AM John Roesler wrote:
> >> Hello Vinay,
> >>
> >> One thing I’ve noticed recently is that I have to click the “build”
> >> button in intellij before I can use the “run” or “debug” buttons. I’m
> >> not sure why.
> >>
> >> Welcome to the community!
> >> -John
> >>
> >> On Fri, Nov 11, 2022, at 02:47, deng ziming wrote:
> >> > Hello, Vinay
> >> > Kafka uses gradlew as its build tool and Java/Scala as its programming
> >> > languages. You can first run `./gradlew unitTest` in a terminal to build
> >> > it, and then reload it in the Gradle window. Sometimes I also change the
> >> > default build tool from IDEA to Gradle in
> >> > Preference/Build/build tools/Gradle:
> >> >
> >> > PastedGraphic-1.tiff
> >> >
> >> > --
> >> > Ziming
> >> >
> >> >> On Nov 11, 2022, at 13:30, vinay deshpande wrote:
> >> >>
> >> >> Hi All,
> >> >> I have a basic question: I tried importing the Kafka source code into
> >> >> IntelliJ, but there are a bunch of imports that the IDE cannot find,
> >> >> like these:
> >> >>
> >> >> import kafka.api.ApiVersion;
> >> >> import kafka.log.CleanerConfig;
> >> >> import kafka.log.LogConfig;
> >> >> import kafka.log.LogManager;
> >> >>
> >> >>
> >> >> TIA.
> >> >>
> >> >> Thanks,
> >> >> Vinay
>


Re: Streams: clarification needed, checkpoint vs. position files

2022-11-11 Thread Sophie Blee-Goldman
Hey Nick,

I haven't been following the new IQv2 work very closely so take this with a
grain of salt, but as far as I'm aware there's no such thing as "position
files" -- the Position is just an in-memory object and is related to a user's
query against the state store, whereas a checkpoint file reflects the current
state of the store, i.e. how much of the changelog it contains.

In other words, while these might look like they do similar things, the actual
usage and implementation of Positions vs checkpoint files is pretty much
unrelated. So I don't think it would make sense for Streams to try and
consolidate these or replace one with another.

Hope this answers your question, and I'll ping John to make sure I'm not
misleading you regarding the usage/intention of Positions.

Sophie

On Fri, Nov 11, 2022 at 6:48 AM Nick Telford  wrote:

> Hi everyone,
>
> I'm trying to understand how StateStores work internally for some changes
> that I plan to propose, and I'd like some clarification around checkpoint
> files and position files.
>
> It appears as though position files are relatively new, and were created as
> part of the IQv2 initiative, as a means to track the position of the local
> state store so that reads could be bound by particular positions?
>
> Checkpoint files look much older, and are managed by the Task itself
> (actually, ProcessorStateManager). It looks like this is used exclusively
> for determining a) whether to restore a store, and b) which offsets to
> restore from?
>
> If I've understood the above correctly, is there any scope to potentially
> replace checkpoint files with StateStore#position()?
>
> Regards,
>
> Nick
>


[jira] [Resolved] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2022-11-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12935.

Assignee: Lucas Brutschy

Fixed again via 
https://github.com/apache/kafka/commit/ce5faa222b3f58a74994190e3a6267ac87ee21a8

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: flaky-test
> Attachments: 
> RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore[true].rtf
>
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14384) Flaky Test SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff

2022-11-11 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14384:
--

 Summary: Flaky Test 
SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff
 Key: KAFKA-14384
 URL: https://issues.apache.org/jira/browse/KAFKA-14384
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


h3. Stacktrace

java.lang.AssertionError: Did not receive all 5 records from topic 
selfjoin-outputSelfJoinUpgradeIntegrationTestshouldUpgradeWithTopologyOptimizationOff
 within 6 ms Expected: is a value equal to or greater than <5> but: <0> was 
less than <5> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueWithTimestampRecordsReceived$2(IntegrationTestUtils.java:763)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:382) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(IntegrationTestUtils.java:759)
 at 
org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.processKeyValueAndVerifyCount(SelfJoinUpgradeIntegrationTest.java:244)
 at 
org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff(SelfJoinUpgradeIntegrationTest.java:155)

 

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12835/4/testReport/org.apache.kafka.streams.integration/SelfJoinUpgradeIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldUpgradeWithTopologyOptimizationOff/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14385) Flaky Test QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

2022-11-11 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14385:
--

 Summary: Flaky Test 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable
 Key: KAFKA-14385
 URL: https://issues.apache.org/jira/browse/KAFKA-14385
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Failed twice on the same build (Java 8 & 11)
h3. Stacktrace

java.lang.AssertionError: KafkaStreams did not transit to RUNNING state within 
15000 milli seconds. Expected:  but: was  at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(StreamsTestUtils.java:134)
 at 
org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(StreamsTestUtils.java:121)
 at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable(QueryableStateIntegrationTest.java:1038)

 

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12836/3/testReport/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldNotMakeStoreAvailableUntilAllStoresAvailable/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Streams: clarification needed, checkpoint vs. position files

2022-11-11 Thread John Roesler
Hi all,

Just to clarify: there actually is a position file. It was a small detail of 
the IQv2 implementation to add it, otherwise a persistent store's position 
would be lost after a restart.

Otherwise, Sophie is right on the money. The checkpoint refers to an offset in 
the changelog, while the position refers to offsets in the task's input 
topics. So they are similar in function and structure, but they refer to two 
different things.

I agree that, given this, it doesn't seem like consolidating them (for example, 
into one file) would be worth it. It would make the code more complicated 
without deduping any information.
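
To illustrate the distinction, here is a deliberately simplified model. The
class, the map layouts, and the topic names and offsets are all made up for
this example; the real Streams classes and on-disk formats differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; not the actual Kafka Streams data structures.
class CheckpointVsPositionSketch {

    // Checkpoint: an offset in the store's changelog, keyed here by a made-up
    // "topic-partition" string for the changelog partition.
    static Map<String, Long> exampleCheckpoint() {
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("app-store-changelog-0", 1500L);
        return checkpoint;
    }

    // Position: offsets in the task's input topics, keyed by input topic and
    // then by partition.
    static Map<String, Map<Integer, Long>> examplePosition() {
        Map<String, Map<Integer, Long>> position = new HashMap<>();
        Map<Integer, Long> orders = new HashMap<>();
        orders.put(0, 4200L);
        position.put("orders", orders);
        Map<Integer, Long> customers = new HashMap<>();
        customers.put(0, 310L);
        position.put("customers", customers);
        return position;
    }

    public static void main(String[] args) {
        System.out.println("checkpoint: " + exampleCheckpoint());
        System.out.println("position:   " + examplePosition());
    }
}
```

The two maps share a shape (topic, partition, offset) but none of their
entries, which is why writing them into a single file would not remove any
duplicated information.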

I hope this helps, and look forward to what you're cooking up, Nick!
-John

On 2022/11/12 00:50:27 Sophie Blee-Goldman wrote:
> Hey Nick,
> 
> I haven't been following the new IQv2 work very closely so take this with a
> grain of salt,
> but as far as I'm aware there's no such thing as "position files" -- the
> Position is just an
> in-memory object and is related to a user's query against the state store,
> whereas a
> checkpoint file reflects the current state of the store ie how much of the
> changelog it
> contains.
> 
> In other words while these might look like they do similar things, the
> actual usage and
> implementation of Positions vs checkpoint files is pretty much unrelated.
> So I don't think
> it would make sense for Streams to try and consolidate these or replace one with
> another.
> 
> Hope this answers your question, and I'll ping John to make sure I'm not
> misleading
> you regarding the usage/intention of Positions
> 
> Sophie
> 
> On Fri, Nov 11, 2022 at 6:48 AM Nick Telford  wrote:
> 
> > Hi everyone,
> >
> > I'm trying to understand how StateStores work internally for some changes
> > that I plan to propose, and I'd like some clarification around checkpoint
> > files and position files.
> >
> > It appears as though position files are relatively new, and were created as
> > part of the IQv2 initiative, as a means to track the position of the local
> > state store so that reads could be bound by particular positions?
> >
> > Checkpoint files look much older, and are managed by the Task itself
> > (actually, ProcessorStateManager). It looks like this is used exclusively
> > for determining a) whether to restore a store, and b) which offsets to
> > restore from?
> >
> > If I've understood the above correctly, is there any scope to potentially
> > replace checkpoint files with StateStore#position()?
> >
> > Regards,
> >
> > Nick
> >
> 


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1349

2022-11-11 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.3 #114

2022-11-11 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1350

2022-11-11 Thread Apache Jenkins Server
See