[jira] [Created] (KAFKA-13204) wrong assignor selected if the assignor name is identical

2021-08-16 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13204:
-

 Summary: wrong assignor selected if the assignor name is identical
 Key: KAFKA-13204
 URL: https://issues.apache.org/jira/browse/KAFKA-13204
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.8.0
Reporter: Luke Chen
Assignee: Luke Chen


We use the partition assignor name to identify which assignor to use in the 
consumer coordinator, but we don't check for assignor name conflicts, which 
can cause the wrong assignor to be selected.

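A minimal sketch of the kind of name-conflict check this issue calls for, assuming the configured assignors are available as a list. It builds a name-to-assignor table and fails fast on a duplicate name rather than letting one assignor silently shadow another. `NamedAssignor` and `indexByName` are hypothetical; the only thing borrowed from Kafka is that each assignor reports a `name()`, as `ConsumerPartitionAssignor#name()` does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka client code.
class AssignorNameCheck {

    // Stand-in for the name() method of ConsumerPartitionAssignor.
    interface NamedAssignor {
        String name();
    }

    // Builds a name -> assignor lookup table and fails fast on a duplicate name,
    // instead of letting one assignor silently shadow another.
    static Map<String, NamedAssignor> indexByName(List<NamedAssignor> assignors) {
        Map<String, NamedAssignor> byName = new LinkedHashMap<>();
        for (NamedAssignor assignor : assignors) {
            if (byName.putIfAbsent(assignor.name(), assignor) != null) {
                throw new IllegalArgumentException(
                    "Multiple partition assignors are named '" + assignor.name() + "'");
            }
        }
        return byName;
    }

    public static void main(String[] args) {
        NamedAssignor range = () -> "range";
        NamedAssignor custom = () -> "range"; // a custom assignor reusing a built-in name
        try {
            indexByName(List.of(range, custom));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Multiple partition assignors are named 'range'
        }
    }
}
```

Failing at configuration time keeps the error close to its cause, instead of surfacing later as an unexpected assignment.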


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13205) Clarify API specification of Kafka Connect endpoint

2021-08-16 Thread Jonathan Kaleve (Jira)
Jonathan Kaleve created KAFKA-13205:
---

 Summary: Clarify API specification of Kafka Connect endpoint
 Key: KAFKA-13205
 URL: https://issues.apache.org/jira/browse/KAFKA-13205
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.7.0
Reporter: Jonathan Kaleve


Since version 2.5, Kafka Connect exposes an endpoint for getting all topics 
used by a connector via its REST API (see [the original 
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect]).


The original KIP proposed the following response payload: 
{code:java}
{
  "some-source": {
    "topics": [
      "foo",
      "bar",
      "baz"
    ]
  }
}
{code}

The documentation by Confluent states the same: 
[https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-topics]


The [actual 
code|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L198],
however, produces a result of the following form:
{code:java}
{
  "some-source": {
    "connector": "some-source",
    "topics": [
      "foo",
      "bar",
      "baz"
    ]
  }
}
{code}
 
This poses a problem for some applications (like [Strimzi 
Operator|https://github.com/strimzi/strimzi-kafka-operator]), since they expect 
the response to be of type {{Map>>}} (see 
[here|https://github.com/strimzi/strimzi-kafka-operator/blob/61a2301390fb9ecc87feb1925d0c2d2f1b2f8107/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L610]),
but in Kafka Connect, the return type is actually {{Map>}} (see [this 
test|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java#L884],
for example).
Which type should be expected here? Is the endpoint intended to return the type 
documented by Confluent, or the type that is actually present in the code?

Also: I might be overlooking something, but it seems there is no official 
documentation of the specific API endpoints of Kafka Connect. Is that correct?
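Whichever shape ends up being declared canonical, a client can be written to accept both, since each one nests a `topics` array under the connector name. A minimal sketch, assuming the JSON has already been parsed into nested `Map`/`List` objects by some JSON library (not shown). `ActiveTopicsReader` and `topicsByConnector` are hypothetical names, not part of Kafka or Strimzi.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side reader, not Kafka or Strimzi code.
class ActiveTopicsReader {

    // Extracts connector -> topics from an already-parsed response. Both payload
    // shapes in this report carry a "topics" array per connector; the extra
    // "connector" field in the 2.7 output is simply ignored.
    @SuppressWarnings("unchecked")
    static Map<String, List<String>> topicsByConnector(Map<String, Map<String, Object>> response) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Object>> e : response.entrySet()) {
            Object topics = e.getValue().get("topics");
            if (!(topics instanceof List)) {
                throw new IllegalArgumentException("No \"topics\" array for connector " + e.getKey());
            }
            result.put(e.getKey(), (List<String>) topics);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> kipEntry = Map.of("topics", List.of("foo", "bar", "baz"));
        Map<String, Object> actualEntry =
            Map.of("connector", "some-source", "topics", List.of("foo", "bar", "baz"));
        System.out.println(topicsByConnector(Map.of("some-source", kipEntry)));    // {some-source=[foo, bar, baz]}
        System.out.println(topicsByConnector(Map.of("some-source", actualEntry))); // {some-source=[foo, bar, baz]}
    }
}
```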





Re: [DISCUSS] Apache Kafka 2.8.1 release

2021-08-16 Thread Bill Bejeck
Thanks for volunteering to run the release, David.

It's a +1 for me.

Bill

On Mon, Aug 16, 2021 at 1:14 AM Konstantine Karantasis <
kkaranta...@apache.org> wrote:

> Thanks for volunteering David! +1.
>
> Konstantine
>
> On Thu, Aug 12, 2021 at 1:00 AM David Jacot 
> wrote:
>
> > Hi,
> >
> > I'd like to volunteer to be the release manager for the next bugfix
> > release, 2.8.1.
> >
> > I created the release plan on the wiki:
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.8.1
> >
> > Best,
> > David
> >
>


Re: [VOTE] KIP-690: Add additional configuration to control MirrorMaker 2 internal topics naming convention

2021-08-16 Thread Omnia Ibrahim
Thanks, Gwen.
Can I get one more binding +1 for the KIP?

On Mon, Aug 2, 2021 at 2:51 AM Gwen Shapira 
wrote:

> +1 (binding). Thank you for your patience and clear explanations, Omnia.
>
> On Mon, Jul 26, 2021 at 3:39 PM Omnia Ibrahim 
> wrote:
>
> > Bumping up this voting thread.
> >
> > On Fri, Jul 16, 2021 at 1:57 PM Omnia Ibrahim 
> > wrote:
> >
> > > Hi,
> > > Can I get 2 more +1 binding for this KIP?
> > > Thanks
> > >
> > > On Fri, Jul 2, 2021 at 5:14 PM Omnia Ibrahim 
> > > wrote:
> > >
> > >> Hi All,
> > >>
> > >> Just thought of bumping this voting thread again to see if we can
> form a
> > >> consensus around this.
> > >>
> > >> Thanks
> > >>
> > >> On Thu, Jun 24, 2021 at 5:55 PM Mickael Maison <
> > mickael.mai...@gmail.com>
> > >> wrote:
> > >>
> > >>> +1 (binding)
> > >>> Thanks for the KIP!
> > >>>
> > >>> On Tue, May 4, 2021 at 3:23 PM Igor Soarez  >
> > >>> wrote:
> > >>> >
> > >>> > Another +1 here, also non-binding.
> > >>> >
> > >>> > Thank you Omnia!
> > >>> >
> > >>> > --
> > >>> > Igor
> > >>> >
> > >>> >
> > >>> > On Fri, Apr 30, 2021, at 3:15 PM, Ryanne Dolan wrote:
> > >>> > > +1 (non-binding), thanks!
> > >>> > >
> > >>> > > On Thu, Jan 21, 2021, 4:31 AM Omnia Ibrahim <
> > o.g.h.ibra...@gmail.com>
> > >>> wrote:
> > >>> > >
> > >>> > >> Hi
> > >>> > >> Can I get a vote on this, please?
> > >>> > >>
> > >>> > >> Best
> > >>> > >> Omnia
> > >>> > >>
> > >>> > >> On Tue, Dec 15, 2020 at 12:16 PM Omnia Ibrahim <
> > >>> o.g.h.ibra...@gmail.com>
> > >>> > >> wrote:
> > >>> > >>
> > >>> > >>> If anyone is interested in reading the discussion, you can find it
> > >>> > >>> here:
> > >>> > >>> https://www.mail-archive.com/dev@kafka.apache.org/msg113373.html
> > >>> > >>>
> > >>> > >>> On Tue, Dec 8, 2020 at 4:01 PM Omnia Ibrahim <
> > >>> > >>> o.g.h.ibra...@gmail.com>
> > >>> > >>> wrote:
> > >>> > >>>
> > >>> > >>>> Hi everyone,
> > >>> > >>>> I’m proposing a new KIP for MirrorMaker 2 to add the ability to
> > >>> > >>>> control internal topics naming convention. The proposal details
> > >>> > >>>> are here:
> > >>> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
> > >>> > >>>>
> > >>> > >>>> Please vote in this thread.
> > >>> > >>>> Thanks
> > >>> > >>>> Omnia
> > >>> > >>>>
> > >>> > >>>
> > >>> > >
> > >>>
> > >>
> >
>
>
> --
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


[jira] [Created] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode

2021-08-16 Thread Jun Rao (Jira)
Jun Rao created KAFKA-13206:
---

 Summary: shutting down broker needs to stop fetching as a follower 
in KRaft mode
 Key: KAFKA-13206
 URL: https://issues.apache.org/jira/browse/KAFKA-13206
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.0.0
Reporter: Jun Rao


In ZK mode, the controller sends a StopReplica request (with the deletion flag 
set to false) to the shutting-down broker so that it stops its followers from 
fetching. In KRaft mode, there is no corresponding logic. This results in 
unnecessary rejected fetch follower requests during controlled shutdown.
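The missing step can be pictured with a small in-memory model: stopping a replica without deletion removes the partition from the follower fetch set but keeps its log, and a broker entering controlled shutdown would apply that to every partition it follows. This is only an illustration under assumptions; `BrokerReplicaState`, `stopReplica`, and `onControlledShutdown` are hypothetical and do not correspond to actual `ReplicaManager` methods.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a broker's replica state, not Kafka broker code.
class BrokerReplicaState {

    // Partitions this broker currently fetches as a follower.
    final Set<String> followerFetching = new HashSet<>();
    // Partitions whose logs are kept on this broker's disk.
    final Set<String> localLogs = new HashSet<>();

    void becomeFollower(String partition) {
        followerFetching.add(partition);
        localLogs.add(partition);
    }

    // Models the effect of a StopReplica request: always stop fetching,
    // delete the local log only when the deletion flag is set.
    void stopReplica(String partition, boolean deletePartition) {
        followerFetching.remove(partition);
        if (deletePartition) {
            localLogs.remove(partition);
        }
    }

    // Hypothetical hook for a KRaft broker once controlled shutdown starts:
    // the same "stop fetching, keep the log" step ZK mode gets from the controller.
    void onControlledShutdown() {
        for (String partition : new HashSet<>(followerFetching)) { // copy: we mutate the set
            stopReplica(partition, false);
        }
    }

    public static void main(String[] args) {
        BrokerReplicaState broker = new BrokerReplicaState();
        broker.becomeFollower("foo-0");
        broker.becomeFollower("bar-1");
        broker.onControlledShutdown();
        System.out.println(broker.followerFetching.isEmpty()); // true
        System.out.println(broker.localLogs.size());           // 2
    }
}
```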





[jira] [Created] (KAFKA-13207) Replica fetcher should not update partition state on diverging epoch if partition removed from fetcher

2021-08-16 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-13207:
--

 Summary: Replica fetcher should not update partition state on 
diverging epoch if partition removed from fetcher
 Key: KAFKA-13207
 URL: https://issues.apache.org/jira/browse/KAFKA-13207
 Project: Kafka
  Issue Type: New Feature
  Components: core
Affects Versions: 2.8.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 3.0.0, 2.8.1


{{AbstractFetcherThread#truncateOnFetchResponse}} is used with 
IBP 2.7 and above to truncate partitions based on the diverging epoch returned in 
fetch responses. Truncation should only be performed for partitions that are 
still owned by the fetcher, and this check should be done while holding 
{{partitionMapLock}} to ensure that any partitions removed from the fetcher 
thread are not truncated.

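The rule "check ownership and truncate under the same lock" can be sketched as follows. This is a simplified model, not `AbstractFetcherThread` itself: partitions are plain strings, per-partition state is just a fetch offset, and `FetcherTruncationSketch` and `truncateOnDivergingEpoch` are hypothetical names. What matters is that the lookup that checks ownership and the update that truncates both happen while `partitionMapLock` is held, so a concurrent removal cannot land in between.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified fetcher; not Kafka's AbstractFetcherThread.
class FetcherTruncationSketch {

    // Simplified partition state: partition name -> next fetch offset.
    private final Map<String, Long> partitionStates = new HashMap<>();
    private final ReentrantLock partitionMapLock = new ReentrantLock();

    void addPartition(String partition, long fetchOffset) {
        partitionMapLock.lock();
        try {
            partitionStates.put(partition, fetchOffset);
        } finally {
            partitionMapLock.unlock();
        }
    }

    void removePartition(String partition) {
        partitionMapLock.lock();
        try {
            partitionStates.remove(partition);
        } finally {
            partitionMapLock.unlock();
        }
    }

    // Called with the truncation offset derived from a diverging epoch in a fetch
    // response. Ownership check and update share one critical section.
    boolean truncateOnDivergingEpoch(String partition, long truncationOffset) {
        partitionMapLock.lock();
        try {
            Long current = partitionStates.get(partition);
            if (current == null) {
                return false; // no longer owned by this fetcher: leave it alone
            }
            partitionStates.put(partition, Math.min(current, truncationOffset));
            return true;
        } finally {
            partitionMapLock.unlock();
        }
    }

    Long fetchOffset(String partition) {
        partitionMapLock.lock();
        try {
            return partitionStates.get(partition);
        } finally {
            partitionMapLock.unlock();
        }
    }

    public static void main(String[] args) {
        FetcherTruncationSketch fetcher = new FetcherTruncationSketch();
        fetcher.addPartition("foo-0", 100L);
        System.out.println(fetcher.truncateOnDivergingEpoch("foo-0", 80L)); // true
        System.out.println(fetcher.fetchOffset("foo-0"));                   // 80
        fetcher.removePartition("foo-0");
        System.out.println(fetcher.truncateOnDivergingEpoch("foo-0", 50L)); // false
    }
}
```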




[GitHub] [kafka-site] kkonstantine opened a new pull request #366: Add public key for Konstantine Karantasis

2021-08-16 Thread GitBox


kkonstantine opened a new pull request #366:
URL: https://github.com/apache/kafka-site/pull/366


   Available at: 
   https://keys.openpgp.org/search?q=kkaranta...@apache.org


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[VOTE] KIP-748: Add Broker Count Metrics (restarted)

2021-08-16 Thread Colin McCabe
Hi all,

Two months ago we tried to vote on this minor KIP adding two new broker 
metrics. However, things got busy and we weren't able to complete the vote. Now 
that things have quieted down a bit, let's restart the vote!

Note that I made some minor changes to the KIP after some offline discussion 
with Ryan Dielhenn and David Jacot. Specifically, I added a metric for the 
number of fenced brokers.

Here's the KIP:
https://cwiki.apache.org/confluence/x/N4rOCg

Here's the discussion thread:
https://lists.apache.org/thread.html/r308364dfbb3020e6151cef47237c28a4a540e187b8af84ddafec83af%40%3Cdev.kafka.apache.org%3E

Please take a look if you have a chance!

best,
Colin


Re: [VOTE] KIP-748: Add Broker Count Metrics (restarted)

2021-08-16 Thread David Jacot
Hi Colin,

Thanks for restarting this KIP.

+1 (binding)

Best,
David

On Mon, Aug 16, 2021 at 9:32 PM Colin McCabe wrote:

> Hi all,
>
> Two months ago we tried to vote on this minor KIP adding two new broker
> metrics. However, things got busy and we weren't able to complete the vote.
> Now that things have quieted down a bit, let's restart the vote!
>
> Note that I made some minor changes to the KIP after some offline
> discussion with Ryan Dielhenn and David Jacot. Specifically, I added a
> metric for the number of fenced brokers.
>
> Here's the KIP:
> https://cwiki.apache.org/confluence/x/N4rOCg
>
> Here's the discussion thread:
>
> https://lists.apache.org/thread.html/r308364dfbb3020e6151cef47237c28a4a540e187b8af84ddafec83af%40%3Cdev.kafka.apache.org%3E
>
> Please take a look if you have a chance!
>
> best,
> Colin
>


[GitHub] [kafka-site] kkonstantine commented on pull request #366: Add public key for Konstantine Karantasis

2021-08-16 Thread GitBox


kkonstantine commented on pull request #366:
URL: https://github.com/apache/kafka-site/pull/366#issuecomment-899791557


   Thanks @rhauch !






[GitHub] [kafka-site] kkonstantine merged pull request #366: Add public key for Konstantine Karantasis

2021-08-16 Thread GitBox


kkonstantine merged pull request #366:
URL: https://github.com/apache/kafka-site/pull/366


   






Re: [DISCUSS] Apache Kafka 2.8.1 release

2021-08-16 Thread John Roesler
Thanks, David!

+1 from me as well.

-John

On Mon, 2021-08-16 at 12:00 -0400, Bill Bejeck wrote:
> Thanks for volunteering to run the release David.
> 
> It's a +1 for me.
> 
> Bill
> 
> On Mon, Aug 16, 2021 at 1:14 AM Konstantine Karantasis <
> kkaranta...@apache.org> wrote:
> 
> > Thanks for volunteering David! +1.
> > 
> > Konstantine
> > 
> > On Thu, Aug 12, 2021 at 1:00 AM David Jacot 
> > wrote:
> > 
> > > Hi,
> > > 
> > > I'd like to volunteer to be the release manager for the next bugfix
> > > release, 2.8.1.
> > > 
> > > I created the release plan on the wiki:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.8.1
> > > 
> > > Best,
> > > David
> > > 
> > 




Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #408

2021-08-16 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 488227 lines...]
[2021-08-16T21:17:53.247Z] > Task :raft:testClasses UP-TO-DATE
[2021-08-16T21:17:53.247Z] > Task :connect:json:testJar
[2021-08-16T21:17:53.247Z] > Task :connect:json:testSrcJar
[2021-08-16T21:17:53.247Z] > Task :metadata:compileTestJava UP-TO-DATE
[2021-08-16T21:17:53.247Z] > Task :metadata:testClasses UP-TO-DATE
[2021-08-16T21:17:53.247Z] > Task :clients:generateMetadataFileForMavenJavaPublication
[2021-08-16T21:17:53.247Z] > Task :clients:generatePomFileForMavenJavaPublication
[2021-08-16T21:17:54.192Z] 
[2021-08-16T21:17:54.192Z] > Task :streams:processMessages
[2021-08-16T21:17:54.192Z] Execution optimizations have been disabled for task ':streams:processMessages' to ensure correctness due to the following reasons:
[2021-08-16T21:17:54.192Z]   - Gradle detected a problem with the following location: '/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/streams/src/generated/java/org/apache/kafka/streams/internals/generated'. Reason: Task ':streams:srcJar' uses this output of task ':streams:processMessages' without declaring an explicit or implicit dependency. This can lead to incorrect results being produced, depending on what order the tasks are executed. Please refer to https://docs.gradle.org/7.1.1/userguide/validation_problems.html#implicit_dependency for more details about this problem.
[2021-08-16T21:17:54.192Z] MessageGenerator: processed 1 Kafka message JSON files(s).
[2021-08-16T21:17:54.192Z] 
[2021-08-16T21:17:54.192Z] > Task :core:compileScala UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :core:classes UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :core:compileTestJava NO-SOURCE
[2021-08-16T21:17:54.192Z] > Task :streams:compileJava UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :streams:classes UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :streams:copyDependantLibs UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :streams:jar UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :streams:test-utils:compileJava UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :streams:generateMetadataFileForMavenJavaPublication
[2021-08-16T21:17:54.192Z] > Task :core:compileTestScala UP-TO-DATE
[2021-08-16T21:17:54.192Z] > Task :core:testClasses UP-TO-DATE
[2021-08-16T21:17:57.805Z] > Task :connect:api:javadoc
[2021-08-16T21:17:57.805Z] > Task :connect:api:copyDependantLibs UP-TO-DATE
[2021-08-16T21:17:58.750Z] > Task :connect:api:jar UP-TO-DATE
[2021-08-16T21:17:58.750Z] > Task :connect:api:generateMetadataFileForMavenJavaPublication
[2021-08-16T21:17:58.750Z] > Task :connect:json:copyDependantLibs UP-TO-DATE
[2021-08-16T21:17:58.750Z] > Task :connect:json:jar UP-TO-DATE
[2021-08-16T21:17:58.750Z] > Task :connect:json:generateMetadataFileForMavenJavaPublication
[2021-08-16T21:17:58.750Z] > Task :connect:json:publishMavenJavaPublicationToMavenLocal
[2021-08-16T21:17:58.750Z] > Task :connect:json:publishToMavenLocal
[2021-08-16T21:17:58.750Z] > Task :connect:api:javadocJar
[2021-08-16T21:17:58.750Z] > Task :connect:api:compileTestJava UP-TO-DATE
[2021-08-16T21:17:58.750Z] > Task :connect:api:testClasses UP-TO-DATE
[2021-08-16T21:17:58.750Z] > Task :connect:api:testJar
[2021-08-16T21:17:58.750Z] > Task :connect:api:testSrcJar
[2021-08-16T21:17:58.750Z] > Task :connect:api:publishMavenJavaPublicationToMavenLocal
[2021-08-16T21:17:58.750Z] > Task :connect:api:publishToMavenLocal
[2021-08-16T21:18:01.443Z] > Task :streams:javadoc
[2021-08-16T21:18:02.390Z] > Task :streams:javadocJar
[2021-08-16T21:18:02.390Z] > Task :streams:compileTestJava UP-TO-DATE
[2021-08-16T21:18:02.390Z] > Task :streams:testClasses UP-TO-DATE
[2021-08-16T21:18:03.337Z] > Task :streams:testJar
[2021-08-16T21:18:03.337Z] > Task :streams:testSrcJar
[2021-08-16T21:18:03.337Z] > Task :streams:publishMavenJavaPublicationToMavenLocal
[2021-08-16T21:18:03.337Z] > Task :streams:publishToMavenLocal
[2021-08-16T21:18:04.282Z] > Task :clients:javadoc
[2021-08-16T21:18:05.228Z] > Task :clients:javadocJar
[2021-08-16T21:18:06.175Z] 
[2021-08-16T21:18:06.175Z] > Task :clients:srcJar
[2021-08-16T21:18:06.175Z] Execution optimizations have been disabled for task ':clients:srcJar' to ensure correctness due to the following reasons:
[2021-08-16T21:18:06.175Z]   - Gradle detected a problem with the following location: '/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/clients/src/generated/java'. Reason: Task ':clients:srcJar' uses this output of task ':clients:processMessages' without declaring an explicit or implicit dependency. This can lead to incorrect results being produced, depending on what order the tasks are executed. Please refer to https://docs.gradle.org/7.1.1/userguide/validation_problems.html#implicit_dependency for more details about this problem.
[2021-08-16T21:18:07.122Z] 
[2021-08-16T21:18:07.122Z] > Task :clients:testJar
[2021-08-16T21:18:08.069Z] > Tas

[jira] [Created] (KAFKA-13208) Use TopicIdPartition instead of TopicPartition when computing the topic delta

2021-08-16 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13208:
--

 Summary: Use TopicIdPartition instead of TopicPartition when 
computing the topic delta
 Key: KAFKA-13208
 URL: https://issues.apache.org/jira/browse/KAFKA-13208
 Project: Kafka
  Issue Type: Improvement
  Components: kraft, replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


{{TopicPartition}} is used as the key when computing the local changes in 
{{TopicsDelta}}. The topic id is included in the Map value returned by 
{{localChanges}}. I think that the handling of this code and the corresponding 
code in {{ReplicaManager}} could be simplified if {{localChanges}} instead 
returned something like
{code:java}
{
  deletes: Set[TopicIdPartition],
  leaders: Map[TopicIdPartition, PartitionRegistration],
  followers: Map[TopicIdPartition, PartitionRegistration] 
}{code}
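A rough Java rendering of the proposed return type, assuming a partition counts as a local leader when this broker is its leader and as a local follower when the broker is any other replica, and assuming the deleted set passed in is already limited to partitions hosted on this broker. `TopicIdPartition` and `PartitionRegistration` below are minimal stand-ins defined only for this sketch, not Kafka's classes with those names, and `LocalReplicaChanges`/`localChanges` are hypothetical. Because the key already carries the topic id, a caller such as {{ReplicaManager}} would not need to dig it out of the map value.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch with stand-in types; not Kafka metadata code.
class LocalChangesSketch {

    // Minimal stand-in key: topic id plus partition index.
    static final class TopicIdPartition {
        final UUID topicId;
        final int partition;

        TopicIdPartition(UUID topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicIdPartition)) return false;
            TopicIdPartition other = (TopicIdPartition) o;
            return topicId.equals(other.topicId) && partition == other.partition;
        }

        @Override public int hashCode() {
            return Objects.hash(topicId, partition);
        }
    }

    // Minimal stand-in for partition metadata: current leader and replica list.
    static final class PartitionRegistration {
        final int leader;
        final List<Integer> replicas;

        PartitionRegistration(int leader, List<Integer> replicas) {
            this.leader = leader;
            this.replicas = replicas;
        }
    }

    // Shape proposed in the issue: every collection is keyed by TopicIdPartition.
    static final class LocalReplicaChanges {
        final Set<TopicIdPartition> deletes = new HashSet<>();
        final Map<TopicIdPartition, PartitionRegistration> leaders = new HashMap<>();
        final Map<TopicIdPartition, PartitionRegistration> followers = new HashMap<>();
    }

    static LocalReplicaChanges localChanges(int brokerId,
                                            Map<TopicIdPartition, PartitionRegistration> changed,
                                            Set<TopicIdPartition> deleted) {
        LocalReplicaChanges changes = new LocalReplicaChanges();
        changes.deletes.addAll(deleted); // assumed already filtered to local partitions
        changed.forEach((tp, reg) -> {
            if (reg.leader == brokerId) {
                changes.leaders.put(tp, reg);
            } else if (reg.replicas.contains(brokerId)) {
                changes.followers.put(tp, reg);
            }
        });
        return changes;
    }
}
```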





Re: [DISCUSS] Apache Kafka 2.8.1 release

2021-08-16 Thread Satish Duggana
Thanks, David, for running the 2.8.1 release.

+1

On Tue, 17 Aug 2021 at 02:05, John Roesler  wrote:

> Thanks, David!
>
> +1 from me as well.
>
> -John
>
> On Mon, 2021-08-16 at 12:00 -0400, Bill Bejeck wrote:
> > Thanks for volunteering to run the release David.
> >
> > It's a +1 for me.
> >
> > Bill
> >
> > On Mon, Aug 16, 2021 at 1:14 AM Konstantine Karantasis <
> > kkaranta...@apache.org> wrote:
> >
> > > Thanks for volunteering David! +1.
> > >
> > > Konstantine
> > >
> > > On Thu, Aug 12, 2021 at 1:00 AM David Jacot
> 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I'd like to volunteer to be the release manager for the next bugfix
> > > > release, 2.8.1.
> > > >
> > > > I created the release plan on the wiki:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.8.1
> > > >
> > > > Best,
> > > > David
> > > >
> > >
>
>
>


Re: Request development permissions to Apache Kafka

2021-08-16 Thread Matthias J. Sax
You should be all set.

On 8/13/21 11:22 PM, Christo NUKK wrote:
> Wiki ID: christo_lolov
> Jira ID: christo_lolov
> 


Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-16 Thread Matthias J. Sax
@Mohan:

For sum(), I actually think that the return type should be the same as the
input type. In the end, sum() is a special case of reduce().
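To illustrate why `sum()` as a special case of `reduce()` fixes the return type: if a sum is defined as a fold with an adder, the accumulator, the inputs, and the result all share one type. This is a plain-Java sketch over a `List`, not Kafka Streams code, and `SumAsReduce.sum` is a hypothetical helper; the closest existing Kafka Streams operator is `KGroupedStream#reduce(Reducer<V>)`, whose result is also a `KTable<K, V>`.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical helper, not part of the Kafka Streams API.
class SumAsReduce {

    // A sum expressed as a fold with an adder: the accumulator, the inputs,
    // and the result all share the same type V.
    static <V> V sum(List<V> values, V zero, BinaryOperator<V> adder) {
        V total = zero;
        for (V value : values) {
            total = adder.apply(total, value);
        }
        return total;
    }

    public static void main(String[] args) {
        Integer ints = sum(List.of(1, 2, 3), 0, Integer::sum);      // Integer in, Integer out
        Double doubles = sum(List.of(0.5, 0.25), 0.0, Double::sum); // Double in, Double out
        System.out.println(ints + " " + doubles); // 6 0.75
    }
}
```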

@Alexandre:

Not sure if using a `Function` to get a `Comparable` is simpler than
implementing a `Comparator`? Can you elaborate on this point? In the end,
using a `Comparator` seems to provide the highest flexibility?

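To make the trade-off concrete, here is a plain-Java sketch (ordinary `List`s, not the Kafka Streams API) of three signatures discussed in this thread: a `Comparator` argument, a `Function` that extracts a `Comparable` property, and a bound requiring `V` itself to be `Comparable`. The names `min`, `minBy`, and `minNatural` are hypothetical. It suggests that the extractor form can be layered on the comparator form with `Comparator.comparing`, while a comparator can also express orderings (such as tie-breaks) that a single extracted property cannot.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helpers over plain lists, not the Kafka Streams API.
class MinVariants {

    // Comparator-based variant: works for any V, including orderings over
    // several properties or custom null handling.
    static <V> Optional<V> min(List<V> values, Comparator<? super V> comparator) {
        return values.stream().min(comparator);
    }

    // Extractor-based variant: layered on the comparator variant via
    // Comparator.comparing, so it adds convenience but no expressiveness.
    static <V, U extends Comparable<? super U>> Optional<V> minBy(
            List<V> values, Function<? super V, ? extends U> extractor) {
        return min(values, Comparator.comparing(extractor));
    }

    // Bounded variant: a static generic method can require V to be Comparable
    // without a cast; an instance method cannot add such a bound to a type
    // parameter declared on its enclosing interface.
    static <V extends Comparable<? super V>> Optional<V> minNatural(List<V> values) {
        return min(values, Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        System.out.println(minBy(List.of("kafka", "streams", "ktable"), String::length)); // Optional[kafka]
        // Tie-break on reverse alphabetical order, which one extracted property cannot express.
        Comparator<String> byLengthThenReverse =
            Comparator.comparing(String::length).thenComparing(Comparator.reverseOrder());
        System.out.println(min(List.of("bb", "aa", "ccc"), byLengthThenReverse)); // Optional[bb]
        System.out.println(minNatural(List.of(3, 1, 2)));                         // Optional[1]
    }
}
```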

-Matthias

On 8/15/21 10:27 AM, Alexandre Brasil wrote:
>> I am not sure why we would want to pass `Function>
>> func` into `min()`?
> 
> I guess I misread/misunderstood your earlier suggestion.
> 
> My line of thought was that, instead of using a method signature that
> demands a Comparator in
> min()/max(), we might use a property extractor (like the FK extractors on
> some join() overloads) to
> return a Comparable property that min()/max() could use to compare the
> values.
> 
> The benefit of this approach is that It would be simpler than implementing
> comparators when most
> use cases would probably compare properties of the values that already
> implement Comparable (like
> Numbers, Strings, Dates, etc), but on the other hand it would be more
> limiting in disallowing the usage
> of multiple properties of  or on defining how null property values
> should be handled.
> 
> On Tue, Aug 3, 2021 at 10:55 PM Matthias J. Sax  wrote:
> 
>> I was playing with the code a little bit, but it seems not to be easy to
>> use generics to enforce that V is `Comparable`...
>>
>> We would need to introduce a new interface
>>
>>  interface ComparableStream>
>> extends KStream
>>  {
>> KTable min();
>>  }
>>
>> But it also requires a nasty cast to actually use it:
>>
>>   KStream stream =
>> new StreamsBuilder().stream("");
>>   KTable table =
>> ((ComparableStream) stream).min();
>>
>> If the value-type does not implement `Comparable` the cast would not
>> compile... Or would there be a simpler way to ensure that min() can only
>> be called _if_ V is `Comparable`?
>>
>>
>> So maybe passing in a `Comparator` might be the right way to go;
>> might also be more flexible anyway. -- My original idea was just to
>> maybe avoid the `Comparator` argument, as it would make the API nicer
>> IMHO; fewer parameters is usually better...
>>
>>
>> I am not sure why we would want to pass `Function>
>> func` into `min()`?
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
>>> Alex,
>>>
>>>
>>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <
>> alexandre.bra...@gmail.com>
>>> wrote:
>>>
 Mohan / Mathias,

>> I think extending min/max to non-numeric types makes sense. Wondering
>> why we should require a `Comparator` or if we should require that the
>> types implement `Comparable` instead?
>>
> Good question. This is what it would look like:
>
> KTable min_comparable()
> KTable min_comparator(Comparator comp)

 Not sure if I understood Mathias' proposal correctly, but I think that
 instead of going with
 your original proposal ( KTable
>> min(Function>>> VR> func...)
 or mine (KTable min(Comparator comparator...), we could
>> simplify
 it a
 bit by using a function to extract a Comparable property from the
>> original
 value:

 KTable min(Function> func...)

 I will let Matthias clarify. I am not sure why it is simpler than the
>>> comparator one. Comparable is implemented by the type and not sure
>> exposing
>>> it this way makes it any better.
>>>
 I also think, that min/max should not change the value type. Using
> `Long` for sum() make sense though, and also to require a ` Number>`.

 Are there any reasons to limit the sum() to integers? Why not use a
>> Double
 instead?

 Yeah, if the precision is important, then we should stick with Double.
>>>
>>> -mohan
>>>
>>> Best regards,
 Alexandre

 On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <
>> mposde...@gmail.com>
 wrote:

> Matthias,
>
> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
 >
> wrote:
>
>> Hi,
>>
>> I think extending min/max to non-numeric types makes sense. Wondering
>> why we should require a `Comparator` or if we should require that the
>> types implement `Comparable` instead?
>>
>> Good question. This is what it would look like:
>
> KTable min_comparable()
> KTable min_comparator(Comparator comp)
>
> For min_comparable to work, you still need the bounds "V extends
> Comparable<
> V>". AFAICT, to avoid the "type parameter V hiding the type V" warning,
 it
> has to be at the interface level like this:
>
>  KStream>
>
> which is a little messy unless there is a different way to do the same.
 The
> comparator gives a simple way to define an anonymous function.
>
> What do you think ?
>
>
>> I also think, that min/max should not change the value type. Using
>> `Long` for sum() make sense tho

Re: Request contributor permission

2021-08-16 Thread Matthias J. Sax
Done.

On 8/14/21 12:05 AM, Yanwen Lin wrote:
> Hi Kafka team,
> 
> Can I ask for a contributor permission so that I can assign myself to the 
> Kafka Jira ticket. My Jira id is: ll1124278064
> 
> Thanks!
> 
> Best,
> Yanwen
> 


Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-08-16 Thread Matthias J. Sax
Thanks for the use case details. I think it actually aligns with my
proposal, especially as you say you want to react to changes ASAP: if
you apply an "aggregation", the result would be emitted at the end (or
close) of the window, and you cannot react to changes right away.

But your use case also points out a design flaw in my proposal: we still
want to take the key into account. That is, we would create a window per key,
and as long as the value does not change, we don't emit anything
downstream. If the value changes for a key, we emit the new value
downstream and "reset" the window start time.

For a "real" de-duplication use case, for which each message-id should be
unique (and the value would always be the same for a given key) and one
only wants to filter out duplicate upstream writes of the same message, the
operator would still work. But we gain more flexibility to also cover
your use case.
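A minimal in-memory sketch of the per-key semantics described here: forward a record if its key has no state, if its value differs from the last forwarded value, or if the key's window has elapsed; otherwise drop it as a duplicate. Forwarding a record resets the key's window start to that record's timestamp. It assumes in-order timestamps per key and treats an elapsed window as purged state, which is one reading of the proposal rather than a settled design. `ValueChangeFilter` is a hypothetical name, and this is not Kafka Streams processor code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical per-key de-duplication filter, not a Kafka Streams processor.
class ValueChangeFilter<K, V> {

    // Last forwarded value for a key and the start of its current window.
    private static final class State<T> {
        final T value;
        final long windowStart;

        State(T value, long windowStart) {
            this.value = value;
            this.windowStart = windowStart;
        }
    }

    private final long windowSizeMs;
    private final Map<K, State<V>> stateByKey = new HashMap<>();

    ValueChangeFilter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Returns true if the record should be forwarded downstream.
    // Assumes timestamps arrive in order for each key.
    boolean process(K key, V value, long timestamp) {
        State<V> state = stateByKey.get(key);
        boolean windowElapsed = state != null && timestamp >= state.windowStart + windowSizeMs;
        if (state == null || windowElapsed || !Objects.equals(state.value, value)) {
            // New key, elapsed window, or changed value: forward and (re)start the window.
            stateByKey.put(key, new State<>(value, timestamp));
            return true;
        }
        // Same key, same value, still inside the window: drop as a duplicate.
        return false;
    }

    public static void main(String[] args) {
        ValueChangeFilter<String, Integer> filter = new ValueChangeFilter<>(30_000L);
        System.out.println(filter.process("thermometer-1", 20, 0L));     // true  (first reading)
        System.out.println(filter.process("thermometer-1", 20, 1_000L)); // false (unchanged)
        System.out.println(filter.process("thermometer-1", 21, 2_000L)); // true  (changed)
        System.out.println(filter.process("thermometer-2", 20, 2_000L)); // true  (other key)
    }
}
```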

It's still not an aggregation IMHO.

I guess one open question is still out-of-order data. For the
de-duplication use-case with message-id, it should not make a difference
which message we emit, as both are duplicates anyway (ie, key and value
are always the same for a specific message-id, and if done right, also
the timestamp -- and if not, maybe it does not matter which timestamp we
get).

For the use case you describe, it's slightly different, though, because the
value for a specific key changes: if your sensor reports (format ) the following:

  

the output should be , .

If the input has out-of-order data

  

the question is if we should emit  or not? The value did
actually change from 20 to 10; however, the timestamp is smaller than
the current window start-time (ts=12), so we could also drop it. This
question becomes even more tricky for the following example:

ordered:   
unordered:   

For the ordered case, we would emit all 3 records, so for the unordered
case, would it be ok to drop ? The problem is that we cannot
really decide if an out-of-order record is a duplicate or not...

For your particular use case it seems ok to drop? Not sure if emitting
would also be acceptable? Also not sure if other use cases might require
the one or the other semantics?

Maybe we could implement both and make it configurable? Or just pick
one, and only extend the operator with a configuration if it comes up later.
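If both behaviours were implemented and made configurable, the decision for a single record could look like the sketch below. It assumes just two options, dropping late records or emitting them when their value differs from the current one, and it only decides whether to forward; leaving the key's stored value and window start untouched for late records is itself an assumption. `LateRecordPolicy`, `Policy`, and `shouldEmit` are hypothetical names. With the example from this message (current value 20, window start ts=12, a late record with value 10), `DROP_LATE` drops the record and `EMIT_LATE_IF_CHANGED` forwards it.

```java
import java.util.Objects;

// Hypothetical configuration sketch, not Kafka Streams code.
class LateRecordPolicy {

    enum Policy { DROP_LATE, EMIT_LATE_IF_CHANGED }

    // Decides whether to forward a record for a key whose last forwarded value is
    // currentValue and whose window started at windowStart. Window expiry is
    // omitted here to keep the focus on out-of-order records.
    static boolean shouldEmit(Policy policy, Object currentValue, long windowStart,
                              Object value, long timestamp) {
        if (timestamp >= windowStart) {
            // In-order record: forward only if the value changed.
            return !Objects.equals(currentValue, value);
        }
        // Out-of-order record: it cannot be reliably classified as a duplicate.
        switch (policy) {
            case DROP_LATE:
                return false;
            case EMIT_LATE_IF_CHANGED:
                return !Objects.equals(currentValue, value);
            default:
                throw new IllegalArgumentException("Unknown policy " + policy);
        }
    }
}
```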


Does this make sense? Would this work for your use case?


-Matthias

On 8/9/21 3:15 PM, Ivan Ponomarev wrote:
>> To recap: Could it be that the idea to apply a DISTINCT-aggregation is
>> for a different use-case than to remove duplicate messages from a
> KStream?
> 
> OK, imagine the following:
> 
> We have 10 thermometers. Each second they transmit the current
> measured temperature. The id of the thermometer is the key, the
> temperature is the value. The temperature measured by a single
> thermometer changes slowly: on average, say once per 30 seconds, so most
> of the messages from a given thermometer are duplicates. But we need to
> react to the change ASAP. And we need a materialized view: a relational
> database table with the columns 'thermometerID' and 'temperature'.
> 
> 1) We don't want to send 100K updates per second to the database.
> 
> 2) But it's still ok if some updates will be duplicates -- the updates
> are idempotent.
> 
> 3) We even can afford to loose a fraction of data -- the data from each
> thermometer will be eventually 'fixed' by its fresh readings.
> 
> This is my use case (actually these are not thermometers and
> temperature, but it doesn't matter :-)
> 
> Is it conceptually different from what you were thinking about?
> 
> Regards,
> 
> Ivan
> 
> On 09.08.2021 3:05, Matthias J. Sax wrote:
>> Thanks for sharing your thoughts. I guess my first question about why
>> using the key boils down to the use case, and maybe you have something
>> else in mind than I do.
>>
 I see it this way: we define 'distinct' operation as returning a single
 record per time window per selected key,
>>
>> I believe this sentence explains your way of thinking about it. My way
>> of thinking about it is different though: KStream de-duplication means
>> to "filter/drop" duplicate records, and a record is by definition a
>> duplicate if key AND value are the same. --- Or are the use case, for
>> which there might be an "message ID" and even if the "message ID" is the
>> same, the content of the message might be different? If this holds, do
>> we really de-duplicate records (sounds more like, pick a random one)?
>>
>>
>> Just re-read some of my older replies, and I guess, back than I did just
>> comment about your KeyExtractor idea, without considering the
>> end-the-end picture. Thus, my reply below goes into a different
>> direction now:
>>
>> We only need to apply a window because we need to purge state
>> eventually. To this end, I actually believe that applying a "sliding
>> window" is the best way to go: for each message we encounter, we start
>> the de-duplication window when the message arrives, don

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-16 Thread Matthias J. Sax
Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head" idea or if we need
a "CompositeKey" interface. It seems we can just allow any types and
be agnostic to them?

KStream stream = ...
KStream stream2 =
  stream.selectKey(/*set superkey*/)
        .markAsPartitioned();

We only need a `Function` without any restrictions on the type,
to map the "superkey" to the original "partition key"?


Do you propose to provide the "reverse mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?
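A minimal sketch of a "superkey" with a reverse mapper, assuming the superkey is a pair whose head is the original partition key (the "use the head" idea) and that partitioning looks at the head only. `CompositeKey`, `partition`, `partitionByHead`, and `reverseMapper` are hypothetical; the hash-modulo partitioner is a stand-in, since Kafka's default partitioner hashes the serialized key bytes with murmur2. It illustrates why mapping `key -> (key, anything)` needs no repartitioning: a head-only partitioner sends every superkey to the same partition as its original key, and the reverse mapper recovers that key, for example for IQ lookups.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch, not Kafka Streams code.
class SuperKeySketch {

    // Hypothetical superkey: head = original partition key, tail = anything else.
    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) return false;
            CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
            return Objects.equals(head, other.head) && Objects.equals(tail, other.tail);
        }

        @Override public int hashCode() {
            return Objects.hash(head, tail);
        }
    }

    // Stand-in partitioner (not Kafka's murmur2-based default).
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // "Partitioner wrapper": only the head decides the partition.
    static int partitionByHead(CompositeKey<?, ?> key, int numPartitions) {
        return partition(key.head, numPartitions);
    }

    // Reverse mapper from superkey back to the original key.
    static <H, T> Function<CompositeKey<H, T>, H> reverseMapper() {
        return key -> key.head;
    }

    public static void main(String[] args) {
        CompositeKey<String, Long> a = new CompositeKey<>("user-42", 1L);
        CompositeKey<String, Long> b = new CompositeKey<>("user-42", 2L);
        Function<CompositeKey<String, Long>, String> original = reverseMapper();
        System.out.println(partitionByHead(a, 6) == partition("user-42", 6)); // true
        System.out.println(partitionByHead(a, 6) == partitionByHead(b, 6));   // true
        System.out.println(original.apply(b));                                // user-42
    }
}
```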


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitioned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility for the user to not mess it up.


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
> Hi Matthias and Sophie!
> 
> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
> 
> I don't have a strong opinion here, both Sophie's and Matthias' points
> look convincing for me.
> 
> I think we should estimate the following: what is the probability that
> we will ever need to extend `selectKey` etc. with a config for the
> purposes other than `markAsPartitioned`?
> 
> If we find this probability high, then it's just a refactoring to
> deprecate overloads with `Named` and introduce overloads with dedicated
> configs, and we should do it this way.
> 
> If it's low or zero, maybe it's better not to mess with the existing
> APIs and to introduce a single `markAsPartitioned()` method, which
> itself can be easily deprecated if we find a better solution later!
> 
> 
> ==2. The IQ problem==
> 
>> it then has to be the case that
> 
>> Partitioner.partition(key) == Partitioner.partition(map(key))
> 
> 
> Sophie, you got this wrong, and Matthias already explained why.
> 
> The actual required property for the mapping function is:
> 
> \forall k_1, k_2:\ map(k_1) = map(k_2) \Rightarrow partition(k_1) = partition(k_2)
> 
> or, by the contraposition law,
> 
> \forall k_1, k_2:\ partition(k_1) \neq partition(k_2) \Rightarrow map(k_1) \neq map(k_2)
> 
> 
> (look at the whiteboard photo that I attached to the KIP).
> 
> There is a big class of such mappings: key -> Tuple(key, anyValue). This
> is actually what we often do before aggregation, and this mapping does
> not require repartition.
> 
> But of course we can extract the original key from Tuple(key, anyValue),
> and this can save IQ and joins!
> 
> This is what I mean when I talk about the 'CompositeKey' idea.
> 
> We can do the following:
> 
> 1. implement a 'partitioner wrapper' that recognizes tuples
> (CompositeKeys) and uses only the 'head' to calculate the partition,
> 
> 2. implement
> 
> selectCompositeKey(BiFunction tailSelector) {
>   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>   //MARK_AS_PARTITIONED call here,
>   //but this call is an implementation detail and we do not expose
>   //markAsPartitioned publicly!   
> }
> 
> WDYT? (it's just a brainstorming idea)
> 
> On 09.08.2021 2:38, Matthias J. Sax wrote:
>> Hi,
>>
>> I originally had a similar thought about `markAsPartitioned()` vs
>> extending `selectKey()` et al. with a config. While I agree that it
>> might be conceptually cleaner to use a config object, I did not propose
>> it as the API impact (deprecating stuff and adding new stuff) is quite
>> big... If we think it's an acceptable price to pay, I am ok with it
>> though.
>>
>> I also think that `markAsPartitioned()` could actually be
>> categorized as an operator... We don't expose it in the API as a
>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>> one into the other.
>>
>> I also think that the raised concern about "forgetting to remove
>> `markAsPartitioned()`" might not be very strong though. If you have
>> different places in the code that link stuff together, the calls
>> `selectKey().markAsPartitioned()` must always go together. If you have
>> some other place in the code that gets a `KStream` passed as input, it
>> would be "invalid" to blindly call `markAsPartitioned()` as you don't
>> know anything about the upstream code. Of course, it requires some
>> "coding discipline" to follow this pattern... Also, users can shoot
>> themselves in the foot if they want
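The required property and the head-only "partitioner wrapper" discussed in this thread can be sketched in plain Java. This is an illustration under stated assumptions: `CompositeKey`, `basePartition()`, `wrappedPartition()` and `satisfiesProperty()` are hypothetical, and `basePartition()` uses `hashCode()` as a stand-in rather than the murmur2 hash over serialized key bytes that Kafka's default partitioner uses. The brute-force check shows that `key -> CompositeKey(key, anyValue)` satisfies map(k1) = map(k2) => partition(k1) = partition(k2), while a mapping that collapses keys from different partitions (here `key -> key.length()`) does not.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class HeadOnlyPartitioning {

    // Hypothetical composite key: "head" is the original key, "tail" is anything.
    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) {
                return false;
            }
            CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
            return Objects.equals(head, other.head) && Objects.equals(tail, other.tail);
        }

        @Override
        public int hashCode() {
            return Objects.hash(head, tail);
        }
    }

    // Stand-in for the base partitioner (hashCode-based, NOT Kafka's murmur2 default).
    static int basePartition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // "Partitioner wrapper": for a CompositeKey, only the head decides the partition.
    static int wrappedPartition(Object key, int numPartitions) {
        Object effective = (key instanceof CompositeKey) ? ((CompositeKey<?, ?>) key).head : key;
        return basePartition(effective, numPartitions);
    }

    // Brute-force check over sample keys of: map(k1) = map(k2) => partition(k1) = partition(k2)
    static boolean satisfiesProperty(List<String> keys, Function<String, Object> map, int numPartitions) {
        for (String k1 : keys) {
            for (String k2 : keys) {
                boolean sameNewKey = Objects.equals(map.apply(k1), map.apply(k2));
                boolean samePartition = basePartition(k1, numPartitions) == basePartition(k2, numPartitions);
                if (sameNewKey && !samePartition) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c");
        int partitions = 8;
        // key -> CompositeKey(key, anyValue) is a "superkey" mapping: prints true
        System.out.println(satisfiesProperty(keys, k -> new CompositeKey<>(k, 0L), partitions));
        // key -> key.length() maps "a" and "b" (different partitions) to one new key: prints false
        System.out.println(satisfiesProperty(keys, k -> k.length(), partitions));
        // The wrapper keeps a composite key on its head's partition: prints true
        System.out.println(wrappedPartition(new CompositeKey<>("a", 123L), partitions) == basePartition("a", partitions));
    }
}
```

The wrapper only avoids a repartition if every writer of the composite-keyed data uses it; a partitioner that hashes the whole composite key would scatter records with the same head across partitions.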

Re: Request contributor permission

2021-08-16 Thread Yanwen Lin
Hi Kafka team,

Please help take a look. I’d like to contribute to Apache Kafka and my Jira ID 
is: ll1124278064

Thanks!

Best,



> On Aug 14, 2021, at 12:05 AM, Yanwen Lin  wrote:
> 
> Hi Kafka team,
> 
> Can I ask for a contributor permission so that I can assign myself to the 
> Kafka Jira ticket. My Jira id is: ll1124278064
> 
> Thanks!
> 
> Best,
> Yanwen



Re: Request contributor permission

2021-08-16 Thread Yanwen Lin
nvm. Just saw the message. Thanks!

> On Aug 16, 2021, at 8:18 PM, Yanwen Lin  wrote:
> 
> Hi Kafka team,
> 
> Please help take a look. I’d like to contribute to Apache Kafka and my Jira 
> ID is: ll1124278064
> 
> Thanks!
> 
> Best,
> 
> 
> 



Re: Request contributor permission

2021-08-16 Thread Luke Chen
Hi Yanwen,
I think Matthias has already granted your permission to jira.
Please try to log in and see if you can assign a Kafka ticket to yourself.

Thanks.
Luke

On Tue, Aug 17, 2021 at 11:15 AM Yanwen Lin  wrote:

> Hi Kafka team,
>
> Please help take a look. I’d like to contribute to Apache Kafka and my
> Jira ID is: ll1124278064
>
> Thanks!
>
> Best,
>
>
>


Re: Request contributor permission

2021-08-16 Thread Yanwen Lin
Yes, I missed Matthias’s msg just now. Have confirmed this. Thanks again!

> On Aug 16, 2021, at 8:20 PM, Luke Chen  wrote:
> 
> Hi Yanwen,
> I think Matthias has already granted your permission to jira.
> Please try to log in and see if you can assign a Kafka ticket to yourself.
> 
> Thanks.
> Luke



Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-16 Thread Mohan Parthasarathy
On Mon, Aug 16, 2021 at 5:28 PM Matthias J. Sax  wrote:
>
> @Mohan:
>
> For sum(), I actually think that the return type should be same as the
> input type. In the end, sum() is a special case of reduce().
>
If you define something like this as you suggested (without defining
new interfaces):

 E sumG(Function func)

Internally the implementation still has to extract the "value" from
the return object "E" using longValue or doubleValue (to avoid loss of
precision) to do the sum. Once you do that, casting back to "E" will be
an unchecked cast. I think the return type has to be fixed.
Possibilities:

Double sum(Function func)
Long sum(Function func)
 Long sum(Function func)
 Double sum(Function func)
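To illustrate the fixed-return-type option, here is a minimal sketch over an in-memory collection rather than a `KStream`; `sumAsLong` and `sumAsDouble` are hypothetical names, not part of any Kafka API. Each one extracts a `Number` and accumulates via `longValue()` or `doubleValue()`, so no cast back to a generic `E` is needed.

```java
import java.util.List;
import java.util.function.Function;

public class FixedReturnTypeSum {

    // Sums an extracted numeric field as a long (exact for integral inputs, may overflow).
    static <V> Long sumAsLong(Iterable<V> values, Function<V, ? extends Number> extractor) {
        long total = 0L;
        for (V v : values) {
            total += extractor.apply(v).longValue();
        }
        return total;
    }

    // Sums as a double (accepts fractional inputs, may lose precision above 2^53).
    static <V> Double sumAsDouble(Iterable<V> values, Function<V, ? extends Number> extractor) {
        double total = 0.0;
        for (V v : values) {
            total += extractor.apply(v).doubleValue();
        }
        return total;
    }

    // By contrast, a generic "E sum(Function<V, E>)" would have to end with an
    // unchecked cast such as (E) Long.valueOf(total), which the compiler cannot verify.

    public static void main(String[] args) {
        System.out.println(sumAsLong(List.of("3", "4", "5"), Integer::parseInt)); // 12
        System.out.println(sumAsDouble(List.of(0.5, 1.25), d -> d));             // 1.75
    }
}
```

The trade-off between the two is the usual one: the `long` variant is exact for integral inputs but can overflow, while the `double` variant accepts fractional inputs but cannot represent every integer above 2^53 exactly.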

Let me know if I am missing something.

Thanks
Mohan





> @Alexandre:
>
> Not sure if using a `Function` to get a `Comparable` is simpler than
> implementing a `Comparator`? Can you elaborate on this point? In the end,
> using a `Comparator` seems to provide the highest flexibility?
>
>
> -Matthias
>
> On 8/15/21 10:27 AM, Alexandre Brasil wrote:
> >> I am not sure why we would want to pass `Function>
> >> func` into `min()`?
> >
> > I guess I misread/misunderstood your earlier suggestion.
> >
> > My line of thought was that, instead of using a method signature that
> > demands a Comparator in min()/max(), we might use a property extractor
> > (like the FK extractors on some join() overloads) to return a Comparable
> > property that min()/max() could use to compare the values.
> >
> > The benefit of this approach is that it would be simpler than implementing
> > comparators, since most use cases would probably compare properties of the
> > values that already implement Comparable (like Numbers, Strings, Dates,
> > etc.), but on the other hand it would be more limiting in disallowing the
> > usage of multiple properties of  or defining how null property values
> > should be handled.
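A minimal sketch of how the two styles relate, using an in-memory list and `java.util.Collections.min` in place of the proposed `min()`/`max()` operators; `Order`, `minBy()` and `byAmountThenCustomer()` are hypothetical. `Comparator.comparing(extractor)` turns a property extractor into a `Comparator`, so a `Comparator`-based signature can cover the extractor case, while an explicit `Comparator` can additionally encode null handling (`Comparator.nullsLast`) and tie-breaking on a second property (`thenComparing`).

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class ExtractorVersusComparator {

    // Hypothetical value type; amount may be null.
    static final class Order {
        final String customer;
        final Integer amount;

        Order(String customer, Integer amount) {
            this.customer = customer;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return customer + ":" + amount;
        }
    }

    // Extractor style: the caller names one Comparable property, which is
    // simply adapted into a Comparator with Comparator.comparing.
    static <V, U extends Comparable<? super U>> V minBy(List<V> values, Function<V, U> extractor) {
        return Collections.min(values, Comparator.comparing(extractor));
    }

    // Comparator style: explicit null handling plus a tie-breaker on a second property.
    // (Comparing amounts with plain natural order would throw on the null amount.)
    static Comparator<Order> byAmountThenCustomer() {
        Comparator<Order> byAmount =
            Comparator.comparing((Order o) -> o.amount, Comparator.nullsLast(Comparator.naturalOrder()));
        return byAmount.thenComparing(o -> o.customer);
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order("bob", 30), new Order("alice", 10), new Order("carol", null),
            new Order("dave", 5), new Order("erin", 5));

        System.out.println(minBy(orders, o -> o.customer));                 // alice:10
        System.out.println(Collections.min(orders, byAmountThenCustomer())); // dave:5
    }
}
```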
> >
> > On Tue, Aug 3, 2021 at 10:55 PM Matthias J. Sax  wrote:
> >
> >> I was playing with the code a little bit, but it seems not to be easy to
> >> use generics to enforce that V is `Comparable`...
> >>
> >> We would need to introduce a new interface
> >>
> >>  interface ComparableStream>
> >> extends KStream
> >>  {
> >> KTable min();
> >>  }
> >>
> >> But it also requires a nasty cast to actually use it:
> >>
> >>   KStream stream =
> >> new StreamsBuilder().stream("");
> >>   KTable table =
> >> ((ComparableStream) stream).min();
> >>
> >> If the value-type does not implement `Comparable` the cast would not
> >> compile... Or would there be a simpler way to ensure that min() can only
> >> be called _if_ V is `Comparable`?
> >>
> >>
> >> So maybe passing in a `Comparator` might be the right way to go;
> >> might also be more flexible anyway. -- My original idea was just to
> >> maybe avoid the `Comparator` argument, as it would make the API nicer
> >> IMHO; fewer parameters is usually better...
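One way to see the generics problem, sketched in plain Java with a hypothetical `Container<V>` standing in for `KStream<K, V>`: an instance method cannot add a bound to its class's type parameter, but a static generic method can declare `V extends Comparable<? super V>` itself, so misuse fails at compile time without any cast. Whether a non-fluent static helper would fit the DSL is a separate API question; the `Comparator` overload works for any `V`.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class BoundedMinSketch {

    // Hypothetical container standing in for KStream<K, V>; V is unbounded here,
    // so an instance method cannot require V to be Comparable.
    static final class Container<V> {
        final List<V> values;

        Container(List<V> values) {
            this.values = values;
        }

        // Works for any V, because the caller supplies the ordering.
        V min(Comparator<? super V> comparator) {
            return Collections.min(values, comparator);
        }
    }

    // A static method can add the bound itself: it only compiles when V is Comparable.
    static <V extends Comparable<? super V>> V min(Container<V> container) {
        return Collections.min(container.values);
    }

    public static void main(String[] args) {
        Container<Integer> numbers = new Container<>(List.of(7, 3, 9));
        System.out.println(min(numbers));                           // 3
        System.out.println(numbers.min(Comparator.reverseOrder())); // 9

        Container<Object> opaque = new Container<>(List.of(new Object()));
        // min(opaque);  // would not compile: Object is not Comparable
        System.out.println(opaque.min(Comparator.comparing(Object::toString)) != null); // true
    }
}
```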
> >>
> >>
> >> I am not sure why we would want to pass `Function>
> >> func` into `min()`?
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> >>> Alex,
> >>>
> >>>
> >>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil
> >>> <alexandre.bra...@gmail.com> wrote:
> >>>
>  Mohan / Matthias,
> 
> >> I think extending min/max to non-numeric types makes sense. Wondering
> >> why we should require a `Comparator` or if we should require that the
> >> types implement `Comparable` instead?
> >>
> > Good question. This is what it would look like:
> >
> > KTable min_comparable()
> > KTable min_comparator(Comparator comp)
> 
>  Not sure if I understood Matthias' proposal correctly, but I think that
>  instead of going with your original proposal (KTable
>  min(Function  VR> func...)) or mine (KTable min(Comparator comparator...)),
>  we could simplify it a bit by using a function to extract a Comparable
>  property from the original value:
> 
>  KTable min(Function> func...)
> 
> >>> I will let Matthias clarify. I am not sure why it is simpler than the
> >>> comparator one. Comparable is implemented by the type, and I am not sure
> >>> exposing it this way makes it any better.
> >>>
> >> I also think that min/max should not change the value type. Using
> >> `Long` for sum() makes sense though, and also to require a ` > Number>`.
> 
>  Are there any reasons to limit the sum() to integers? Why not use a
>  Double instead?
> 
> >>> Yeah, if precision is important, then we should stick with Double.
> >>>
> >>> -mohan
> >>>
>  Best regards,
>  Alexandre
> 
>  On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy
>  <mposde...@gmail.com> wrote:
> 
> > Matthias,
> >
> > On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
> > wrote:
> >
> >> Hi,
> >>
> >> I thi