[jira] [Created] (KAFKA-14410) Allow connect runtime to run multiple versions of a connector.

2022-11-21 Thread Snehashis Pal (Jira)
Snehashis Pal created KAFKA-14410:
-

 Summary: Allow connect runtime to run multiple versions of a 
connector. 
 Key: KAFKA-14410
 URL: https://issues.apache.org/jira/browse/KAFKA-14410
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Snehashis Pal
Assignee: Snehashis Pal


Connect Runtime should support running multiple versions of the same connector. 
Please refer to 
[KIP-891|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235834793]
 for more information on the problem and the proposed solution. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-21 Thread Snehashis
Hi all,

I'd like to start a discussion thread on KIP-891: Running multiple versions
of a connector.

The KIP aims to add the ability for the connect runtime to run multiple
versions of a connector.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector

Please take a look and let me know what you think.

Thank you
Snehashis Pal


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.3 #123

2022-11-21 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 500325 lines...]
[2022-11-21T05:29:38.183Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = true] STARTED
[2022-11-21T05:29:52.166Z] 
[2022-11-21T05:29:52.166Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = true] PASSED
[2022-11-21T05:29:52.166Z] 
[2022-11-21T05:29:52.166Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = true] STARTED
[2022-11-21T05:29:57.995Z] 
[2022-11-21T05:29:57.995Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = true] PASSED
[2022-11-21T05:29:57.995Z] 
[2022-11-21T05:29:57.995Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = true] STARTED
[2022-11-21T05:30:03.821Z] 
[2022-11-21T05:30:03.821Z] Exception: java.lang.OutOfMemoryError thrown from 
the UncaughtExceptionHandler in thread "delete-temp-file-shutdown-hook"
[2022-11-21T05:30:06.517Z] 
[2022-11-21T05:30:06.517Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = true] PASSED
[2022-11-21T05:30:06.517Z] 
[2022-11-21T05:30:06.517Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED
[2022-11-21T05:30:16.618Z] 
[2022-11-21T05:30:16.618Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED
[2022-11-21T05:30:16.618Z] 
[2022-11-21T05:30:16.618Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = true] STARTED
[2022-11-21T05:30:28.423Z] 
[2022-11-21T05:30:28.423Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = true] PASSED
[2022-11-21T05:30:28.423Z] 
[2022-11-21T05:30:28.423Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeftRepartitioned[caching enabled = true] STARTED
[2022-11-21T05:30:42.459Z] 
[2022-11-21T05:30:42.459Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeftRepartitioned[caching enabled = true] PASSED
[2022-11-21T05:30:42.459Z] 
[2022-11-21T05:30:42.459Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testSelfJoin[caching enabled = true] STARTED
[2022-11-21T05:30:45.086Z] 
[2022-11-21T05:30:45.086Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testSelfJoin[caching enabled = true] PASSED
[2022-11-21T05:30:45.086Z] 
[2022-11-21T05:30:45.086Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = false] STARTED
[2022-11-21T05:30:57.041Z] 
[2022-11-21T05:30:57.041Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInnerRepartitioned[caching enabled = false] PASSED
[2022-11-21T05:30:57.041Z] 
[2022-11-21T05:30:57.041Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = false] STARTED
[2022-11-21T05:31:08.819Z] 
[2022-11-21T05:31:08.819Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuterRepartitioned[caching enabled = false] PASSED
[2022-11-21T05:31:08.819Z] 
[2022-11-21T05:31:08.819Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = false] STARTED
[2022-11-21T05:31:15.805Z] 
[2022-11-21T05:31:15.805Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testInner[caching enabled = false] PASSED
[2022-11-21T05:31:15.805Z] 
[2022-11-21T05:31:15.805Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = false] STARTED
[2022-11-21T05:31:24.370Z] 
[2022-11-21T05:31:24.370Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testOuter[caching enabled = false] PASSED
[2022-11-21T05:31:24.370Z] 
[2022-11-21T05:31:24.370Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = false] STARTED
[2022-11-21T05:31:32.764Z] 
[2022-11-21T05:31:32.764Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLeft[caching enabled = false] PASSED
[2022-11-21T05:31:32.764Z] 
[2022-11-21T05:31:32.764Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = false] STARTED
[2022-11-21T05:31:44.871Z] 
[2022-11-21T05:31:44.871Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testMultiInner[caching enabled = false] PASSED
[2022-11-21T05:31:44.871Z] 
[2022-11-21T05:31:44.871Z] 
org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest > 
testLef

Re: [DISCUSS] KIP-875: First-class offsets support in Kafka Connect

2022-11-21 Thread Mickael Maison
Hi Chris,

Thanks for all the updates, yes that seems good!

Mickael
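
A purely illustrative sketch of the PATCH semantics discussed in the quoted
thread below: a null offset value marks that partition's offset for reset,
mirroring the tombstone behaviour Chris describes. The endpoint path, body
layout and "kafka_*" field nesting here are assumptions, not the format
finalized in KIP-875.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AlterOffsetsSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical payload: "offset": null requests a reset for that partition.
        String body = "{ \"offsets\": [ { "
                + "\"partition\": { \"kafka_topic\": \"orders\", \"kafka_partition\": 0 }, "
                + "\"offset\": null } ] }";

        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://localhost:8083/connectors/my-sink/offsets"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}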

On Thu, Nov 17, 2022 at 8:41 PM Chris Egerton  wrote:
>
> Hi Mickael,
>
> Thanks for your thoughts! IMO it's most intuitive to use a null value in
> the PATCH API to signify that an offset should be reset, since it aligns
> nicely with the API we provide to source connectors, where null offsets are
> translated under the hood to tombstone records in the internal offsets
> topic. Does that seem reasonable to you?
>
> Cheers,
>
> Chris
>
> On Thu, Nov 17, 2022 at 2:35 PM Chris Egerton  wrote:
>
> > Hi Yash,
> >
> > I've updated the KIP with the correct "kafka_topic", "kafka_partition",
> > and "kafka_offset" keys in the JSON examples (settled on those instead of
> > prefixing with "Kafka " for better interactions with tooling like JQ). I've
> > also added a note about sink offset requests failing if there are still
> > active members in the consumer group.
> >
> > I don't believe logging an error message is sufficient for handling
> > failures to reset-after-delete. IMO it's highly likely that users will
> > either shoot themselves in the foot by not reading the fine print and
> > realizing that the offset request may have failed, or will ask for better
> > visibility into the success or failure of the reset request than scanning
> > log files. I don't doubt that there are ways to address this, but I would
> > prefer to leave them to a separate KIP since the required design work is
> > non-trivial and I do not feel that the added burden is worth tying to this
> > KIP as a blocker.
> >
> > I was really hoping to avoid introducing a change to the developer-facing
> > APIs with this KIP, but after giving it some thought I think this may be
> > unavoidable. It's debatable whether validation of altered offsets is a good
> > enough use case on its own for this kind of API, but since there are also
> > connectors out there that manage offsets externally, we should probably add
> > a hook to allow those external offsets to be managed, which can then serve
> > double- or even-triple duty as a hook to validate custom offsets and to
> > notify users whether offset resets/alterations are supported at all (which
> > they may not be if, for example, offsets are coupled tightly with the data
> > written by a sink connector). I've updated the KIP with the
> > developer-facing API changes for this logic; let me know what you think.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Nov 14, 2022 at 10:16 AM Mickael Maison 
> > wrote:
> >
> >> Hi Chris,
> >>
> >> Thanks for the update!
> >>
> >> It's relatively common to only want to reset offsets for a specific
> >> resource (for example with MirrorMaker for one or a group of topics).
> >> Could it be possible to add a way to do so? Either by providing a
> >> payload to DELETE or by setting the offset field to an empty object in
> >> the PATCH payload?
> >>
> >> Thanks,
> >> Mickael
> >>
> >> On Sat, Nov 12, 2022 at 3:33 PM Yash Mayya  wrote:
> >> >
> >> > Hi Chris,
> >> >
> >> > Thanks for pointing out that the consumer group deletion step itself
> >> will
> >> > fail in case of zombie sink tasks. Since we can't get any stronger
> >> > guarantees from consumers (unlike with transactional producers), I
> >> think it
> >> > makes perfect sense to fail the offset reset attempt in such scenarios
> >> with
> >> > a relevant error message to the user. I was more concerned about
> >> silently
> >> > failing but it looks like that won't be an issue. It's probably worth
> >> > calling out this difference between source / sink connectors explicitly
> >> in
> >> > the KIP, what do you think?
> >> >
> >> > > changing the field names for sink offsets
> >> > > from "topic", "partition", and "offset" to "Kafka
> >> > > topic", "Kafka partition", and "Kafka offset" respectively, to
> >> > > reduce the stuttering effect of having a "partition" field inside
> >> > >  a "partition" field and the same with an "offset" field
> >> >
> >> > The KIP is still using the nested partition / offset fields by the way -
> >> > has it not been updated because we're waiting for consensus on the field
> >> > names?
> >> >
> >> > > The reset-after-delete feature, on the other
> >> > > hand, is actually pretty tricky to design; I've updated the
> >> > > rationale in the KIP for delaying it and clarified that it's not
> >> > > just a matter of implementation but also design work.
> >> >
> >> > I like the idea of writing an offset reset request to the config topic
> >> > which will be processed by the herder's config update listener - I'm not
> >> > sure I fully follow the concerns with regard to handling failures? Why
> >> > can't we simply log an error saying that the offset reset for the
> >> deleted
> >> > connector "xyz" failed due to reason "abc"? As long as it's documented
> >> that
> >> > connector deletion and offset resets are asynchronous and a success
> >> > response only means that the request was initiated successfully (which
> >> is

Re: [DISCUSS] KIP-844: Transactional State Stores

2022-11-21 Thread Alexander Sorokoumov
Hey Nick,

Thank you for the prototype testing and benchmarking, and sorry for the
late reply!

I agree that it is worth revisiting the WriteBatchWithIndex approach. I
will implement a fork of the current prototype that uses that mechanism to
ensure transactionality and let you know when it is ready for
review/testing in this ML thread.
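
For context, a minimal sketch of the WriteBatchWithIndex pattern being
referred to here, assuming the org.rocksdb Java API: uncommitted writes are
buffered in the indexed in-memory batch, reads are overlaid on the committed
DB, and the batch is applied atomically on commit. This only illustrates the
RocksDB mechanism, not the actual prototype.

import org.rocksdb.*;

public class WriteBatchWithIndexSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options opts = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(opts, "/tmp/wbwi-sketch");
             WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
             ReadOptions readOpts = new ReadOptions();
             WriteOptions writeOpts = new WriteOptions()) {

            // Uncommitted write: lives only in the in-memory, indexed batch.
            batch.put("key".getBytes(), "uncommitted".getBytes());

            // Read-your-own-writes: the batch is consulted first, then the DB.
            byte[] value = batch.getFromBatchAndDB(db, readOpts, "key".getBytes());
            System.out.println(new String(value)); // prints "uncommitted"

            // "Commit": apply all buffered writes to the DB atomically.
            db.write(writeOpts, batch);
        }
    }
}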

As for time estimates, I might not have enough time to finish the prototype
in December, so it will probably be ready for review in January.

Best,
Alex

On Fri, Nov 11, 2022 at 4:24 PM Nick Telford  wrote:

> Hi everyone,
>
> Sorry to dredge this up again. I've had a chance to start doing some
> testing with the WIP Pull Request, and it appears as though the secondary
> store solution performs rather poorly.
>
> In our testing, we had a non-transactional state store that would restore
> (from scratch), at a rate of nearly 1,000,000 records/second. When we
> switched it to a transactional store, it restored at a rate of less than
> 40,000 records/second.
>
> I suspect the key issues here are having to copy the data out of the
> temporary store and into the main store on-commit, and to a lesser extent,
> the extra memory copies during writes.
>
> I think it's worth re-visiting the WriteBatchWithIndex solution, as it's
> clear from the RocksDB post[1] on the subject that it's the recommended way
> to achieve transactionality.
>
> The only issue you identified with this solution was that uncommitted
> writes are required to entirely fit in-memory, and RocksDB recommends they
> don't exceed 3-4MiB. If we do some back-of-the-envelope calculations, I
> think we'll find that this will be a non-issue for all but the most extreme
> cases, and for those, I think I have a fairly simple solution.
>
> Firstly, when EOS is enabled, the default commit.interval.ms is set to
> 100ms, which provides fairly short intervals that uncommitted writes need
> to be buffered in-memory. If we assume a worst case of 1024 byte records
> (and for most cases, they should be much smaller), then 4MiB would hold
> ~4096 records, which with 100ms commit intervals is a throughput of
> approximately 40,960 records/second. This seems quite reasonable.
>
> For use cases that wouldn't reasonably fit in-memory, my suggestion is that
> we have a mechanism that tracks the number/size of uncommitted records in
> stores, and prematurely commits the Task when this size exceeds a
> configured threshold.
>
> Thanks for your time, and let me know what you think!
> --
> Nick
>
> 1: https://rocksdb.org/blog/2015/02/27/write-batch-with-index.html
>
> On Thu, 6 Oct 2022 at 19:31, Alexander Sorokoumov
>  wrote:
>
> > Hey Nick,
> >
> > It is going to be option c. Existing state is considered to be committed
> > and there will be an additional RocksDB for uncommitted writes.
> >
> > I am out of office until October 24. I will update KIP and make sure that
> > we have an upgrade test for that after coming back from vacation.
> >
> > Best,
> > Alex
> >
> > On Thu, Oct 6, 2022 at 5:06 PM Nick Telford 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I realise this has already been voted on and accepted, but it occurred
> to
> > > me today that the KIP doesn't define the migration/upgrade path for
> > > existing non-transactional StateStores that *become* transactional,
> i.e.
> > by
> > > adding the transactional boolean to the StateStore constructor.
> > >
> > > What would be the result, when such a change is made to a Topology,
> > without
> > > explicitly wiping the application state?
> > > a) An error.
> > > b) Local state is wiped.
> > > c) Existing RocksDB database is used as committed writes and new
> RocksDB
> > > database is created for uncommitted writes.
> > > d) Something else?
> > >
> > > Regards,
> > >
> > > Nick
> > >
> > > On Thu, 1 Sept 2022 at 12:16, Alexander Sorokoumov
> > >  wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > Sounds good. I annotated all added StateStore methods (commit,
> recover,
> > > > transactional) with @Evolving.
> > > >
> > > > Best,
> > > > Alex
> > > >
> > > >
> > > >
> > > > On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hello Alex,
> > > > >
> > > > > Thanks for the detailed replies, I think that makes sense, and in
> the
> > > > long
> > > > > run we would need some public indicators from StateStore to
> determine
> > > if
> > > > > checkpoints can really be used to indicate clean snapshots.
> > > > >
> > > > > As for the @Evolving label, I think we can still keep it but for a
> > > > > different reason, since as we add more state management
> > functionalities
> > > > in
> > > > > the near future we may need to revisit the public APIs again and
> > hence
> > > > > keeping it as @Evolving would allow us to modify if necessary, in
> an
> > > > easier
> > > > > path than deprecate -> delete after several minor releases.
> > > > >
> > > > > Besides that, I have no further comments about the KIP.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On 

[jira] [Created] (KAFKA-14411) Logging warning when partitions don't exist on assign request

2022-11-21 Thread Paolo Patierno (Jira)
Paolo Patierno created KAFKA-14411:
--

 Summary: Logging warning when partitions don't exist on assign 
request
 Key: KAFKA-14411
 URL: https://issues.apache.org/jira/browse/KAFKA-14411
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Paolo Patierno


{{When using the assign method on a consumer with a non-existing topic (and 
auto-creation disabled on the Kafka cluster), the log shows messages like:}}
{code:java}
Subscribed to partition(s): not-existing-topic-1
Error while fetching metadata with correlation id 3 : 
{not-existing-topic=UNKNOWN_TOPIC_OR_PARTITION}{code}
{{This can make sense: if at some point the user creates the topic, the 
consumer will be subscribed to it.}}

{{The case where the topic exists but the requested partition does not is 
different.}}
{code:java}
Subscribed to partition(s): existing-topic-1 {code}
{{The above message shows that the consumer is subscribed, but it will only 
start to receive messages once the partition is created as well. The log can 
be misleading because it does not print at least a WARNING that the requested 
partition doesn't exist.}}

{{So, since an error is already logged on metadata fetch when the topic does 
not exist (with auto-creation disabled), it would also be useful to have 
WARNING messages in the log about requested partitions that don't exist.}}
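
{{A minimal sketch of the second scenario (hypothetical topic name; "existing-topic" is assumed to have a single partition 0, so partition 1 does not exist):}}
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignMissingPartition {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Only "Subscribed to partition(s): existing-topic-1" is logged;
            // there is no warning that partition 1 itself is missing.
            consumer.assign(Collections.singletonList(new TopicPartition("existing-topic", 1)));
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
{code}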



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-844: Transactional State Stores

2022-11-21 Thread Nick Telford
Hi Alex,

Thanks for getting back to me. I actually have most of a working
implementation already. I'm going to write it up as a new KIP, so that it
can be reviewed independently of KIP-844.

Hopefully, working together we can have it ready sooner.

I'll keep you posted on my progress.

Regards,
Nick

On Mon, 21 Nov 2022 at 11:25, Alexander Sorokoumov
 wrote:

> Hey Nick,
>
> Thank you for the prototype testing and benchmarking, and sorry for the
> late reply!
>
> I agree that it is worth revisiting the WriteBatchWithIndex approach. I
> will implement a fork of the current prototype that uses that mechanism to
> ensure transactionality and let you know when it is ready for
> review/testing in this ML thread.
>
> As for time estimates, I might not have enough time to finish the prototype
> in December, so it will probably be ready for review in January.
>
> Best,
> Alex
>
> On Fri, Nov 11, 2022 at 4:24 PM Nick Telford 
> wrote:
>
> > Hi everyone,
> >
> > Sorry to dredge this up again. I've had a chance to start doing some
> > testing with the WIP Pull Request, and it appears as though the secondary
> > store solution performs rather poorly.
> >
> > In our testing, we had a non-transactional state store that would restore
> > (from scratch), at a rate of nearly 1,000,000 records/second. When we
> > switched it to a transactional store, it restored at a rate of less than
> > 40,000 records/second.
> >
> > I suspect the key issues here are having to copy the data out of the
> > temporary store and into the main store on-commit, and to a lesser
> extent,
> > the extra memory copies during writes.
> >
> > I think it's worth re-visiting the WriteBatchWithIndex solution, as it's
> > clear from the RocksDB post[1] on the subject that it's the recommended
> way
> > to achieve transactionality.
> >
> > The only issue you identified with this solution was that uncommitted
> > writes are required to entirely fit in-memory, and RocksDB recommends
> they
> > don't exceed 3-4MiB. If we do some back-of-the-envelope calculations, I
> > think we'll find that this will be a non-issue for all but the most
> extreme
> > cases, and for those, I think I have a fairly simple solution.
> >
> > Firstly, when EOS is enabled, the default commit.interval.ms is set to
> > 100ms, which provides fairly short intervals that uncommitted writes need
> > to be buffered in-memory. If we assume a worst case of 1024 byte records
> > (and for most cases, they should be much smaller), then 4MiB would hold
> > ~4096 records, which with 100ms commit intervals is a throughput of
> > approximately 40,960 records/second. This seems quite reasonable.
> >
> > For use cases that wouldn't reasonably fit in-memory, my suggestion is
> that
> > we have a mechanism that tracks the number/size of uncommitted records in
> > stores, and prematurely commits the Task when this size exceeds a
> > configured threshold.
> >
> > Thanks for your time, and let me know what you think!
> > --
> > Nick
> >
> > 1: https://rocksdb.org/blog/2015/02/27/write-batch-with-index.html
> >
> > On Thu, 6 Oct 2022 at 19:31, Alexander Sorokoumov
> >  wrote:
> >
> > > Hey Nick,
> > >
> > > It is going to be option c. Existing state is considered to be
> committed
> > > and there will be an additional RocksDB for uncommitted writes.
> > >
> > > I am out of office until October 24. I will update KIP and make sure
> that
> > > we have an upgrade test for that after coming back from vacation.
> > >
> > > Best,
> > > Alex
> > >
> > > On Thu, Oct 6, 2022 at 5:06 PM Nick Telford 
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I realise this has already been voted on and accepted, but it
> occurred
> > to
> > > > me today that the KIP doesn't define the migration/upgrade path for
> > > > existing non-transactional StateStores that *become* transactional,
> > i.e.
> > > by
> > > > adding the transactional boolean to the StateStore constructor.
> > > >
> > > > What would be the result, when such a change is made to a Topology,
> > > without
> > > > explicitly wiping the application state?
> > > > a) An error.
> > > > b) Local state is wiped.
> > > > c) Existing RocksDB database is used as committed writes and new
> > RocksDB
> > > > database is created for uncommitted writes.
> > > > d) Something else?
> > > >
> > > > Regards,
> > > >
> > > > Nick
> > > >
> > > > On Thu, 1 Sept 2022 at 12:16, Alexander Sorokoumov
> > > >  wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > > Sounds good. I annotated all added StateStore methods (commit,
> > recover,
> > > > > transactional) with @Evolving.
> > > > >
> > > > > Best,
> > > > > Alex
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > Hello Alex,
> > > > > >
> > > > > > Thanks for the detailed replies, I think that makes sense, and in
> > the
> > > > > long
> > > > > > run we would need some public indicators from StateStore to
> > determine
> > > > if
> > > > > > ch

Request for Kafka Jira account

2022-11-21 Thread Gantigmaa Selenge
Hi team,

Can I please have an account created for Kafka Jira?

Username: tinaselenge
Display name: Gantigmaa Selenge
Email address: tina.sele...@gmail.com

Thanks!

Regards,
Gantigmaa
Posted to dev@kafka.apache.org



Re: Request for Kafka Jira account

2022-11-21 Thread Mickael Maison
Hi,

I've created your account, you should receive an email with all the details.

Thanks,
Mickael

On Mon, Nov 21, 2022 at 3:23 PM Gantigmaa Selenge  wrote:
>
> Hi team,
>
> Can I please have an account created for Kafka Jira?
>
> Username: tinaselenge
> Display name: Gantigmaa Selenge
> Email address: tina.sele...@gmail.com
>
> Thanks!
>
> Regards,
> Gantigmaa
> Posted to dev@kafka.apache.org
> 


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1369

2022-11-21 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 512952 lines...]
[2022-11-21T14:47:33.438Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > IQv2IntegrationTest > shouldRejectNonRunningActive() PASSED
[2022-11-21T14:47:35.205Z] 
[2022-11-21T14:47:35.205Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > InternalTopicIntegrationTest > 
shouldCompactTopicsForKeyValueStoreChangelogs() STARTED
[2022-11-21T14:47:36.158Z] 
[2022-11-21T14:47:36.158Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > InternalTopicIntegrationTest > 
shouldCompactTopicsForKeyValueStoreChangelogs() PASSED
[2022-11-21T14:47:36.158Z] 
[2022-11-21T14:47:36.158Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > InternalTopicIntegrationTest > 
shouldGetToRunningWithWindowedTableInFKJ() STARTED
[2022-11-21T14:47:37.931Z] 
[2022-11-21T14:47:37.931Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > InternalTopicIntegrationTest > 
shouldGetToRunningWithWindowedTableInFKJ() PASSED
[2022-11-21T14:47:37.931Z] 
[2022-11-21T14:47:37.931Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > InternalTopicIntegrationTest > 
shouldCompactAndDeleteTopicsForWindowStoreChangelogs() STARTED
[2022-11-21T14:47:38.875Z] 
[2022-11-21T14:47:38.875Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > InternalTopicIntegrationTest > 
shouldCompactAndDeleteTopicsForWindowStoreChangelogs() PASSED
[2022-11-21T14:47:39.967Z] 
[2022-11-21T14:47:39.967Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > KStreamAggregationIntegrationTest > 
shouldAggregateSlidingWindows(TestInfo) STARTED
[2022-11-21T14:47:43.626Z] 
[2022-11-21T14:47:43.626Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > KStreamAggregationIntegrationTest > 
shouldAggregateSlidingWindows(TestInfo) PASSED
[2022-11-21T14:47:43.626Z] 
[2022-11-21T14:47:43.626Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > KStreamAggregationIntegrationTest > 
shouldReduceSessionWindows() STARTED
[2022-11-21T14:47:44.579Z] 
[2022-11-21T14:47:44.579Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > KStreamAggregationIntegrationTest > 
shouldReduceSessionWindows() PASSED
[2022-11-21T14:47:44.579Z] 
[2022-11-21T14:47:44.579Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > KStreamAggregationIntegrationTest > 
shouldReduceSlidingWindows(TestInfo) STARTED
[2022-11-21T14:47:45.103Z] 
[2022-11-21T14:47:45.103Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest()
 PASSED
[2022-11-21T14:47:45.103Z] 
[2022-11-21T14:47:45.103Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > FineGrainedAutoResetIntegrationTest > 
shouldThrowStreamsExceptionNoResetSpecified() STARTED
[2022-11-21T14:47:45.103Z] 
[2022-11-21T14:47:45.103Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > FineGrainedAutoResetIntegrationTest > 
shouldThrowStreamsExceptionNoResetSpecified() PASSED
[2022-11-21T14:47:47.495Z] 
[2022-11-21T14:47:47.495Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > GlobalKTableIntegrationTest > 
shouldGetToRunningWithOnlyGlobalTopology() STARTED
[2022-11-21T14:47:48.547Z] 
[2022-11-21T14:47:48.547Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > GlobalKTableIntegrationTest > 
shouldGetToRunningWithOnlyGlobalTopology() PASSED
[2022-11-21T14:47:48.547Z] 
[2022-11-21T14:47:48.547Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin() STARTED
[2022-11-21T14:47:49.248Z] 
[2022-11-21T14:47:49.248Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > KStreamAggregationIntegrationTest > 
shouldReduceSlidingWindows(TestInfo) PASSED
[2022-11-21T14:47:49.248Z] 
[2022-11-21T14:47:49.248Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > KStreamAggregationIntegrationTest > shouldReduce(TestInfo) 
STARTED
[2022-11-21T14:47:51.915Z] 
[2022-11-21T14:47:51.916Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin() PASSED
[2022-11-21T14:47:51.916Z] 
[2022-11-21T14:47:51.916Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > GlobalKTableIntegrationTest > 
shouldRestoreGlobalInMemoryKTableOnRestart() STARTED
[2022-11-21T14:47:53.010Z] 
[2022-11-21T14:47:53.010Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > GlobalKTableIntegrationTest > 
shouldRestoreGlobalInMemoryKTableOnRestart() PASSED

[jira] [Created] (KAFKA-14412) Transactional semantics for StateStores

2022-11-21 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-14412:


 Summary: Transactional semantics for StateStores
 Key: KAFKA-14412
 URL: https://issues.apache.org/jira/browse/KAFKA-14412
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford
Assignee: Nicholas Telford


We wish to improve atomicity, consistency and durability of StateStores, 
especially when operating under EOS.

The changes are outlined in [KIP-892: Transactional Semantics for 
StateStores|https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores]

This is an alternative to the opt-in StateStores described in 
[KIP-844|https://cwiki.apache.org/confluence/display/KAFKA/KIP-844:+Transactional+State+Stores]
 and KAFKA-12549



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-21 Thread Nick Telford
Hi everyone,

As I mentioned in the discussion thread for KIP-844, I've been working on
an alternative approach to achieving better transactional semantics for
Kafka Streams StateStores.

I've published this separately as KIP-892: Transactional Semantics for
StateStores
,
so that it can be discussed/reviewed separately from KIP-844.

Alex: I'm especially interested in what you think!

I have a nearly complete implementation of the changes outlined in this
KIP, please let me know if you'd like me to push them for review in advance
of a vote.

Regards,

Nick


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-21 Thread Mickael Maison
Hi,

Thanks for the KIP, this is something that could be really useful!

1) Can you explain how this would work with the GET
/{pluginName}/config endpoint? How do you specify a version for a
connector?

2) Some connectors come bundled with transformations (for example
Debezium). How would multiple versions of a transformation be handled?

3) You mention the latest version will be picked by default if not
specified. The version() method returns a string and currently
enforces no semantics on the value it returns. Can you clarify the new
expected semantics and explain how versions will be compared
(alphabetical, semantics versioning, something else?)

Thanks,
Mickael


On Mon, Nov 21, 2022 at 9:57 AM Snehashis  wrote:
>
> Hi all,
>
> I'd like to start a discussion thread on KIP-891: Running multiple versions
> of a connector.
>
> The KIP aims to add the ability for the connect runtime to run multiple
> versions of a connector.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector
>
> Please take a look and let me know what you think.
>
> Thank you
> Snehashis Pal


Re: [DISCUSS] KIP-852 Optimize calculation of size for log in remote tier

2022-11-21 Thread Divij Vaidya
Thank you Jun and Alex for your comments.

Point #1: You are right, Jun. As Alex mentioned, the "derived metadata" can
increase the size of the cached metadata by a factor of 10, but it should be
ok to cache just the actual metadata. My point about size being a limitation
for using a cache is no longer valid.

Point #2: For a new replica, it would still have to fetch the metadata over
the network to warm up the cache and hence increase the start time of the
archival process. Please also note the repercussions of the warm-up scan
that Alex mentioned in this thread as part of #102.2.

#100: Agreed, Alex. Thanks for clarifying that. My point about size being a
limitation for using a cache is no longer valid.

#101: Alex, if I understand correctly, you are suggesting caching the
total size at the leader and updating it on archival. This wouldn't work
when the leader restarts, since we would have to do a full scan on startup
to rebuild the total size entry. We expect users to store data in remote
storage over long durations, which increases the likelihood of leader
restarts / failovers.

#102.1: I don't think the current design accommodates the fact that
data corruption could happen in the RLMM plugin (we don't have a checksum
field in the metadata as part of KIP-405). If data corruption occurs, with
or without the cache, it would be a different problem to solve. I would
like to keep this outside the scope of this KIP.

#102.2: Agreed. This remains the main concern with using the cache to
compute the total size.
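
To make the trade-off in #101 / #102.2 concrete, a rough sketch of a
leader-side total-size cache that is updated on archival and deletion; the
scan in initialize() is exactly the startup cost discussed above. The class
and method names are illustrative assumptions, not the KIP-405 RLMM API.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical leader-side cache of the total remote log size per partition.
public class RemoteLogSizeCache {
    private final ConcurrentHashMap<String, AtomicLong> sizeByPartition = new ConcurrentHashMap<>();

    // Called when a broker becomes leader: requires one full scan of the
    // remote log metadata (the startup cost discussed in #101 / #102.2).
    public void initialize(String topicPartition, LongSupplier fullScanTotalBytes) {
        sizeByPartition.put(topicPartition, new AtomicLong(fullScanTotalBytes.getAsLong()));
    }

    // Called after a segment is successfully copied to remote storage.
    public void onSegmentCopied(String topicPartition, long segmentSizeInBytes) {
        sizeByPartition.get(topicPartition).addAndGet(segmentSizeInBytes);
    }

    // Called after a remote segment is deleted by retention.
    public void onSegmentDeleted(String topicPartition, long segmentSizeInBytes) {
        sizeByPartition.get(topicPartition).addAndGet(-segmentSizeInBytes);
    }

    public long totalSizeInBytes(String topicPartition) {
        return sizeByPartition.get(topicPartition).get();
    }
}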

Regards,
Divij Vaidya



On Fri, Nov 18, 2022 at 12:59 PM Alexandre Dupriez <
alexandre.dupr...@gmail.com> wrote:

> Hi Divij,
>
> Thanks for the KIP. Please find some comments based on what I read on
> this thread so far - apologies for the repeats and the late reply.
>
> If I understand correctly, one of the main elements of discussion is
> about caching in Kafka versus delegation of providing the remote size
> of a topic-partition to the plugin.
>
> A few comments:
>
> 100. The size of the “derived metadata” which is managed by the plugin
> to represent an rlmMetadata can indeed be close to 1 kB on average
> depending on its own internal structure, e.g. the redundancy it
> enforces (unfortunately resulting to duplication), additional
> information such as checksums and primary and secondary indexable
> keys. But indeed, the rlmMetadata is itself a lighter data structure
> by a factor of 10. And indeed, instead of caching the “derived
> metadata”, only the rlmMetadata could be, which should address the
> concern regarding the memory occupancy of the cache.
>
> 101. I am not sure I fully understand why we would need to cache the
> list of rlmMetadata to retain the remote size of a topic-partition.
> Since the leader of a topic-partition is, in non-degenerated cases,
> the only actor which can mutate the remote part of the
> topic-partition, hence its size, it could in theory only cache the
> size of the remote log once it has calculated it? In which case there
> would not be any problem regarding the size of the caching strategy.
> Did I miss something there?
>
> 102. There may be a few challenges to consider with caching:
>
> 102.1) As mentioned above, the caching strategy assumes no mutation
> outside the lifetime of a leader. While this is true in the normal
> course of operation, there could be accidental mutation outside of the
> leader and a loss of consistency between the cached state and the
> actual remote representation of the log. E.g. split-brain scenarios,
> bugs in the plugins, bugs in external systems with mutating access on
> the derived metadata. In the worst case, a drift between the cached
> size and the actual size could lead to over-deleting remote data which
> is a durability risk.
>
> The alternative you propose, by making the plugin the source of truth
> w.r.t. to the size of the remote log, can make it easier to avoid
> inconsistencies between plugin-managed metadata and the remote log
> from the perspective of Kafka. On the other hand, plugin vendors would
> have to implement it with the expected efficiency to have it yield
> benefits.
>
> 102.2) As you mentioned, the caching strategy in Kafka would still
> require one iteration over the list of rlmMetadata when the leadership
> of a topic-partition is assigned to a broker, while the plugin can
> offer alternative constant-time approaches. This calculation cannot be
> put on the LeaderAndIsr path and would be performed in the background.
> In case of bulk leadership migration, listing the rlmMetadata could a)
> result in request bursts to any backend system the plugin may use
> [which shouldn’t be a problem for high-throughput data stores but
> could have cost implications] b) increase utilisation timespan of the
> RLM threads for these calculations potentially leading to transient
> starvation of tasks queued for, typically, offloading operations c)
> could have a non-marginal CPU footprint on hardware with strict
>

Re: [VOTE] KIP-884: Add config to configure KafkaClientSupplier in Kafka Streams

2022-11-21 Thread John Roesler
I'm +1 (binding)

Thanks for the KIP!
-John

On 2022/11/17 21:06:29 Hao Li wrote:
> Hi all,
> 
> I would like start a vote on KIP-884:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams
> 
> 
> Thanks,
> Hao
> 


[jira] [Created] (KAFKA-14413) Separate MirrorMaker configurations for each connector

2022-11-21 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14413:
--

 Summary: Separate MirrorMaker configurations for each connector
 Key: KAFKA-14413
 URL: https://issues.apache.org/jira/browse/KAFKA-14413
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Reporter: Mickael Maison
Assignee: Mickael Maison


Currently all the MirrorMaker configurations are put together in the 
MirrorConnectorConfig. When using the /connector-plugins//config, it 
returns the configurations of all MirrorMaker connectors.

This also makes it hard to generate documentation for the connectors as it's 
not possible to link configurations to specific connectors.

We should split the configuration into different classes to address this issue.


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14414) Remove unnecessary usage of ObjectSerializationCache

2022-11-21 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14414:


 Summary: Remove unnecessary usage of ObjectSerializationCache
 Key: KAFKA-14414
 URL: https://issues.apache.org/jira/browse/KAFKA-14414
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Divij Vaidya
 Fix For: 3.4.0
 Attachments: Screenshot 2022-11-21 at 19.23.53.png

We create an instance of ObjectSerializationCache at 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L113]
 which does not get used at all. We always "add" to the cache but never 
retrieve from it (as is evident from the fact that we don't store a reference 
to the cache anywhere).

Adding information to the cache is expensive because it uses 
System.identityHashCode(Object), as demonstrated by the flame graph of 
producer requests against an Apache Kafka 3.3.1 plaintext broker. 
{{!Screenshot 2022-11-21 at 19.23.53.png!}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-21 Thread Snehashis
Hi Mickael. Thanks for your input. Addressing the points you mentioned
below.

> 1) Can you explain how this would work with the GET
> /{pluginName}/config endpoint? How do you specify a version for a
> connector?

This API returns the set of configurations for a given connector. Since
the configurations can change between versions, allowing a user-given
version lets the endpoint return the correct configs. The version is
added as a query parameter, for example:
 /S3SinkConnector/config?version=v1.1.1

> 2) Some connectors come bundled with transformations (for example
> Debezium). How would multiple versions of a transformation be handled?

The version of the transformations bundled with a particular connector
version will be used when the connector is run with the corresponding
version number. There will be implicit isolation between the two
transformations, as they are part of two separate plugins and will be
loaded using different plugin classloaders during connector creation.

> 3) You mention the latest version will be picked by default if not
> specified. The version() method returns a string and currently
> enforces no semantics on the value it returns. Can you clarify the new
> expected semantics and explain how versions will be compared
> (alphabetical, semantics versioning, something else?)

The plugin loading mechanism already compares connector versions (new
connectors are currently created with only the latest version, though).
The comparison between versions will remain the same as it is today and
is done using the Maven artifact versioning scheme. It is a generic
scheme that supports semantic, alphabetical and combined versions, with
support for additional modifiers (like alpha, beta, release and snapshot
build versions). Please refer to this javadoc for the full comparison
method:
https://maven.apache.org/ref/3.5.2/maven-artifact/apidocs/org/apache/maven/artifact/versioning/ComparableVersion.html
I do not think it is necessary to enforce new semantics on the version.
IMO the existing versioning scheme is appropriate and flexible enough for
all code versioning methods.
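
For reference, a small sketch of the ordering implemented by the Maven
artifact versioning scheme linked above, assuming the maven-artifact library
is on the classpath; the version strings are arbitrary examples.

import org.apache.maven.artifact.versioning.ComparableVersion;

public class VersionOrderingSketch {
    public static void main(String[] args) {
        // ComparableVersion implements the ordering documented in the javadoc:
        // numeric, alphabetical and qualifier-aware (alpha < beta < ... < release).
        ComparableVersion v1 = new ComparableVersion("1.1.1");
        ComparableVersion v2 = new ComparableVersion("1.2.0-alpha");
        ComparableVersion v3 = new ComparableVersion("1.2.0");

        System.out.println(v1.compareTo(v2) < 0); // true: 1.1.1 < 1.2.0-alpha
        System.out.println(v2.compareTo(v3) < 0); // true: 1.2.0-alpha < 1.2.0
    }
}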


On Mon, Nov 21, 2022 at 8:37 PM Mickael Maison 
wrote:

> Hi,
>
> Thanks for the KIP, this is something that could be really useful!
>
> 1) Can you explain how this would work with the GET
> /{pluginName}/config endpoint? How do you specify a version for a
> connector?
>
> 2) Some connectors come bundled with transformations (for example
> Debezium). How would multiple versions of a transformation be handled?
>
> 3) You mention the latest version will be picked by default if not
> specified. The version() method returns a string and currently
> enforces no semantics on the value it returns. Can you clarify the new
> expected semantics and explain how versions will be compared
> (alphabetical, semantics versioning, something else?)
>
> Thanks,
> Mickael
>
>
> On Mon, Nov 21, 2022 at 9:57 AM Snehashis 
> wrote:
> >
> > Hi all,
> >
> > I'd like to start a discussion thread on KIP-891: Running multiple
> versions
> > of a connector.
> >
> > The KIP aims to add the ability for the connect runtime to run multiple
> > versions of a connector.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector
> >
> > Please take a look and let me know what you think.
> >
> > Thank you
> > Snehashis Pal
>


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1370

2022-11-21 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-884: Add config to configure KafkaClientSupplier in Kafka Streams

2022-11-21 Thread Matthias J. Sax

+1 (binding)

On 11/21/22 7:39 AM, John Roesler wrote:

I'm +1 (binding)

Thanks for the KIP!
-John

On 2022/11/17 21:06:29 Hao Li wrote:

Hi all,

I would like start a vote on KIP-884:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams


Thanks,
Hao



Re: [DISCUSS] KIP-864: Add End-To-End Latency Metrics to Connectors

2022-11-21 Thread Chris Egerton
Hi Jorge,

Thanks for the updates, and apologies for the delay. The new diagram
directly under the "Proposed Changes" section is absolutely gorgeous!


Follow-ups:

RE 2: Good point. We can use the same level for these metrics, it's not a
big deal.

RE 3: As long as all the per-record metrics are kept at DEBUG level, it
should be fine to leave JMH benchmarking for a follow-up. If we want to add
new per-record, INFO-level metrics, I would be more comfortable with
including benchmarking as part of the testing plan for the KIP. One
possible compromise could be to propose that these features be merged at
DEBUG level, and then possibly upgraded to INFO level in the future pending
benchmarks to guard against performance degradation.
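
As background for the avg/max and metric-level discussion in this thread, a
small hedged sketch of how such latency metrics are typically registered and
recorded with Kafka's Metrics/Sensor API; the metric and group names below
are placeholders, not the names finalized in KIP-864.

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class LatencyMetricSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        // One sensor exposing both the average and the maximum latency.
        Sensor latency = metrics.sensor("sink-record-batch-latency");
        latency.add(metrics.metricName("sink-record-batch-latency-avg-ms",
                "sink-task-metrics", "Average batch latency"), new Avg());
        latency.add(metrics.metricName("sink-record-batch-latency-max-ms",
                "sink-task-metrics", "Maximum batch latency"), new Max());

        // Recorded once per batch, e.g. wall-clock time minus the record timestamp.
        latency.record(42.0);
        metrics.close();
    }
}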

RE 4: I think for a true "end-to-end" metric, it'd be useful to include the
time taken by the task to actually deliver the record. However, with the
new metric names and descriptions provided in the KIP, I have no objections
with what's currently proposed, and a new "end-to-end" metric can be taken
on later in a follow-up KIP.

RE 6: You're right, existing producer metrics should be enough for now. We
can revisit this later if/when we add delivery-centric metrics for sink
tasks as well.

RE 7: The new metric names in the KIP LGTM; I don't see any need to expand
beyond those but if you'd still like to pursue others, LMK.


New thoughts:

One small thought: instead of "alias" in "alias="{transform_alias}" for the
per-transform metrics, could we use "transform"? IMO it's clearer since we
don't use "alias" in the names of transform-related properties, and "alias"
may be confused with the classloading term where you can use, e.g.,
"FileStreamSource" as the name of a connector class in a connector config
instead of "org.apache.kafka.connect.file.FileStreamSourceConnector".


Cheers,

Chris

On Fri, Nov 18, 2022 at 12:06 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks Mickael!
>
>
> On Wed, 9 Nov 2022 at 15:54, Mickael Maison 
> wrote:
>
> > Hi Jorge,
> >
> > Thanks for the KIP, it is a nice improvement.
> >
> > 1) The per transformation metrics still have a question mark next to
> > them in the KIP. Do you want to include them? If so we'll want to tag
> > them, we should be able to include the aliases in TransformationChain
> > and use them.
> >
>
> Yes, I have added the changes on TransformChain that will be needed to add
> these metrics.
>
>
> >
> > 2) I see no references to predicates. If we don't want to measure
> > their latency, can we say it explicitly?
> >
>
> Good question, I haven't considered these. Though as these are materialized
> as PredicatedTransformation, they should be covered by these changes.
> Adding a note about this.
>
>
> >
> > 3) Should we have sink-record-batch-latency-avg-ms? All other metrics
> > have both the maximum and average values.
> >
> >
> Good question. I will remove it and change the record latency from
> DEBUG->INFO as it already cover the maximum metric.
>
> Hope it's clearer now, let me know if there any additional feedback.
> Thanks!
>
>
>
> > Thanks,
> > Mickael
> >
> > On Thu, Oct 20, 2022 at 9:58 PM Jorge Esteban Quilcate Otoya
> >  wrote:
> > >
> > > Thanks, Chris! Great feedback! Please, find my comments below:
> > >
> > > On Thu, 13 Oct 2022 at 18:52, Chris Egerton 
> > wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > Thanks for the KIP. I agree with the overall direction and think this
> > would
> > > > be a nice improvement to Kafka Connect. Here are my initial thoughts
> > on the
> > > > details:
> > > >
> > > > 1. The motivation section outlines the gaps in Kafka Connect's task
> > metrics
> > > > nicely. I think it'd be useful to include more concrete details on
> why
> > > > these gaps need to be filled in, and in which cases additional
> metrics
> > > > would be helpful. One goal could be to provide enhanced monitoring of
> > > > production deployments that allows for cluster administrators to set
> up
> > > > automatic alerts for latency spikes and, if triggered, quickly
> > identify the
> > > > root cause of those alerts, reducing the time to remediation. Another
> > goal
> > > > could be to provide more insight to developers or cluster
> > administrators
> > > > who want to do performance testing on connectors in non-production
> > > > environments. It may help guide our decision making process to have a
> > > > clearer picture of the goals we're trying to achieve.
> > > >
> > >
> > > Agree. The Motivation section has been updated.
> > > Thanks for the examples, I see both of them being covered by the KIP.
> > > I see how these could give us a good distinction on whether to position
> > > some metrics at INFO or DEBUG level.
> > >
> > >
> > > > 2. If we're trying to address the alert-and-diagnose use case, it'd
> be
> > > > useful to have as much information as possible at INFO level, rather
> > than
> > > > forcing cluster administrators to possibly reconfigure a connector to
> > emit
> > > > DEBUG or T

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-21 Thread Matthias J. Sax

Thanks for the KIP Victoria. Very well written!


Couple of questions (many might just require to add some more details to 
the KIP):


 (1) Why does the new store not extend KeyValueStore, but StateStore? 
In the end, it's a KeyValueStore?


 (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't 
want to support IQ in this KIP, it might be good to add this interface 
right away to avoid complications for follow-up KIPs? Or won't there be 
any complications anyway?


 (3) Why do we not have a `delete(key)` method? I am ok with not 
supporting all methods from the existing KV-store, but a `delete(key)` 
seems fundamental to have?


 (4a) Do we need `get(key)`? It seems to be the same as `get(key, 
MAX_VALUE)`? Maybe it is good to have as syntactic sugar though? Just for 
my own clarification (should we add something to the JavaDocs?).


 (4b) Should we throw an exception if a user queries out-of-bound 
instead of returning `null` (in `get(key,ts)`)?
  -> You put it into "rejected alternatives", and I understand your 
argument. Would love to get input from others about this question 
though. -- It seems we also return `null` for windowed stores, so maybe 
the strongest argument is to align with existing behavior? Or do we have 
a case for which the current behavior is problematic?


 (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation 
discretion when this is the case)" -> Should we make it a stricter 
contract such that the user can reason about it better (there is WIP to 
make retention time a strict bound for windowed stores atm)
  -> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a 
strict bound, too.


 (5a) Do we need to expose `segmentInterval`? For windowed stores, we 
also use segments but hard-code it to two (it was exposed in earlier 
versions but did not seem useful, though we would be open to exposing it 
again if there is user demand).


 (5b) JavaDocs says: "Performance degrades as more record versions for 
the same key are collected in a single segment. On the other hand, 
out-of-order writes and reads which access older segments may slow down 
if there are too many segments." -- Wondering if JavaDocs should make 
any statements about expected performance? Seems to be an implementation 
detail?


 (6) validTo timestamp is "exclusive", right? Ie, if I query 
`get(key,ts[=validToV1])` I would get `null` or the "next" record v2 
with validFromV2=ts?


 (7) The KIP says that segments are stored in the same RocksDB -- in 
this case, how are efficient deletes handled? For windowed stores, we can 
just delete a full RocksDB.


 (8) Rejected alternatives: you propose to not return the validTo 
timestamp -- if we find it useful in the future to return it, would 
there be a clean path to change it accordingly?
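
To keep the questions above easier to follow, a rough sketch of the store
surface being discussed, pieced together from the methods referenced in this
thread; it only illustrates questions (1)-(4) and (6), and is not the
interface actually proposed in KIP-889.

// Hypothetical shape, for discussion only (see KIP-889 for the real proposal).
public interface VersionedKeyValueStoreSketch<K, V> {

    // (1)/(3): writes (and possibly deletes) are each associated with a timestamp.
    void put(K key, V value, long timestamp);

    // (4a): latest version; likely equivalent to get(key, Long.MAX_VALUE).
    V get(K key);

    // (4b)/(4c)/(6): the version valid as of the given timestamp, i.e. the record
    // with validFrom <= asOfTimestamp < validTo; null (or an exception?) when the
    // timestamp falls outside the store's history retention.
    V get(K key, long asOfTimestamp);
}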



-Matthias


On 11/16/22 9:57 PM, Victoria Xia wrote:

Hi everyone,

I have a proposal for introducing versioned state stores in Kafka Streams.
Versioned state stores are similar to key-value stores except they can
store multiple record versions for a single key. This KIP focuses on
interfaces only in order to limit the scope of the KIP.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores

Thanks,
Victoria



Re: [DISCUSS] KIP-890 Server Side Defense

2022-11-21 Thread Matthias J. Sax

Thanks for the KIP.

Couple of clarification questions (I am not a broker expert, so maybe 
some questions are obvious for others, but not for me with my lack of 
broker knowledge).




(10)


The delayed message case can also violate EOS if the delayed message comes in 
after the next addPartitionsToTxn request comes in. Effectively we may see a 
message from a previous (aborted) transaction become part of the next 
transaction.


What happens if the message come in before the next addPartitionsToTxn 
request? It seems the broker hosting the data partitions won't know 
anything about it and append it to the partition, too? What is the 
difference between both cases?


Also, it seems a TX would only hang if there is no following TX that is 
either committed or aborted? Thus, for the case above, the TX might 
actually not hang (of course, we might get an EOS violation if the first 
TX was aborted and the second committed, or the other way around).



(20)


Of course, 1 and 2 require client-side changes, so for older clients, those 
approaches won’t apply.


For (1) I understand why a client change is necessary, but I am not sure 
why we need a client change for (2). Can you elaborate? -- Later you 
explain that we should send a DescribeTransactionRequest, but I am not 
sure why? Can't we just do an implicit AddPartitionsToTxn, too? If the old 
producer correctly registered the partition already, the TX-coordinator 
can just ignore it as it's an idempotent operation?



(30)


To cover older clients, we will ensure a transaction is ongoing before we write 
to a transaction


Not sure what you mean by this? Can you elaborate?


(40)


[the TX-coordinator] will write the prepare commit message with a bumped epoch 
and send WriteTxnMarkerRequests with the bumped epoch.


Why do we use the bumped epoch for both? It seems more intuitive to use 
the current epoch, and only return the bumped epoch to the producer?



(50) "Implicit AddPartitionToTransaction"

Why does the implicitly sent request need to be synchronous? The KIP 
also says



in case we need to abort and need to know which partitions


What do you mean by this?



we don’t want to write to it before we store in the transaction manager


Do you mean TX-coordinator instead of "manager"?


(60)

For older clients and ensuring that the TX is ongoing, you describe a 
race condition. I am not sure if I can follow here. Can you elaborate?




-Matthias



On 11/18/22 1:21 PM, Justine Olshan wrote:

Hey all!

I'd like to start a discussion on my proposal to add some server-side
checks on transactions to avoid hanging transactions. I know this has been
an issue for some time, so I really hope this KIP will be helpful for many
users of EOS.

The KIP includes changes that will be compatible with old clients and
changes to improve performance and correctness on new clients.

Please take a look and leave any comments you may have!

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
JIRA: https://issues.apache.org/jira/browse/KAFKA-14402

Thanks!
Justine



Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1371

2022-11-21 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-21 Thread Ashwin
Hi Snehasis,

This is a really useful feature and thanks for initiating this discussion.

I had the following questions -


1. Can you elaborate on the rejected alternatives ? Suppose connector
config is versioned and has a schema. Then a single plugin (whose
dependencies have not changed) can handle multiple config versions for the
same connector class.

2. Any plans to support assisted migration e.g if a user invokes "POST
connector/config?migrate=latest", the latest version __attempts__ to
transform the existing config to the newer version. This would require
adding a method like "boolean migrate(Version fromVersion)" to the
connector interface.

Thanks,
Ashwin

On Mon, Nov 21, 2022 at 2:27 PM Snehashis  wrote:

> Hi all,
>
> I'd like to start a discussion thread on KIP-891: Running multiple versions
> of a connector.
>
> The KIP aims to add the ability for the connect runtime to run multiple
> versions of a connector.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector
>
> Please take a look and let me know what you think.
>
> Thank you
> Snehashis Pal
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1372

2022-11-21 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-21 Thread Snehashis
Thanks for the input Ashwin.

> 1. Can you elaborate on the rejected alternatives ? Suppose connector
> config is versioned and has a schema. Then a single plugin (whose
> dependencies have not changed) can handle multiple config versions for the
> same connector class.

IIUC (please correct me if I am wrong here), what you highlighted above is
a versioning scheme for the connector config of the same connector, not
different versions of a connector plugin. That is a somewhat tangential
problem. While it is definitely a useful feature to have, like a log of
what changes were made to the config over time, which might make rollbacks
easier, it is not the focus here. Here, by version we mean which underlying
version of the plugin the given connector configuration should use. Perhaps
it is better to rename the parameter from connector.version to
connector.plugin.version or plugin.version if that is confusing. wdyt?

>  2. Any plans to support assisted migration e.g if a user invokes "POST
> connector/config?migrate=latest", the latest version __attempts__ to
> transform the existing config to the newer version. This would require
> adding a method like "boolean migrate(Version fromVersion)" to the
> connector interface.

This is an enhancement we can think of doing in the future. Users can
simply make a PUT call with an updated config that has the updated version
number. The assisted mode could be handy, as the user would not need to
know the config, but beyond this it does not seem to justify its existence.

Regards
Snehashis

On Tue, Nov 22, 2022 at 10:50 AM Ashwin 
wrote:

> Hi Snehasis,
>
> This is a really useful feature and thanks for initiating this discussion.
>
> I had the following questions -
>
>
> 1. Can you elaborate on the rejected alternatives ? Suppose connector
> config is versioned and has a schema. Then a single plugin (whose
> dependencies have not changed) can handle multiple config versions for the
> same connector class.
>
> 2. Any plans to support assisted migration e.g if a user invokes "POST
> connector/config?migrate=latest", the latest version __attempts__ to
> transform the existing config to the newer version. This would require
> adding a method like "boolean migrate(Version fromVersion)" to the
> connector interface.
>
> Thanks,
> Ashwin
>
> On Mon, Nov 21, 2022 at 2:27 PM Snehashis 
> wrote:
>
> > Hi all,
> >
> > I'd like to start a discussion thread on KIP-891: Running multiple
> versions
> > of a connector.
> >
> > The KIP aims to add the ability for the connect runtime to run multiple
> > versions of a connector.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector
> >
> > Please take a look and let me know what you think.
> >
> > Thank you
> > Snehashis Pal
> >
>