[jira] [Resolved] (KAFKA-14209) Optimize stream stream self join to use single state store

2023-04-11 Thread Vicky Papavasileiou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vicky Papavasileiou resolved KAFKA-14209.
-
Resolution: Implemented

> Optimize stream stream self join to use single state store
> --
>
> Key: KAFKA-14209
> URL: https://issues.apache.org/jira/browse/KAFKA-14209
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vicky Papavasileiou
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> For stream-stream joins that join the same source, we can omit one state 
> store since they contain the same data.
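For context, this is the kind of topology the optimization targets: a windowed
stream-stream join where both sides read from the same source, so the two join
buffers hold identical data. A minimal sketch only; the topic names, serdes,
and join window below are illustrative and not from the ticket:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class SelfJoinExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("clicks");

        // Both sides of the join are the same stream, so without the
        // optimization the two join stores would contain the same records.
        KStream<String, String> selfJoined = clicks.join(
            clicks,
            (left, right) -> left + "," + right,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));

        selfJoined.to("clicks-self-joined");
        return builder.build();
    }
}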



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14731) Upgrade ZooKeeper to 3.6.4

2023-04-11 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14731.

Resolution: Fixed

> Upgrade ZooKeeper to 3.6.4
> --
>
> Key: KAFKA-14731
> URL: https://issues.apache.org/jira/browse/KAFKA-14731
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.0.2, 3.1.2, 3.4.0, 3.2.3, 3.3.2, 3.5.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3
>
>
> We have https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14661 
> opened to upgrade ZooKeeper from 3.6.3 to 3.8.1, and that will likely be 
> actioned in time for 3.5.0.  But in the meantime, ZooKeeper 3.6.4 has been 
> released, so we should take the patch version bump in trunk now and also 
> apply the bump to the next patch releases of 3.0, 3.1, 3.2, 3.3, and 3.4.
> Note that KAFKA-14661 should *not* be applied to branches prior to trunk (and 
> presumably 3.5).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-11 Thread Sagar
Hi Chris,

Thanks for your detailed feedback!

nits: I have taken care of them now. Thanks for pointing those out.

non-nits:

6) It seems (based on both the KIP and discussion on KAFKA-3821) that the
> only use case for being able to emit offsets without also emitting source
> records that's been identified so far is for CDC source connectors like
> Debezium.


I am aware of at least one more case where the non-production of offsets
(due to the non-production of records) leads to connector failures when
the source purges the records of interest. This happens in file-based
source connectors (like S3/blob storage): if the last record from a file
is filtered out by an SMT, that file is never committed to the source
partition, and when the file is eventually deleted from the source and
the connector is restarted for some reason, it fails.
Moreover, I feel this support should live in the Kafka Connect framework
because the restriction comes from the framework itself, and today the
framework provides no way of getting around it. Every connector has its
own way of handling offsets, and having each connector work around this
restriction in its own way adds complexity. Whether we do it the way this
KIP prescribes or some other way is up for debate but, IMHO, the framework
should provide a way of getting around this limitation.

7. If a task produces heartbeat records and source records that use the
> same source partition, which offset will ultimately be committed?


The idea is to add the records returned by `produceHeartbeatRecords` to
the same `toSend` list within `AbstractWorkerSourceTask#execute`.
`produceHeartbeatRecords` would be invoked before the `poll` call, so the
offsets would be committed in the same order in which the records are
written. Note that the onus is on the connector implementation not to
return records that could lead to data loss or out-of-order data; the
framework would just commit whatever is supplied. Also, AFAIK, two
`normal` source records can already share the same source partition, and
they are committed in the order in which they are written.
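To make that ordering concrete, here is an illustrative-only sketch of the
proposed flow. This is not the actual AbstractWorkerSourceTask code;
`produceHeartbeatRecords` is the method proposed in this KIP, and the
`stopping`/`sendRecords` stand-ins represent the worker's existing machinery:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative sketch only; not the real worker loop.
abstract class HeartbeatPollLoopSketch {

    // Stand-in for the task-facing surface the KIP proposes (names may change).
    interface HeartbeatCapableTask {
        List<SourceRecord> poll() throws InterruptedException;
        List<SourceRecord> produceHeartbeatRecords();
    }

    // Stand-ins for the worker's existing machinery.
    abstract boolean stopping();
    abstract void sendRecords(List<SourceRecord> records);

    void pollLoop(HeartbeatCapableTask task) throws InterruptedException {
        while (!stopping()) {
            List<SourceRecord> toSend = new ArrayList<>();

            // 1. Heartbeat records are collected before poll() ...
            List<SourceRecord> heartbeats = task.produceHeartbeatRecords();
            if (heartbeats != null) {
                toSend.addAll(heartbeats);
            }

            // 2. ... then the regular poll() records are appended.
            List<SourceRecord> polled = task.poll();
            if (polled != null) {
                toSend.addAll(polled);
            }

            // 3. Records are dispatched, and their offsets committed, in the
            //    order they appear in toSend, i.e. in write order.
            sendRecords(toSend);
        }
    }
}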

8. The SourceTask::produceHeartbeatRecords method returns a
> List<SourceRecord>, and users can control the heartbeat topic for a
> connector via the (connector- or worker-level) "heartbeat.records.topic"
> property. Since every constructor for the SourceRecord class [2] requires a
> topic to be supplied, what will happen to that topic? Will it be ignored?
> If so, I think we should look for a cleaner solution.


Sorry, I couldn't quite follow which topic will be ignored in this case.

9. A large concern raised in the discussion for KAFKA-3821 was about allowing
> connectors to control the ordering of these special "offsets-only"
> emissions and the regular source records returned from SourceTask::poll.
> Are we choosing to ignore that concern? If so, can you add this to the
> rejected alternatives section along with a rationale?


One thing to note is that, for every connector, the condition under which
to emit a heartbeat record is entirely up to the connector. For example, a
connector tracking transactions for an ordered log might not need to emit
heartbeat records when the timer expires while there are open
transactions, whereas a file-based connector might choose to emit an
offset for a partition if the same file is being processed again and again
due to an SMT or some other reason. The uber point here is that every
connector has its own requirements and the framework can't really make
assumptions about them. What the KIP is trying to do is provide a
mechanism for the connector to commit new offsets. With this approach, as
far as I can tell so far, there doesn't seem to be a case of out-of-order
processing. If you have other concerns/thoughts I would be happy to hear them.

10. If, sometime in the future, we wanted to add framework-level support
> for sending heartbeat records that doesn't require connectors to implement
> any new APIs...


The main purpose of producing heartbeat records is to be able to emit
offsets without any new records; we are using heartbeat records to address
the primary concern of offsets getting stalled. The reason for doing it
this way is that once we have SourceRecords, the rest of the code to write
them to a topic of interest and commit offsets is already in place, which
seemed the least invasive in terms of framework-level changes. If in the
future we want to add framework-only heartbeat record support, then this
would create confusion, as you pointed out. Do you think the name
"heartbeat records" is creating the confusion here? Maybe we can call
these special records something else (not sure what at this point), which
would decouple the two logically as well as implementation-wise.

Thanks!
Sagar.

On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton 
wrote:

> Hi Sagar,
>
> Thanks for the KIP! I have some thoughts.
>
> 

[jira] [Created] (KAFKA-14890) Kafka initiates shutdown due to connectivity problem with Zookeeper and FatalExitError from ChangeNotificationProcessorThread

2023-04-11 Thread Denis Razuvaev (Jira)
Denis Razuvaev created KAFKA-14890:
--

 Summary: Kafka initiates shutdown due to connectivity problem with 
Zookeeper and FatalExitError from ChangeNotificationProcessorThread
 Key: KAFKA-14890
 URL: https://issues.apache.org/jira/browse/KAFKA-14890
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.2
Reporter: Denis Razuvaev


Hello, 

We have faced this deadlock in Kafka several times; a similar issue is
https://issues.apache.org/jira/browse/KAFKA-13544

The question: is it expected behavior that Kafka decides to shut down due to
connectivity problems with Zookeeper? It seems to be related to the inability
to read data from the */feature* Zk node and the _ZooKeeperClientExpiredException_
thrown from the _ZooKeeperClient_ class. This exception is caught only in the
catch block of the _doWork()_ method in {_}ChangeNotificationProcessorThread{_},
and it leads to a {_}FatalExitError{_}.

This shutdown problem is reproducible in newer versions of Kafka (which
already include the deadlock fix from KAFKA-13544).
It is hard to write a synthetic test to reproduce the problem, but it can be
reproduced locally in debug mode with the following steps:
1) Start Zookeeper and start Kafka in debug mode.
2) Emulate a connectivity problem between Kafka and Zookeeper; for example, the
connection can be closed via the Netcrusher library.
3) Put a breakpoint in the _updateLatestOrThrow()_ method in the
_FeatureCacheUpdater_ class, before the
_zkClient.getDataAndVersion(featureZkNodePath)_ line is executed.
4) Restore the connection between Kafka and Zookeeper after the session
expires. Kafka execution should stop at the breakpoint.
5) Resume execution until Kafka starts to execute the line
_zooKeeperClient.handleRequests(remainingRequests)_ in the
_retryRequestsUntilConnected_ method of the _KafkaZkClient_ class.
6) Again emulate a connectivity problem between Kafka and Zookeeper and wait
until the session expires.
7) Restore the connection between Kafka and Zookeeper.
8) Kafka begins the shutdown process, due to:
_ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK
node change event. The broker will eventually exit.
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)_
 

In a real environment, this can be triggered by network problems that cause
repeated disconnections and reconnections to Zookeeper within a short time
period.

I started a mail thread at
[https://lists.apache.org/thread/gbk4scwd8g7mg2tfsokzj5tjgrjrb9dw] regarding
this problem, but have received no answers.

To me this seems like a defect, because Kafka initiates shutdown after the
connection between Kafka and Zookeeper has been restored, and it should be
fixed.

Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-11 Thread Lucas Brutschy
Hi,

No concerns at all, just a clarifying question from my side: for
detecting out-of-order records, I need both the new and the old timestamp.
I suppose I get it for the new record via the timestamp extractor; can I
not get it the same way from the old record that is passed down to the
aggregation after KIP-904?

Thanks,
Lucas

On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax  wrote:
>
> Thanks.
>
> One question: for the repartition topic format change, do we want to
> re-use flag=2, or should we introduce flag=3, and determine when
> compiling the DSL into the Topology if we want/need to include the
> timestamp, and if not, use format version=2 to avoid unnecessary overhead?
>
>
> -Matthias
>
> On 4/10/23 5:47 PM, Victoria Xia wrote:
> > Hi everyone,
> >
> > While wrapping up the implementation for KIP-914, I have discovered that
> > two more DSL processors require semantic updates in the presence of
> > versioned tables:
> >
> > - The table filter processor has an optimization to drop nulls if the
> > previous filtered value is also null. When the upstream table is 
> > versioned,
> > this optimization should be disabled in order to preserve proper version
> > history in the presence of out-of-order data.
> > - When performing an aggregation over a versioned table, only the latest
> > value by timestamp (per key) should be included in the final aggregate
> > value. This is not happening today in the presence of out-of-order data,
> > due to the way that TableSourceNodes call `get(key)` in order to 
> > determine
> > the "old value" which is to be removed from the aggregate as part of
> > applying an update. To fix this, aggregations should ignore out-of-order
> > records when aggregating versioned tables.
> >- In order to implement this change, table aggregate processors need
> >a way to determine whether a record is out-of-order or not. This
> > cannot be
> >done by querying the source table value getter as that store belongs 
> > to a
> >different subtopology (because a repartition occurs before
> > aggregation). As
> >such, an additional timestamp must be included in the repartition 
> > topic.
> >The 3.5 release already includes an update to the repartition
> > topic format
> >(with upgrade implications properly handled) via KIP-904
> >
> > ,
> >so making an additional change to the repartition topic format to 
> > add a
> >timestamp comes at no additional cost to users.
> >
> >
> > I have updated the KIP
> > 
> > itself with more detail about each of these changes. Please let me know if
> > there are any concerns. In the absence of dissent, I'd like to include
> > these changes along with the rest of KIP-914 in the 3.5 release.
> >
> > Apologies for not noticing these additional semantics implications earlier,
> > Victoria
> >
> > -- Forwarded message -
> > From: Victoria Xia 
> > Date: Wed, Mar 22, 2023 at 10:08 AM
> > Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
> > To: 
> >
> >
> > Thanks for voting, everyone! We have three binding yes votes with no
> > objections during four full days of voting. I will close the vote and mark
> > the KIP as accepted, right in time for the 3.5 release.
> >
> > Thanks,
> > Victoria
> >
> > On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna  wrote:
> >
> >> +1 (binding)
> >>
> >> Thanks Victoria!
> >>
> >> Best,
> >> Bruno
> >>
> >> On 20.03.23 17:13, Matthias J. Sax wrote:
> >>> +1 (binding)
> >>>
> >>> On 3/20/23 9:05 AM, Guozhang Wang wrote:
>  +1, thank you Victoria!
> 
>  On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
>   wrote:
> >
> > Hi all,
> >
> > I'd like to start a vote on KIP-914 for updating the Kafka Streams join
> > processors to use proper timestamp-based semantics in applications with
> > versioned stores:
> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >
> > To avoid compatibility concerns, I'd like to include the changes from
> > this
> > KIP together with KIP-889
> > <
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >>>
> > (for introducing versioned stores) in the upcoming 3.5 release. I will
> > close the vote on the 3.5 KIP deadline, March 22, if there are no
> > objections before then.
> >
> > Thanks,
> > Victoria
> >>
> >


[jira] [Created] (KAFKA-14891) Fix rack-aware range assignor to improve rack-awareness with co-partitioning

2023-04-11 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-14891:
--

 Summary: Fix rack-aware range assignor to improve rack-awareness 
with co-partitioning
 Key: KAFKA-14891
 URL: https://issues.apache.org/jira/browse/KAFKA-14891
 Project: Kafka
  Issue Type: Bug
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram


We currently check all states for rack-aware assignment with co-partitioning
([https://github.com/apache/kafka/blob/396536bb5aa1ba78c71ea824d736640b615bda8a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L176]).
We should check each group of co-partitioned states separately so that we can
use rack-aware assignment with co-partitioning for subsets of topics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14783) Implement new STOPPED state for connectors

2023-04-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14783.
---
Fix Version/s: 3.5.0
   Resolution: Done

> Implement new STOPPED state for connectors
> --
>
> Key: KAFKA-14783
> URL: https://issues.apache.org/jira/browse/KAFKA-14783
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> Implement the {{STOPPED}} state [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Kafka's problems

2023-04-11 Thread Ignacio Piedrafita
Hi, I am trying to install Kafka on a Raspberry Pi, but the Linux terminal
shows:
Error: Could not find or load main class
org.apache.zookeeper.server.quorum.QuorumPeer.Main

I would appreciate your help in solving this problem.

I look forward to your response.


Re: [VOTE] KIP-875: First-class offsets support in Kafka Connect

2023-04-11 Thread Chris Egerton
Hi all,

A couple slight tweaks to the design have been proposed during
implementation and I'd like to report them here to make sure that they're
acceptable to all who previously voted for this KIP. I've updated the KIP
to include these changes but will be happy to revert and/or amend if there
are any concerns.

1. We would like to refrain from using a transaction when resetting source
connector offsets in the worker's global offsets topic when exactly-once
support is enabled. We would continue to use a transaction when resetting
offsets in the connector's offsets topic. Discussed in [1].

2. We would like to use a transactional ID of ${groupId}-${connector} to
alter/reset source connector offsets when exactly-once support is enabled,
where ${groupId} is the group ID of the Connect cluster and ${connector} is
the name of the connector. This is raised here because it would introduce
an additional ACL requirement for this API. A less-elegant alternative that
would obviate the additional ACL requirement is to use the transactional ID
that would be used by task 0 of the connector, but this may be confusing to
users as it could indicate that the task is actually running. Discussed in
[2].

[1] - https://github.com/apache/kafka/pull/13465/#issuecomment-1486718538
[2] - https://github.com/apache/kafka/pull/13465/#discussion_r1159694956
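For illustration only (this is not text from the KIP): the kind of additional
ACL that item 2 implies, expressed with the Admin client. The group ID,
connector name, principal, and bootstrap address below are made up.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class ConnectorOffsetsAclSketch {
    public static void main(String[] args) throws Exception {
        // ${groupId}-${connector}; both values are hypothetical.
        String transactionalId = "connect-cluster" + "-" + "my-connector";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Grant the worker principal WRITE on the new transactional ID.
            AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL),
                new AccessControlEntry("User:connect-worker", "*",
                    AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}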

Cheers,

Chris

On Fri, Mar 3, 2023 at 10:22 AM Chris Egerton  wrote:

> Hi all,
>
> Thanks for the votes! I'll cast a final +1 myself and close the vote out.
>
> This KIP passes with the following +1 votes (and no +0 or -1 votes):
>
> • Greg Harris
> • Yash Mayya
> • Knowles Atchison Jr
> • Mickael Maison (binding)
> • Tom Bentley (binding)
> • Josep Prat (binding)
> • Chris Egerton (binding, author)
>
> I'll write up Jira tickets and begin implementing things next week.
>
> Cheers,
>
> Chris
>
> On Fri, Mar 3, 2023 at 10:07 AM Josep Prat 
> wrote:
>
>> Hi Chris,
>>
>> Thanks for the KIP. I have a non-blocking comment on the DISCUSS thread.
>>
>> +1 (binding).
>>
>> Best,
>>
>> On Wed, Mar 1, 2023 at 12:16 PM Tom Bentley  wrote:
>>
>> > Hi Chris,
>> >
>> > Thanks for the KIP.
>> >
>> > +1 (binding).
>> >
>> > Cheers,
>> >
>> > Tom
>> >
>> > On Wed, 15 Feb 2023 at 16:11, Chris Egerton 
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > Thanks to everyone who's voted so far! Just wanted to bump this thread
>> > and
>> > > see if we could get a few more votes; currently we're at +3
>> non-binding
>> > > and +1 binding. Hoping we can get this approved, reviewed, and merged
>> in
>> > > time for 3.5.0.
>> > >
>> > > Cheers,
>> > >
>> > > Chris
>> > >
>> > > On Tue, Jan 31, 2023 at 2:52 AM Mickael Maison <
>> mickael.mai...@gmail.com
>> > >
>> > > wrote:
>> > >
>> > > > Thanks Chris for the KIP, this is a much needed feature!
>> > > >
>> > > > +1 (binding)
>> > > >
>> > > >
>> > > > On Tue, Jan 24, 2023 at 3:45 PM Knowles Atchison Jr
>> > > >  wrote:
>> > > > >
>> > > > > +1 (non binding)
>> > > > >
>> > > > > On Tue, Jan 24, 2023 at 5:24 AM Yash Mayya 
>> > > wrote:
>> > > > >
>> > > > > > Hi Chris,
>> > > > > >
>> > > > > > I'm +1 (non-binding). Thanks again for proposing this extremely
>> > > > > > valuable addition to Kafka Connect!
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Yash
>> > > > > >
>> > > > > > On Thu, Jan 19, 2023 at 12:11 AM Chris Egerton
>> > > > > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi all,
>> > > > > > >
>> > > > > > > I'd like to call for a vote on KIP-875, which adds support for
>> > > > viewing
>> > > > > > and
>> > > > > > > manipulating the offsets of connectors to the Kafka Connect
>> REST
>> > > API.
>> > > > > > >
>> > > > > > > The KIP:
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
>> > > > > > >
>> > > > > > > The discussion thread:
>> > > > > > >
>> https://lists.apache.org/thread/m5bklnh5w4mwr9nbzrmfk0pftpxfjd02
>> > > > > > >
>> > > > > > > Cheers,
>> > > > > > >
>> > > > > > > Chris
>> > > > > > >
>> > > > > >
>> > > >
>> > >
>> >
>>
>>
>> --
>>
>> *Josep Prat*
>> Open Source Engineering Director, *Aiven*
>> josep.p...@aiven.io   |   +491715557497
>> aiven.io
>> *Aiven Deutschland GmbH*
>> Alexanderufer 3-7, 10117 Berlin
>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> Amtsgericht Charlottenburg, HRB 209739 B
>>
>


[jira] [Created] (KAFKA-14892) Harmonize package names in storage module

2023-04-11 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-14892:
---

 Summary: Harmonize package names in storage module
 Key: KAFKA-14892
 URL: https://issues.apache.org/jira/browse/KAFKA-14892
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma


We currently have:
 # org.apache.kafka.server.log.remote.storage: public api in storage-api module
 # org.apache.kafka.server.log.remote: private api in storage module
 # org.apache.kafka.storage.internals.log: private api in storage module

A way to make this consistent could be:
 # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api in 
storage-api module
 # org.apache.kafka.storage.internals.log.remote: private api in storage module
 # org.apache.kafka.storage.internals.log: private api in storage module (stays 
the same)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14893) Public API for reporting Yammer metrics

2023-04-11 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14893:
--

 Summary: Public API for reporting Yammer metrics
 Key: KAFKA-14893
 URL: https://issues.apache.org/jira/browse/KAFKA-14893
 Project: Kafka
  Issue Type: Improvement
  Components: core, metrics
Reporter: Mickael Maison
Assignee: Mickael Maison


Server side metrics registered via the Yammer library are currently exposed via 
the KafkaMetricsReporter interface. This is configured by setting 
kafka.metrics.reporters in the server configuration.

However, the interface is defined in Scala in the core module, so it is not
part of the public API. It also assumes implementations can access
KafkaYammerMetrics.defaultRegistry(), which is not part of the public API
either, in order to report metrics.

The new API should also support reconfigurable configurations.
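For context, a rough sketch of what a reporter looks like against the current
mechanism, illustrating the reliance on internal classes. It assumes the
current shape of the kafka.metrics.KafkaMetricsReporter trait and of
KafkaYammerMetrics; exact packages and signatures may differ between Kafka
versions, so treat it as a sketch rather than a reference implementation. It
would be enabled via kafka.metrics.reporters in the server configuration.

import com.yammer.metrics.core.MetricsRegistry;
import kafka.metrics.KafkaMetricsReporter;            // Scala trait in core -- not public API
import kafka.utils.VerifiableProperties;               // internal as well
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

public class LoggingYammerReporter implements KafkaMetricsReporter {
    @Override
    public void init(VerifiableProperties props) {
        // Today a reporter reaches straight into the default Yammer registry.
        MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
        registry.allMetrics().forEach((name, metric) ->
            System.out.println("Yammer metric registered: " + name.getMBeanName()));
    }
}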



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1748

2023-04-11 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-14894) MetadataLoader must call finishSnapshot after loading a snapshot

2023-04-11 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14894:


 Summary: MetadataLoader must call finishSnapshot after loading a 
snapshot
 Key: KAFKA-14894
 URL: https://issues.apache.org/jira/browse/KAFKA-14894
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-11 Thread Matthias J. Sax
If we send the old and new value as two messages, this should work I guess?
Victoria could confirm. -- But not if we send old/new as a single message
in case the new key does not change?


-Matthias

On 4/11/23 5:25 AM, Lucas Brutschy wrote:

Hi,

No concerns at all, just a clarifying question from my side: for
detecting out-of-order records, I need both new and old timestamp, I
suppose I get it for the new record via timestamp extractor, can I not
get it the same way from the old record that is passed down to the
aggregation after KIP-904?

Thanks,
Lucas

On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax  wrote:


Thanks.

One question: for the repartition topic format change, do we want to
re-use flag=2, or should we introduce flag=3, and determine when
compiling the DSL into the Topology if we want/need to include the
timestamp, and if not, use format version=2 to avoid unnecessary overhead?


-Matthias

On 4/10/23 5:47 PM, Victoria Xia wrote:

Hi everyone,

While wrapping up the implementation for KIP-914, I have discovered that
two more DSL processors require semantic updates in the presence of
versioned tables:

 - The table filter processor has an optimization to drop nulls if the
 previous filtered value is also null. When the upstream table is versioned,
 this optimization should be disabled in order to preserve proper version
 history in the presence of out-of-order data.
 - When performing an aggregation over a versioned table, only the latest
 value by timestamp (per key) should be included in the final aggregate
 value. This is not happening today in the presence of out-of-order data,
 due to the way that TableSourceNodes call `get(key)` in order to determine
 the "old value" which is to be removed from the aggregate as part of
 applying an update. To fix this, aggregations should ignore out-of-order
 records when aggregating versioned tables.
- In order to implement this change, table aggregate processors need
a way to determine whether a record is out-of-order or not. This
cannot be
done by querying the source table value getter as that store belongs to 
a
different subtopology (because a repartition occurs before
aggregation). As
such, an additional timestamp must be included in the repartition topic.
The 3.5 release already includes an update to the repartition
topic format
(with upgrade implications properly handled) via KIP-904

,
so making an additional change to the repartition topic format to add a
timestamp comes at no additional cost to users.


I have updated the KIP

itself with more detail about each of these changes. Please let me know if
there are any concerns. In the absence of dissent, I'd like to include
these changes along with the rest of KIP-914 in the 3.5 release.

Apologies for not noticing these additional semantics implications earlier,
Victoria

-- Forwarded message -
From: Victoria Xia 
Date: Wed, Mar 22, 2023 at 10:08 AM
Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
To: 


Thanks for voting, everyone! We have three binding yes votes with no
objections during four full days of voting. I will close the vote and mark
the KIP as accepted, right in time for the 3.5 release.

Thanks,
Victoria

On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna  wrote:


+1 (binding)

Thanks Victoria!

Best,
Bruno

On 20.03.23 17:13, Matthias J. Sax wrote:

+1 (binding)

On 3/20/23 9:05 AM, Guozhang Wang wrote:

+1, thank you Victoria!

On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
 wrote:


Hi all,

I'd like to start a vote on KIP-914 for updating the Kafka Streams join
processors to use proper timestamp-based semantics in applications with
versioned stores:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores


To avoid compatibility concerns, I'd like to include the changes from
this
KIP together with KIP-889
<

https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores



(for introducing versioned stores) in the upcoming 3.5 release. I will
close the vote on the 3.5 KIP deadline, March 22, if there are no
objections before then.

Thanks,
Victoria






Re: [VOTE] KIP-875: First-class offsets support in Kafka Connect

2023-04-11 Thread Greg Harris
Chris & Yash,

1. Since the global offsets topic does not have transactions on it already,
I don't think adding transactions just for these reset operations would be
an improvement. The transactional produce would not exclude other
non-transactional producers, but hanging transactions on the global offsets
topic would negatively impact the general cluster health. Your proposed
strategy seems reasonable to me.

2. While it may be the connector performing the offset reset and not the
task, I think it would be preferable for the connector to use task 0's
task-id and 'impersonate' the task for the purpose of changing the offsets.
I think the complication elsewhere (getting users to provide a new ACL,
expanding fencing to also fence the connector transaction id, etc) is not
practically worth it to change 1 string value in the logs.
I would find a separate transaction ID beneficial if the connector could be
given a different principal from the task, and be given distinct ACLs.
However, I don't think this is possible or desirable, and so I don't think
it's relevant right now. Let me know if there are any other ways that the
connector transaction ID would be useful.

Thanks for all the effort on this feature!
Greg

On Tue, Apr 11, 2023 at 7:52 AM Chris Egerton 
wrote:

> Hi all,
>
> A couple slight tweaks to the design have been proposed during
> implementation and I'd like to report them here to make sure that they're
> acceptable to all who previously voted for this KIP. I've updated the KIP
> to include these changes but will be happy to revert and/or amend if there
> are any concerns.
>
> 1. We would like to refrain from using a transaction when resetting source
> connector offsets in the worker's global offsets topic when exactly-once
> support is enabled. We would continue to use a transaction when resetting
> offsets in the connector's offsets topic. Discussed in [1].
>
> 2. We would like to use a transactional ID of ${groupId}-${connector} to
> alter/reset source connector offsets when exactly-once support is enabled,
> where ${groupId} is the group ID of the Connect cluster and ${connector} is
> the name of the connector. This is raised here because it would introduce
> an additional ACL requirement for this API. A less-elegant alternative that
> would obviate the additional ACL requirement is to use the transactional ID
> that would be used by task 0 of the connector, but this may be confusing to
> users as it could indicate that the task is actually running. Discussed in
> [2].
>
> [1] - https://github.com/apache/kafka/pull/13465/#issuecomment-1486718538
> [2] - https://github.com/apache/kafka/pull/13465/#discussion_r1159694956
>
> Cheers,
>
> Chris
>
> On Fri, Mar 3, 2023 at 10:22 AM Chris Egerton  wrote:
>
> > Hi all,
> >
> > Thanks for the votes! I'll cast a final +1 myself and close the vote out.
> >
> > This KIP passes with the following +1 votes (and no +0 or -1 votes):
> >
> > • Greg Harris
> > • Yash Mayya
> > • Knowles Atchison Jr
> > • Mickael Maison (binding)
> > • Tom Bentley (binding)
> > • Josep Prat (binding)
> > • Chris Egerton (binding, author)
> >
> > I'll write up Jira tickets and begin implementing things next week.
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Mar 3, 2023 at 10:07 AM Josep Prat 
> > wrote:
> >
> >> Hi Chris,
> >>
> >> Thanks for the KIP. I have a non-blocking comment on the DISCUSS thread.
> >>
> >> +1 (binding).
> >>
> >> Best,
> >>
> >> On Wed, Mar 1, 2023 at 12:16 PM Tom Bentley 
> wrote:
> >>
> >> > Hi Chris,
> >> >
> >> > Thanks for the KIP.
> >> >
> >> > +1 (binding).
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> > On Wed, 15 Feb 2023 at 16:11, Chris Egerton 
> >> > wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Thanks to everyone who's voted so far! Just wanted to bump this
> thread
> >> > and
> >> > > see if we could get a few more votes; currently we're at +3
> >> non-binding
> >> > > and +1 binding. Hoping we can get this approved, reviewed, and
> merged
> >> in
> >> > > time for 3.5.0.
> >> > >
> >> > > Cheers,
> >> > >
> >> > > Chris
> >> > >
> >> > > On Tue, Jan 31, 2023 at 2:52 AM Mickael Maison <
> >> mickael.mai...@gmail.com
> >> > >
> >> > > wrote:
> >> > >
> >> > > > Thanks Chris for the KIP, this is a much needed feature!
> >> > > >
> >> > > > +1 (binding)
> >> > > >
> >> > > >
> >> > > > On Tue, Jan 24, 2023 at 3:45 PM Knowles Atchison Jr
> >> > > >  wrote:
> >> > > > >
> >> > > > > +1 (non binding)
> >> > > > >
> >> > > > > On Tue, Jan 24, 2023 at 5:24 AM Yash Mayya <
> yash.ma...@gmail.com>
> >> > > wrote:
> >> > > > >
> >> > > > > > Hi Chris,
> >> > > > > >
> >> > > > > > I'm +1 (non-binding). Thanks again for proposing this
> extremely
> >> > > > > > valuable addition to Kafka Connect!
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Yash
> >> > > > > >
> >> > > > > > On Thu, Jan 19, 2023 at 12:11 AM Chris Egerton
> >> > >  >> > > > >
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi all,
> >> > > > > > >
> >> > > > > > > I

Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-11 Thread Guozhang Wang
Thanks Victoria.

1) I have no concerns about the filter operator's proposed semantics.

2) For aggregations, I have a meta question to discuss first: for those
operators that generate a table that is materialized as versioned, how
should we emit out-of-order data if the operator still needs to? A quick
look at the following operations today:

a. Read topic as a table source
b. KStream#toTable
c. Aggregated from a stream
d. Aggregated from a table
e. From a table-table join.
f. From a stateless table operator like a `table.filter`.

Per this KIP, cases d) and e) should not emit out-of-order records to
the downstream any more, so we only need to consider the others: today
when we send out the old/new pairs downstream, the old value is just
the value read right before being overwritten in the materialized
store. If the store is versioned, however, then this old value has
already been sent before as part of an old/new pair, so isn't it
actually correct to indicate the pair as just the out-of-order record
itself? More specifically: given a table source operator, with the
topic's incoming records for the same key:

(A, t10), (B, t20), (C, t15)

If the store is not versioned, we would emit, in the form of new/old:

A10/null, B20/A10, C15/B20

While if there's no such out-of-ordering, the ideal emit ordering should be:

A10/null, C15/A10, B20/C15

So I'm thinking, if the store is versioned, we should try to emit in a
way that is as coherent with the ideal ordering as possible, for the
downstream operators to handle:

A10/null, B20/A10, C15/null, null/C15 (or to be succinct, just
A10/null, B20/A10, C15/C15)

This is because A10 was already sent as part of B20/A10 before C came,
so that downstream operators could negate its effect; when C comes, we
only need to let the downstream know that "there was a C at t15 between
A at 10 and B at 20, which is already obsolete because of the later B20
that I sent you before".

This gives the downstream operator the correct information, so it can
handle it accordingly:

* For aggregate operators, they can simply ignore the C15/C15 from upstream.
* For stateless operators, they just apply the filter on C15/C15
and forward it downstream.
* For join operators, as this KIP indicates, they would apply the join
if necessary and not emit the older join results.

If we can do that, then maybe we do not even need to change the
repartition topic format again?


Guozhang

On Tue, Apr 11, 2023 at 11:17 AM Matthias J. Sax  wrote:
>
> If we send old and new value as two messages, this should work I guess?
> Victory could confirm. -- But not if we send old/new as a single message
> in case the new-key does not change?
>
> -Matthias
>
> On 4/11/23 5:25 AM, Lucas Brutschy wrote:
> > Hi,
> >
> > No concerns at all, just a clarifying question from my side: for
> > detecting out-of-order records, I need both new and old timestamp, I
> > suppose I get it for the new record via timestamp extractor, can I not
> > get it the same way from the old record that is passed down to the
> > aggregation after KIP-904?
> >
> > Thanks,
> > Lucas
> >
> > On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax  wrote:
> >>
> >> Thanks.
> >>
> >> One question: for the repartition topic format change, do we want to
> >> re-use flag=2, or should we introduce flag=3, and determine when
> >> compiling the DSL into the Topology if we want/need to include the
> >> timestamp, and if not, use format version=2 to avoid unnecessary overhead?
> >>
> >>
> >> -Matthias
> >>
> >> On 4/10/23 5:47 PM, Victoria Xia wrote:
> >>> Hi everyone,
> >>>
> >>> While wrapping up the implementation for KIP-914, I have discovered that
> >>> two more DSL processors require semantic updates in the presence of
> >>> versioned tables:
> >>>
> >>>  - The table filter processor has an optimization to drop nulls if the
> >>>  previous filtered value is also null. When the upstream table is 
> >>> versioned,
> >>>  this optimization should be disabled in order to preserve proper 
> >>> version
> >>>  history in the presence of out-of-order data.
> >>>  - When performing an aggregation over a versioned table, only the 
> >>> latest
> >>>  value by timestamp (per key) should be included in the final 
> >>> aggregate
> >>>  value. This is not happening today in the presence of out-of-order 
> >>> data,
> >>>  due to the way that TableSourceNodes call `get(key)` in order to 
> >>> determine
> >>>  the "old value" which is to be removed from the aggregate as part of
> >>>  applying an update. To fix this, aggregations should ignore 
> >>> out-of-order
> >>>  records when aggregating versioned tables.
> >>> - In order to implement this change, table aggregate processors 
> >>> need
> >>> a way to determine whether a record is out-of-order or not. This
> >>> cannot be
> >>> done by querying the source table value getter as that store 
> >>> belongs 

Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-11 Thread Victoria Xia
Thanks for your comments and suggestions, Matthias, Lucas, and Guozhang!

I was just in the process of responding when I saw Guozhang's message. I
came up with a different approach to simplify my proposal with regards to
the table aggregate processor, as part of mulling over comments from
Matthias and Lucas: when aggregating a versioned table, instead of dropping
out-of-order records at the aggregate processor (after the repartition), we
can equivalently drop them before the repartition at the repartition map
processor. With this new approach, no changes to the repartition topic
format are necessary as part of this KIP.

As an example, consider a table aggregation which counts the number of
appearances of each value of the table. The repartition prior to the
aggregation will map values (pre-repartition) to keys (post-repartition) as
part of the groupBy(), in order for subsequent aggregation over the
(original) values to occur.

source record --> repartition record(s)
(key=k, value=v1, ts=2) --> (key=v1, newValue=v1, oldValue=null, newTs=2,
oldTs=-1)
(key=k, value=v2, ts=1) --> (key=v2, newValue=v2, oldValue=null, newTs=1,
oldTs=2), (key=v1, newValue=null, oldValue=v1, newTs=1, oldTs=2)
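For reference, a minimal sketch of the kind of topology behind this example:
a table aggregation that counts appearances of each value, where the groupBy()
step is what triggers the repartition discussed above. Topic names and types
are illustrative, and default serdes are assumed.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;

public class ValueCountExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder.table("input-topic");

        // groupBy() re-keys by value, forcing the repartition whose records
        // (newValue/oldValue plus timestamps) are shown in the example above.
        KTable<String, Long> countsByValue = table
            .groupBy((key, value) -> KeyValue.pair(value, value))
            .count();

        countsByValue.toStream().to("value-counts");
        return builder.build();
    }
}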

Under the old proposal, the aggregate processor would see the last two
repartition records and drop them as out-of-order (because newTs is older
than oldTs). Under the new proposal, the repartition map processor would
not send any repartition records upon seeing the second source record (with
value=v2) because its timestamp is older than the latest timestamp seen for
the key (k).

This new approach is simpler than what's currently written in the KIP
because no repartition topic format change is required, and also saves on
unnecessary repartition topic records.

In comparing this suggestion to Guozhang's suggestion:

   - A main difference between the two proposals is that under this
   proposal, the "old value" when an out-of-order record is forwarded
   downstream from a versioned table would be the current latest (by
   timestamp) value, rather than the same as the new value (as in Guozhang's
   proposal).
   - In both proposals, the various processors (table aggregate, joins,
   suppress) actually do not need to call get(key) on the materialization to
   determine whether the record is out-of-order or not. If the "old value" is
   the latest record, then a timestamp comparison would suffice (assuming that
   the old record timestamp is added into the `Change` object). If the "old
   value" is the same as the "new value", then an equality check on the two
   values is sufficient.
   - In both proposals, the repartition topic format does not need to be
   updated.

I think regardless of which of the two implementations we go with, the net
effect will be hidden from users, in which case it may be better to discuss
which to pick as part of implementation rather than on this KIP itself. (I
can tell I will need more time to process the tradeoffs :-) ) Regardless, I
will update the KIP to reflect that no repartition topic format change is
required, which is indeed a great simplification.

> for the repartition topic format change, do we want to re-use flag=2, or
should we introduce flag=3, and determine when compiling the DSL into the
Topology if we want/need to include the timestamp, and if not, use format
version=2 to avoid unnecessary overhead?

I believe the new proposal above is even more efficient in terms of
avoiding unnecessary overhead. LMK what you think.

> for detecting out-of-order records, I need both new and old timestamp, I
suppose I get it for the new record via timestamp extractor, can I not get
it the same way from the old record that is passed down to the aggregation
after KIP-904?

Also subsumed by the updated proposal above, but I think the answer is not
necessarily, both for the reason Matthias gave (when the old and new
records are sent in the same message, there is only one timestamp) and
because even when the old record is sent on its own, it is sent with the
new record's timestamp in the timestamp field (link
).
So, if the timestamp extractor extracts the timestamp from the key/value of
the message, then yes all is well, but if the timestamp extractor uses the
record timestamp, then this timestamp will not be accurate for the old
record.

On Tue, Apr 11, 2023 at 2:26 PM Guozhang Wang 
wrote:

> Thanks Victoria.
>
> 1) I have no concerns about the filter operator's proposed semantics.
>
> 2) For aggregations, I have meta question in mind to discuss first,
> which is for those operators that generate a table, which is
> materialized as versioned, how we should emit out of order data if the
> operator still needs to. A quick look at the following operations
> today:
>
> a. Read topic as a table source
> b. KStream#toTable
> c. Aggre

[jira] [Created] (KAFKA-14895) Move AddPartitionsToTxnManager files to java

2023-04-11 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14895:
--

 Summary: Move AddPartitionsToTxnManager files to java
 Key: KAFKA-14895
 URL: https://issues.apache.org/jira/browse/KAFKA-14895
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan
Assignee: Justine Olshan


Follow-up task to move the files from Scala to Java.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14896) TransactionsBounceTest causes a thread leak

2023-04-11 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14896:
--

 Summary: TransactionsBounceTest causes a thread leak
 Key: KAFKA-14896
 URL: https://issues.apache.org/jira/browse/KAFKA-14896
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan
Assignee: Justine Olshan


On several PR builds I see a test fail with ["Producer closed forcefully" 
|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13391/21/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___testWithGroupId__/]
and then many other tests fail with initialization errors due to 
[controller-event-thread,daemon-broker-bouncer-EventThread|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13391/21/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___executionError/]

In TransactionsBounceTest.testBrokerFailure, we create this thread to bounce
the brokers. There is a finally block to shut it down, but it does not seem to
be working. We should shut it down correctly.
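For illustration, the shutdown pattern the description calls for looks roughly
like the sketch below. This is not the actual test code; bounceRandomBroker,
runTransactionalWorkload, and the thread name are stand-ins.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the intended lifecycle for the broker-bouncing helper thread.
abstract class BrokerBouncerSketch {
    // Stand-ins for the test's existing logic.
    abstract void bounceRandomBroker();
    abstract void runTransactionalWorkload() throws Exception;

    void runWithBouncer() throws Exception {
        AtomicBoolean keepBouncing = new AtomicBoolean(true);
        Thread bouncer = new Thread(() -> {
            while (keepBouncing.get()) {
                bounceRandomBroker();
            }
        }, "daemon-broker-bouncer");
        bouncer.setDaemon(true);
        bouncer.start();
        try {
            runTransactionalWorkload();
        } finally {
            // Signal the thread, interrupt any blocking call, and wait for it
            // to exit so it cannot leak into subsequent tests.
            keepBouncing.set(false);
            bouncer.interrupt();
            bouncer.join(TimeUnit.SECONDS.toMillis(30));
        }
    }
}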

Examples of failures:
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13391/21/#showFailuresLink]
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13391/17/#showFailuresLink]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1749

2023-04-11 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1750

2023-04-11 Thread Apache Jenkins Server
See