Re: [DISCUSSION] KIP-865: Support --bootstrap-server in kafka-streams-application-reset

2022-09-06 Thread Николай Ижиков
Sure, no problem.

Thread name updated.

Link to KIP - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
 



> On 6 Sept 2022, at 12:20, Jorge Esteban Quilcate Otoya wrote:
> 
> Hi there! 
> I think there's a collision of KIP number between this and 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Add+End-To-End+Latency+Metrics+to+Connectors
>  
> 
> 
> Would you mind bumping your number if that's ok? Let me know if that works.
> Thanks!
> 
> On Tue, 6 Sept 2022 at 07:25, Николай Ижиков wrote:
> Hello.
> 
> Do we still want to make parameter names consistent in tools?
> If yes, please share your feedback on the KIP.
> 
> > On 31 Aug 2022, at 11:50, Николай Ижиков wrote:
> > 
> > Hello.
> > 
> > I would like to start a discussion on a small KIP [1].
> > The goal of the KIP is to add the same --bootstrap-server parameter to the
> > `kafka-streams-application-reset.sh` tool that other tools use.
> > Please share your feedback.
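> > 
> > For illustration only (the host, port and application id below are made up),
> > an invocation with the new parameter would look something like:
> > 
> >   bin/kafka-streams-application-reset.sh \
> >     --bootstrap-server localhost:9092 \
> >     --application-id my-streams-app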
> > 
> > [1] 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
> >  
> > 
> > 
> 



Re: [DISCUSS] KIP-864: Add End-To-End Latency Metrics to Connectors

2022-09-06 Thread Jorge Esteban Quilcate Otoya
Hi Sagar and Yash,

> the way it's defined in
https://kafka.apache.org/documentation/#connect_monitoring for the metrics

4.1. Got it. I'll add it to the KIP.

> The only thing I would argue is do we need sink-record-latency-min? Maybe
we
> could remove this min metric as well and make all of the 3 e2e metrics
> consistent

4.2 I see. Will remove it from the KIP.

> Probably users can track the metrics at their end to
> figure that out. Do you think that makes sense?

4.3. Yes, agree. With these new metrics it should be easier for users to
track this.

> I think it makes sense to not have a min metric for either to remain
> consistent with the existing put-batch and poll-batch metrics

5.1. Got it. Same as 4.2

> Another naming related suggestion I had was with the
> "convert-time" metrics - we should probably include transformations in the
> name since SMTs could definitely be attributable to a sizable chunk of the
> latency depending on the specific transformation chain.

5.2. Makes sense. I'm proposing to add `sink-record-convert-transform...`
and `source-record-transform-convert...` to correctly represent the order
of operations.

> it seems like both source and sink tasks only record metrics at a "batch"
> level, not on an individual record level. I think it might be additional
> overhead if we want to record these new metrics all at the record level?

5.3. I initially considered implementing all metrics at the batch
level, but given how the framework processes records, I fell back to the
proposed approach:
- Sink Task:
  - `WorkerSinkTask#convertMessages(msgs)` already iterates over records,
so there is no additional overhead to capture latency per record.
-
https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L490-L514
  - `WorkerSinkTask#convertAndTransformRecord(record)` actually happens per
record. Measuring this operation per batch would include processing
that is not strictly part of "convert and transform".
-
https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L518
- Source Task:
  - `AbstractWorkerSourceTask#sendRecords` iterates over a batch and
applies transforms and converts records individually as well:
-
https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L390

> This might require some additional changes -
> for instance, with the "sink-record-latency" metric, we might only want to
> have a "max" metric since "avg" would require recording a value on the
> sensor for each record (whereas we can get a "max" by only recording a
> metric value for the oldest record in each batch).

5.4. Recording record-latency per batch may not be as useful, as there is no
guarantee that the oldest record will be representative of the batch.
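
For reference, here is a minimal sketch of how such a per-record latency sensor
could be registered and recorded with Kafka's common metrics classes. The metric
group and metric names below are only illustrative placeholders, not necessarily
the final names from the KIP:

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class RecordLatencySketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor latency = metrics.sensor("sink-record-latency");
        latency.add(metrics.metricName("sink-record-latency-max", "sink-task-metrics"),
                new Max());
        latency.add(metrics.metricName("sink-record-latency-avg", "sink-task-metrics"),
                new Avg());

        // One sample per record: latency = now - record timestamp.
        long recordTimestamp = System.currentTimeMillis() - 250; // pretend the record is 250 ms old
        latency.record(System.currentTimeMillis() - recordTimestamp);
        metrics.close();
    }
}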

On Sat, 3 Sept 2022 at 16:02, Yash Mayya  wrote:

> Hi Jorge and Sagar,
>
> I think it makes sense to not have a min metric for either to remain
> consistent with the existing put-batch and poll-batch metrics (it doesn't
> seem particularly useful either anyway). Also, the new
> "sink-record-latency" metric name looks fine to me, thanks for making the
> changes! Another naming related suggestion I had was with the
> "convert-time" metrics - we should probably include transformations in the
> name since SMTs could definitely be attributable to a sizable chunk of the
> latency depending on the specific transformation chain.
>
> I have one high level question with respect to implementation - currently,
> it seems like both source and sink tasks only record metrics at a "batch"
> level, not on an individual record level. I think it might be additional
> overhead if we want to record these new metrics all at the record level?
> Could we instead make all of these new metrics for batches of records
> rather than individual records in order to remain consistent with the
> existing task level metrics? This might require some additional changes -
> for instance, with the "sink-record-latency" metric, we might only want to
> have a "max" metric since "avg" would require recording a value on the
> sensor for each record (whereas we can get a "max" by only recording a
> metric value for the oldest record in each batch).
>
> Thanks,
> Yash
>
> On Fri, Sep 2, 2022 at 3:16 PM Sagar  wrote:
>
> > Hi Jorge,
> >
> > Thanks for the changes.
> >
> > Regarding the metrics, I meant something like this:
> >
> kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
> >
> > the way it's defined in
> > https://kafka.apache.org/documentation/#connect_monitoring for the
> > metrics.
> >
> > I see what you mean by the 3 metrics and how it can be interpreted. The
> > only thing I would argue is do we need sink-record-latency-min? 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-06 Thread David Jacot
Hi Jun,

I have updated the KIP to include your feedback. I have also tried to
clarify the parts which were not clear.

Best,
David

On Fri, Sep 2, 2022 at 4:18 PM David Jacot  wrote:
>
> Hi Jun,
>
> Thanks for your feedback. Let me start by answering your questions
> inline and I will update the KIP next week.
>
> > Thanks for the KIP. Overall, the main benefits of the KIP seem to be fewer
> > RPCs during rebalance and more efficient support of wildcard. A few
> > comments below.
>
> I would also add that the KIP removes the global sync barrier in the
> protocol which is essential to improve group stability and
> scalability, and the KIP also simplifies the client by moving most of
> the logic to the server side.
>
> > 30. ConsumerGroupHeartbeatRequest
> > 30.1 ServerAssignor is a singleton. Do we plan to support rolling changing
> > of the partition assignor in the consumers?
>
> Definitely. The group coordinator will use the assignor used by a
> majority of the members. This allows the group to move from one
> assignor to another via a rolling restart. This is explained in the
> Assignor Selection chapter.
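> 
> As a toy illustration (not from the KIP; the assignor names and types below
> are just examples), the selection boils down to counting the assignor each
> member declares and picking the most common one:
> 
> import java.util.List;
> import java.util.Map;
> import java.util.stream.Collectors;
> 
> public class AssignorSelectionSketch {
>     public static void main(String[] args) {
>         // Assignor declared by each member, e.g. mid-way through a rolling upgrade.
>         List<String> memberAssignors = List.of("uniform", "range", "uniform");
>         String selected = memberAssignors.stream()
>                 .collect(Collectors.groupingBy(a -> a, Collectors.counting()))
>                 .entrySet().stream()
>                 .max(Map.Entry.comparingByValue())
>                 .map(Map.Entry::getKey)
>                 .orElseThrow();
>         System.out.println("Selected assignor: " + selected); // uniform
>     }
> }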
>
> > 30.2 For each field, could you explain whether it's required in every
> > request or the scenarios when it needs to be filled? For example, it's not
> > clear to me when TopicPartitions needs to be filled.
>
> The client is expected to set those fields in case of a connection
> issue (e.g. timeout) or when the fields have changed since the last
> HB. The server populates those fields as long as the member is not
> fully reconciled - the member should acknowledge that it has the
> expected epoch and assignment. I will clarify this in the KIP.
>
> > 31. In the current consumer protocol, the rack affinity between the client
> > and the broker is only considered during fetching, but not during assigning
> > partitions to consumers. Sometimes, once the assignment is made, there is
> > no opportunity for read affinity because no replicas of assigned partitions
> > are close to the member. I am wondering if we should use this opportunity
> > to address this by including rack in GroupMember.
>
> That's an interesting idea. I don't see any issue with adding the rack
> to the members. I will do so.
>
> > 32. On the metric side, often, it's useful to know how busy a group
> > coordinator is. By moving the event loop model, it seems that we could add
> > a metric that tracks the fraction of the time the event loop is doing the
> > actual work.
>
> That's a great idea. I will add it. Thanks.
>
> > 33. Could we add a section on coordinator failover handling? For example,
> > does it need to trigger the check if any group with the wildcard
> > subscription now has a new matching topic?
>
> Sure. When the new group coordinator takes over, it has to:
> * Set up the session timeouts.
> * Trigger a new assignment if a client side assignor is used. We don't
> store the information about the member selected to run the assignment
> so we have to start a new one.
> * Update the topics metadata, verify the wildcard subscriptions, and
> trigger a rebalance if needed.
>
> > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue,
> > ConsumerGroupMemberMetadataValue: Could we document what the epoch field
> > reflects? For example, does the epoch in ConsumerGroupMetadataValue reflect
> > the latest group epoch? What about the one in
> > ConsumerGroupPartitionMetadataValue and ConsumerGroupMemberMetadataValue?
>
> Sure. I will clarify that but it is always the latest group epoch.
> When the group state is updated, the group epoch is bumped so we use
> that one for all the change records related to the update.
>
> > 35. "the group coordinator will ensure that the following invariants are
> > met: ... All members exists." It's possible for a member not to get any
> > assigned partitions, right?
>
> That's right. Here I meant that the members provided by the assignor
> in the assignment must exist in the group. The assignor cannot make
> up new member ids.
>
> > 36. "He can rejoins the group with a member epoch equals to 0": When would
> > a consumer rejoin and what member id would be used?
>
> A member is expected to abandon all its partitions and rejoin when it
> receives the FENCED_MEMBER_EPOCH error. In this case, the group
> coordinator will have removed the member from the group. The member
> can rejoin the group with the same member id but with 0 as epoch. Let
> me see if I can clarify this in the KIP.
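> 
> To make the flow concrete, here is a rough, self-contained sketch (not the
> actual consumer implementation; all names below are illustrative) of the
> fencing handling described above:
> 
> import java.util.HashSet;
> import java.util.Set;
> 
> public class FencedMemberSketch {
>     enum HeartbeatError { NONE, FENCED_MEMBER_EPOCH }
> 
>     static final String memberId = "consumer-1";   // kept across the rejoin
>     static int memberEpoch = 5;
>     static final Set<Integer> assignedPartitions = new HashSet<>(Set.of(0, 1, 2));
> 
>     static void onHeartbeatResponse(HeartbeatError error) {
>         if (error == HeartbeatError.FENCED_MEMBER_EPOCH) {
>             assignedPartitions.clear(); // abandon all owned partitions
>             memberEpoch = 0;            // rejoin with epoch 0 and the same member id
>             System.out.println("Rejoining as " + memberId + " with epoch " + memberEpoch);
>         }
>     }
> 
>     public static void main(String[] args) {
>         onHeartbeatResponse(HeartbeatError.FENCED_MEMBER_EPOCH);
>     }
> }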
>
> > 37. "Instead, power users will have the ability to trigger a reassignment
> > by either providing a non-zero reason or by updating the assignor
> > metadata." Hmm, this seems to be conflicting with the deprecation of
> > Consumer#enforceRebalance.
>
> In this case, a new assignment is triggered by the client side
> assignor. When constructing the HB, the consumer will always consult
> the client side assignor and propagate the information to the group
> coordinator. In other words, we don't expect users to call

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.2 #76

2022-09-06 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 548918 lines...]
[2022-09-06T13:10:54.726Z] 
[2022-09-06T13:10:54.726Z] 
org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldAllowOverridingChangelogConfig PASSED
[2022-09-06T13:10:54.726Z] 
[2022-09-06T13:10:54.726Z] 
org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldShutdownWhenBytesConstraintIsViolated STARTED
[2022-09-06T13:11:00.651Z] 
[2022-09-06T13:11:00.651Z] 
org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldShutdownWhenBytesConstraintIsViolated PASSED
[2022-09-06T13:11:00.651Z] 
[2022-09-06T13:11:00.651Z] 
org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldCreateChangelogByDefault STARTED
[2022-09-06T13:11:10.686Z] 
[2022-09-06T13:11:10.687Z] 
org.apache.kafka.streams.integration.SuppressionIntegrationTest > 
shouldCreateChangelogByDefault PASSED
[2022-09-06T13:11:10.687Z] 
[2022-09-06T13:11:10.687Z] 
org.apache.kafka.streams.integration.TaskAssignorIntegrationTest > 
shouldProperlyConfigureTheAssignor STARTED
[2022-09-06T13:11:11.729Z] 
[2022-09-06T13:11:11.729Z] 
org.apache.kafka.streams.integration.TaskAssignorIntegrationTest > 
shouldProperlyConfigureTheAssignor PASSED
[2022-09-06T13:11:14.030Z] 
[2022-09-06T13:11:14.030Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargeNumConsumers STARTED
[2022-09-06T13:12:07.412Z] 
[2022-09-06T13:12:07.412Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargeNumConsumers PASSED
[2022-09-06T13:12:07.412Z] 
[2022-09-06T13:12:07.412Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargePartitionCount STARTED
[2022-09-06T13:12:07.412Z] 
[2022-09-06T13:12:07.412Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargePartitionCount PASSED
[2022-09-06T13:12:07.412Z] 
[2022-09-06T13:12:07.412Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyThreadsPerClient STARTED
[2022-09-06T13:12:07.412Z] 
[2022-09-06T13:12:07.412Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyThreadsPerClient PASSED
[2022-09-06T13:12:07.412Z] 
[2022-09-06T13:12:07.412Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyStandbys STARTED
[2022-09-06T13:12:13.170Z] 
[2022-09-06T13:12:13.170Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyStandbys PASSED
[2022-09-06T13:12:13.170Z] 
[2022-09-06T13:12:13.170Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyThreadsPerClient STARTED
[2022-09-06T13:12:14.219Z] 
[2022-09-06T13:12:14.219Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorManyThreadsPerClient PASSED
[2022-09-06T13:12:14.219Z] 
[2022-09-06T13:12:14.219Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyThreadsPerClient STARTED
[2022-09-06T13:12:15.270Z] 
[2022-09-06T13:12:15.270Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyThreadsPerClient PASSED
[2022-09-06T13:12:15.270Z] 
[2022-09-06T13:12:15.270Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargePartitionCount STARTED
[2022-09-06T13:13:01.793Z] 
[2022-09-06T13:13:01.793Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargePartitionCount PASSED
[2022-09-06T13:13:01.793Z] 
[2022-09-06T13:13:01.793Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargePartitionCount STARTED
[2022-09-06T13:13:40.161Z] 
[2022-09-06T13:13:40.161Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargePartitionCount PASSED
[2022-09-06T13:13:40.161Z] 
[2022-09-06T13:13:40.161Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyStandbys STARTED
[2022-09-06T13:13:50.998Z] 
[2022-09-06T13:13:50.998Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyStandbys PASSED
[2022-09-06T13:13:50.998Z] 
[2022-09-06T13:13:50.998Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyStandbys STARTED
[2022-09-06T13:14:32.473Z] 
[2022-09-06T13:14:32.473Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabil

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1199

2022-09-06 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 501969 lines...]
[2022-09-06T15:33:18.315Z] 
[2022-09-06T15:33:18.315Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyThreadsPerClient STARTED
[2022-09-06T15:33:19.245Z] 
[2022-09-06T15:33:19.245Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyThreadsPerClient PASSED
[2022-09-06T15:33:19.245Z] 
[2022-09-06T15:33:19.245Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargePartitionCount STARTED
[2022-09-06T15:34:01.543Z] 
[2022-09-06T15:34:01.543Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargePartitionCount PASSED
[2022-09-06T15:34:01.543Z] 
[2022-09-06T15:34:01.543Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargePartitionCount STARTED
[2022-09-06T15:34:38.428Z] 
[2022-09-06T15:34:38.428Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargePartitionCount PASSED
[2022-09-06T15:34:38.428Z] 
[2022-09-06T15:34:38.428Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyStandbys STARTED
[2022-09-06T15:34:44.267Z] 
[2022-09-06T15:34:44.267Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorManyStandbys PASSED
[2022-09-06T15:34:44.267Z] 
[2022-09-06T15:34:44.267Z] StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyStandbys STARTED
[2022-09-06T15:35:28.658Z] 
[2022-09-06T15:35:28.658Z] StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorManyStandbys PASSED
[2022-09-06T15:35:28.658Z] 
[2022-09-06T15:35:28.658Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargeNumConsumers STARTED
[2022-09-06T15:35:29.609Z] 
[2022-09-06T15:35:29.609Z] StreamsAssignmentScaleTest > 
testFallbackPriorTaskAssignorLargeNumConsumers PASSED
[2022-09-06T15:35:29.609Z] 
[2022-09-06T15:35:29.609Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargeNumConsumers STARTED
[2022-09-06T15:35:32.739Z] 
[2022-09-06T15:35:32.739Z] StreamsAssignmentScaleTest > 
testStickyTaskAssignorLargeNumConsumers PASSED
[2022-09-06T15:35:32.739Z] 
[2022-09-06T15:35:32.739Z] AdjustStreamThreadCountTest > 
testConcurrentlyAccessThreads() STARTED
[2022-09-06T15:35:38.725Z] 
[2022-09-06T15:35:38.725Z] AdjustStreamThreadCountTest > 
testConcurrentlyAccessThreads() PASSED
[2022-09-06T15:35:38.725Z] 
[2022-09-06T15:35:38.725Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadReplacement() STARTED
[2022-09-06T15:35:43.392Z] 
[2022-09-06T15:35:43.392Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadReplacement() PASSED
[2022-09-06T15:35:43.392Z] 
[2022-09-06T15:35:43.392Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveThreadsMultipleTimes() STARTED
[2022-09-06T15:35:55.504Z] 
[2022-09-06T15:35:55.504Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveThreadsMultipleTimes() PASSED
[2022-09-06T15:35:55.504Z] 
[2022-09-06T15:35:55.504Z] AdjustStreamThreadCountTest > 
shouldnNotRemoveStreamThreadWithinTimeout() STARTED
[2022-09-06T15:35:57.282Z] 
[2022-09-06T15:35:57.282Z] AdjustStreamThreadCountTest > 
shouldnNotRemoveStreamThreadWithinTimeout() PASSED
[2022-09-06T15:35:57.282Z] 
[2022-09-06T15:35:57.282Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() STARTED
[2022-09-06T15:36:17.318Z] 
[2022-09-06T15:36:17.318Z] AdjustStreamThreadCountTest > 
shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() PASSED
[2022-09-06T15:36:17.318Z] 
[2022-09-06T15:36:17.318Z] AdjustStreamThreadCountTest > 
shouldAddStreamThread() STARTED
[2022-09-06T15:36:22.241Z] 
[2022-09-06T15:36:22.241Z] AdjustStreamThreadCountTest > 
shouldAddStreamThread() PASSED
[2022-09-06T15:36:22.241Z] 
[2022-09-06T15:36:22.241Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThreadWithStaticMembership() STARTED
[2022-09-06T15:36:25.962Z] 
[2022-09-06T15:36:25.962Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThreadWithStaticMembership() PASSED
[2022-09-06T15:36:25.962Z] 
[2022-09-06T15:36:25.962Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThread() STARTED
[2022-09-06T15:36:32.277Z] 
[2022-09-06T15:36:32.277Z] AdjustStreamThreadCountTest > 
shouldRemoveStreamThread() PASSED
[2022-09-06T15:36:32.277Z] 
[2022-09-06T15:36:32.277Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadRemovalTimesOut() STARTED
[2022-09-06T15:36:34.267Z] 
[2022-09-06T15:36:34.267Z] AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadRemovalTimesOut() PASSED
[2022-09-06T15:36:36.522Z] 
[2022-09-06T15:36:36.522Z] FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest()
 STARTED
[2022-09-06T15:36:37.664Z] 
[2022-09-06T15:36:37.664Z] FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest()
 PASSED
[2022-09-06T15:36:37.664Z] 
[2022-09-06T15:36:37.664Z] FineGrainedAutoResetIntegrationTest >

Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams

2022-09-06 Thread Nick Telford
Hi everyone,

I've re-written the KIP, with a new design that I think resolves the issues
you highlighted, and also simplifies usage.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams

Note: I'm still working out the "automatic repartitioning" in my head, as I
don't think it's quite right. It may turn out that the additional overload
(with the Produced argument) is not necessary.

Thanks for all your feedback so far. Let me know what you think!

Regards,

Nick

On Thu, 25 Aug 2022 at 17:46, Nick Telford  wrote:

> Hi Sophie,
>
> The reason I chose to add a new overload of "to", instead of creating a
> new method, is simply because I felt that "to" was about sending records
> "to" somewhere, and that "somewhere" just happens to currently be
> exclusively topics. By re-using "to", we can send records *to other
> KStreams*, including a KStream from an earlier point in the current
> KStreams' pipeline, which would facilitate recursion. Sending records to a
> completely different KStream would be essentially a merge.
>
> However, I'm happy to reduce the scope of this method to focus exclusively
> on recursion: we'd simply need to add a check in to the method that ensures
> the target is an ancestor node of the current KStream node.
>
> Which brings me to your first query...
>
> My argument is simply that a 0-ary method isn't enough to facilitate
> recursive streaming, because you need to be able to communicate which point
> in the process graph you want to feed your records back in to.
>
> Consider my example from the KIP, but re-written with a 0-ary
> "recursively" method:
>
> updates
> .join(parents, (count, parent) -> { KeyValue(parent, count) })
> .recursively()
>
> Where does the join output get fed to?
>
>1. The "updates" (source) node?
>2. The "join" node itself?
>
> It would probably be most intuitive if it simply caused the last step to
> be recursive, but that won't always be what you want. Consider if we add
> some more steps in to the above:
>
> updates
> .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't make
> sense in this algorithm, but let's pretend it does
> .join(parents, (count, parent) -> { KeyValue(parent, count) })
> .recursively()
>
> If "recursively" just feeds records back into the "join", it misses out on
> potentially important steps in our recursive algorithm. It also gets even
> worse if the step you're making recursive doesn't contain your terminal
> condition:
>
> foo
> .filter((key, value) -> value <= 0) // <-- terminal condition
> .mapValues((value) -> value - 1)
> .recursively()
>
> If "recursively" feeds records back to the "mapValues" stage in our
> pipeline, and not in to "filter" or "foo", then the terminal condition in
> "filter" won't be evaluated for any values with a starting value greater
> than 0, *causing an infinite loop*.
>
> There's an argument to be had to always feed the values back to the first
> ancestor "source node", in the process-graph, but that might not be
> particularly intuitive, and is likely going to limit some of the recursive
> algorithms that some may want to implement. For example, in the previous
> example, there's no guarantee that "foo" is a source node; it could be the
> result of a "mapValues", for example.
>
> Ultimately, the solution here is to make this method take a parameter,
> explicitly specifying the KStream that records are fed back in to, making
> the above two examples:
>
> updates
> .map((parent, count) -> KeyValue(parent, count + 1))
> .join(parents, (count, parent) -> { KeyValue(parent, count) })
> .recursively(updates)
>
> and:
>
> foo
> .filter((key, value) -> value <= 0)
> .mapValues((value) -> value - 1)
> .recursively(foo)
>
> We could *also* support a 0-ary version of the method that defaults to
> recursively executing the previous node, but I'm worried that users may not
> fully understand the consequences of this, inadvertently creating infinite
> loops that are difficult to debug.
>
> Finally, I'm not convinced that "recursively" is the best name for the
> method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome!
>
> If we want to prevent this method being "abused" to merge different
> streams together, it should be trivial to ensure that the provided argument
> is an ancestor of the current node, by recursively traversing up the
> process graph.
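> 
> A toy, self-contained sketch of that ancestor check (the Node type below is
> made up for illustration, not a Streams-internal class):
> 
> import java.util.List;
> 
> public class AncestorCheckSketch {
>     record Node(String name, List<Node> parents) {}
> 
>     static boolean isAncestor(Node target, Node current) {
>         for (Node parent : current.parents()) {
>             if (parent == target || isAncestor(target, parent)) {
>                 return true;
>             }
>         }
>         return false;
>     }
> 
>     public static void main(String[] args) {
>         Node source = new Node("source", List.of());
>         Node mapped = new Node("map", List.of(source));
>         Node joined = new Node("join", List.of(mapped));
>         System.out.println(isAncestor(source, joined)); // true
>     }
> }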
>
> I hope this clarifies your questions. It's clear that the KIP needs more
> work to better elaborate on these points. I haven't had a chance to revise
> it yet, due to more pressing issues with EOS stability that I've been
> looking into.
>
> Regards,
>
> Nick
>
> On Tue, 23 Aug 2022 at 23:50, Sophie Blee-Goldman
>  wrote:
>
>> Hey Nick,
>>
>> Sounds like an interesting KIP, and I agree the current way of achieving
>> this in Streams
>> seems wildly overcomplicated. So I'm definitely +1 on adding a smooth API
>> that abstracts
>> away a lo

[jira] [Resolved] (KAFKA-14179) Improve docs/upgrade.html to talk about metadata.version upgrades

2022-09-06 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-14179.

Fix Version/s: (was: 3.3.0)
   Resolution: Duplicate

> Improve docs/upgrade.html to talk about metadata.version upgrades
> -
>
> Key: KAFKA-14179
> URL: https://issues.apache.org/jira/browse/KAFKA-14179
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 3.3.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Colin McCabe
>Priority: Blocker
>  Labels: documentation, kraft
>
> The rolling upgrade documentation for 3.3.0 only talks about software and IBP 
> upgrades. It doesn't talk about metadata.version upgrades.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams

2022-09-06 Thread Guozhang Wang
Hello Nick,

Thanks for the re-written KIP! I read through it again, and so far have
just one quick question on top of my head regarding repartitioning: it
seems to me that when there's an intermediate topic inside the recursion
step, then using this new API would basically give us the same behavior as
using the existing `to` APIs. Of course, with the new API the user can make
it more explicit that it is supposed to be recursive, but efficiency wise
it provides no further optimizations. Is my understanding correct? If yes,
I'm wondering if it's worth the complexity to allow repartitioning inside
the unary operator, or whether we should just restrict the recursion to a
single sub-topology.


Guozhang

On Tue, Sep 6, 2022 at 9:05 AM Nick Telford  wrote:

> Hi everyone,
>
> I've re-written the KIP, with a new design that I think resolves the issues
> you highlighted, and also simplifies usage.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
>
> Note: I'm still working out the "automatic repartitioning" in my head, as I
> don't think it's quite right. It may turn out that the additional overload
> (with the Produced argument) is not necessary.
>
> Thanks for all your feedback so far. Let me know what you think!
>
> Regards,
>
> Nick
>
> On Thu, 25 Aug 2022 at 17:46, Nick Telford  wrote:
>
> > Hi Sophie,
> >
> > The reason I chose to add a new overload of "to", instead of creating a
> > new method, is simply because I felt that "to" was about sending records
> > "to" somewhere, and that "somewhere" just happens to currently be
> > exclusively topics. By re-using "to", we can send records *to other
> > KStreams*, including a KStream from an earlier point in the current
> > KStreams' pipeline, which would facilitate recursion. Sending records to
> a
> > completely different KStream would be essentially a merge.
> >
> > However, I'm happy to reduce the scope of this method to focus
> exclusively
> > on recursion: we'd simply need to add a check in to the method that
> ensures
> > the target is an ancestor node of the current KStream node.
> >
> > Which brings me to your first query...
> >
> > My argument is simply that a 0-ary method isn't enough to facilitate
> > recursive streaming, because you need to be able to communicate which
> point
> > in the process graph you want to feed your records back in to.
> >
> > Consider my example from the KIP, but re-written with a 0-ary
> > "recursively" method:
> >
> > updates
> > .join(parents, (count, parent) -> { KeyValue(parent, count) })
> > .recursively()
> >
> > Where does the join output get fed to?
> >
> >1. The "updates" (source) node?
> >2. The "join" node itself?
> >
> > It would probably be most intuitive if it simply caused the last step to
> > be recursive, but that won't always be what you want. Consider if we add
> > some more steps in to the above:
> >
> > updates
> > .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't make
> > sense in this algorithm, but let's pretend it does
> > .join(parents, (count, parent) -> { KeyValue(parent, count) })
> > .recursively()
> >
> > If "recursively" just feeds records back into the "join", it misses out
> on
> > potentially important steps in our recursive algorithm. It also gets even
> > worse if the step you're making recursive doesn't contain your terminal
> > condition:
> >
> > foo
> > .filter((key, value) -> value <= 0) // <-- terminal condition
> > .mapValues((value) -> value - 1)
> > .recursively()
> >
> > If "recursively" feeds records back to the "mapValues" stage in our
> > pipeline, and not in to "filter" or "foo", then the terminal condition in
> > "filter" won't be evaluated for any values with a starting value greater
> > than 0, *causing an infinite loop*.
> >
> > There's an argument to be had to always feed the values back to the first
> > ancestor "source node", in the process-graph, but that might not be
> > particularly intuitive, and is likely going to limit some of the
> recursive
> > algorithms that some may want to implement. For example, in the previous
> > example, there's no guarantee that "foo" is a source node; it could be
> the
> > result of a "mapValues", for example.
> >
> > Ultimately, the solution here is to make this method take a parameter,
> > explicitly specifying the KStream that records are fed back in to, making
> > the above two examples:
> >
> > updates
> > .map((parent, count) -> KeyValue(parent, count + 1))
> > .join(parents, (count, parent) -> { KeyValue(parent, count) })
> > .recursively(updates)
> >
> > and:
> >
> > foo
> > .filter((key, value) -> value <= 0)
> > .mapValues((value) -> value - 1)
> > .recursively(foo)
> >
> > We could *also* support a 0-ary version of the method that defaults to
> > recursively executing the previous node, but I'm worried that users may
> not
> > fully understand the consequences of this, inadvertently creating
> i

[jira] [Created] (KAFKA-14202) IQv2: Expose binary store schema to store implementations

2022-09-06 Thread John Roesler (Jira)
John Roesler created KAFKA-14202:


 Summary: IQv2: Expose binary store schema to store implementations
 Key: KAFKA-14202
 URL: https://issues.apache.org/jira/browse/KAFKA-14202
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


One feature of IQv2 is that store implementations can handle custom queries. 
Many custom query handlers will need to process the key or value bytes, for 
example deserializing them to implement some filter or aggregations, or even 
performing binary operations on them.

For the most part, this should be straightforward for users, since they provide 
Streams with the serdes, the store implementation, and the custom queries.

However, Streams will sometimes pack extra data around the data produced by the 
user-provided serdes. For example, the Timestamped store wrappers add a 
timestamp on the beginning of the value byte array. And in Windowed stores, we 
add window timestamps to the key bytes.

It would be nice to have some generic mechanism to communicate those schemas to 
the user-provided inner store layers to support users who need to write custom 
queries. For example, perhaps we can add an extractor class to the state store 
context.
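
To make that concrete, here is a minimal sketch of the kind of unpacking a custom
query handler has to do today, assuming the timestamp-prefixed value layout of the
Timestamped store wrappers described above (the class is illustrative, not a
proposed API):

import java.nio.ByteBuffer;

public class TimestampedValueSketch {
    // The wrapped value is laid out as [8-byte timestamp][user value bytes].
    static long timestamp(byte[] rawValue) {
        return ByteBuffer.wrap(rawValue).getLong();
    }

    static byte[] userValue(byte[] rawValue) {
        byte[] value = new byte[rawValue.length - Long.BYTES];
        System.arraycopy(rawValue, Long.BYTES, value, 0, value.length);
        return value;
    }

    public static void main(String[] args) {
        byte[] raw = ByteBuffer.allocate(Long.BYTES + 3)
                .putLong(1662480000000L)
                .put(new byte[] {1, 2, 3})
                .array();
        System.out.println(timestamp(raw) + " / " + userValue(raw).length);
    }
}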



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams

2022-09-06 Thread Nick Telford
Hi Guozhang,

I mentioned this in the "Rejected Alternatives" section. Repartitioning
gives us several significant advantages over using an explicit topic and
"to":

   - Repartition topics are automatically created and managed by the
   Streams runtime; explicit topics have to be created and managed by the user.
   - Repartition topics have no retention criteria and instead purge
   records once consumed, which prevents data loss. Explicit topics need
   retention criteria, which have to be set large enough to avoid data loss,
   often wasting considerable resources.
   - The "recursively" method requires significantly less code than
   recursion via an explicit topic, and is significantly easier to understand.

Ultimately, I don't think repartitioning inside the unary operator adds
much complexity to the implementation. Certainly no more than other DSL
operations.
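
For comparison, here is a minimal sketch of the explicit-topic workaround being
rejected (assuming default serdes; the "updates-feedback" topic name and the
mapValues step are placeholders, and a real topology also needs a terminal
condition to avoid looping forever):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class ExplicitTopicRecursionSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // The feedback topic must be created, partitioned and retained by the user.
        KStream<String, Long> input = builder.stream("updates");
        KStream<String, Long> feedback = builder.stream("updates-feedback");
        KStream<String, Long> updates = input.merge(feedback);

        // Placeholder for the real recursive step (e.g. the join in the KIP example).
        KStream<String, Long> recursed = updates.mapValues(v -> v + 1);

        // Records are fed back by writing them to the explicit, user-managed topic.
        recursed.to("updates-feedback");

        builder.build();
    }
}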

Regards,
Nick

On Tue, 6 Sept 2022 at 17:28, Guozhang Wang  wrote:

> Hello Nick,
>
> Thanks for the re-written KIP! I read through it again, and so far have
> just one quick question on top of my head regarding repartitioning: it
> seems to me that when there's an intermediate topic inside the recursion
> step, then using this new API would basically give us the same behavior as
> using the existing `to` APIs. Of course, with the new API the user can make
> it more explicit that it is supposed to be recursive, but efficiency wise
> it provides no further optimizations. Is my understanding correct? If yes,
> I'm wondering if it's worthy the complexity to allow repartitioning inside
> the unary operator, or should we just restrict the recursion inside a
> single sub-topology.
>
>
> Guozhang
>
> On Tue, Sep 6, 2022 at 9:05 AM Nick Telford 
> wrote:
>
> > Hi everyone,
> >
> > I've re-written the KIP, with a new design that I think resolves the
> issues
> > you highlighted, and also simplifies usage.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> >
> > Note: I'm still working out the "automatic repartitioning" in my head,
> as I
> > don't think it's quite right. It may turn out that the additional
> overload
> > (with the Produced argument) is not necessary.
> >
> > Thanks for all your feedback so far. Let me know what you think!
> >
> > Regards,
> >
> > Nick
> >
> > On Thu, 25 Aug 2022 at 17:46, Nick Telford 
> wrote:
> >
> > > Hi Sophie,
> > >
> > > The reason I chose to add a new overload of "to", instead of creating a
> > > new method, is simply because I felt that "to" was about sending
> records
> > > "to" somewhere, and that "somewhere" just happens to currently be
> > > exclusively topics. By re-using "to", we can send records *to other
> > > KStreams*, including a KStream from an earlier point in the current
> > > KStreams' pipeline, which would facilitate recursion. Sending records
> to
> > a
> > > completely different KStream would be essentially a merge.
> > >
> > > However, I'm happy to reduce the scope of this method to focus
> > exclusively
> > > on recursion: we'd simply need to add a check in to the method that
> > ensures
> > > the target is an ancestor node of the current KStream node.
> > >
> > > Which brings me to your first query...
> > >
> > > My argument is simply that a 0-ary method isn't enough to facilitate
> > > recursive streaming, because you need to be able to communicate which
> > point
> > > in the process graph you want to feed your records back in to.
> > >
> > > Consider my example from the KIP, but re-written with a 0-ary
> > > "recursively" method:
> > >
> > > updates
> > > .join(parents, (count, parent) -> { KeyValue(parent, count) })
> > > .recursively()
> > >
> > > Where does the join output get fed to?
> > >
> > >1. The "updates" (source) node?
> > >2. The "join" node itself?
> > >
> > > It would probably be most intuitive if it simply caused the last step
> to
> > > be recursive, but that won't always be what you want. Consider if we
> add
> > > some more steps in to the above:
> > >
> > > updates
> > > .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't
> make
> > > sense in this algorithm, but let's pretend it does
> > > .join(parents, (count, parent) -> { KeyValue(parent, count) })
> > > .recursively()
> > >
> > > If "recursively" just feeds records back into the "join", it misses out
> > on
> > > potentially important steps in our recursive algorithm. It also gets
> even
> > > worse if the step you're making recursive doesn't contain your terminal
> > > condition:
> > >
> > > foo
> > > .filter((key, value) -> value <= 0) // <-- terminal condition
> > > .mapValues((value) -> value - 1)
> > > .recursively()
> > >
> > > If "recursively" feeds records back to the "mapValues" stage in our
> > > pipeline, and not in to "filter" or "foo", then the terminal condition
> in
> > > "filter" won't be evaluated for any values with a starting value
> greater
> > > than 0, *causing an infinite loop

Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams

2022-09-06 Thread Nick Telford
The more I think about this, the more I think that automatic repartitioning
is not required in the "recursively" method itself. I've removed references
to this from the KIP, which further simplifies everything.

I don't see any need to restrict users from repartitioning, either before,
after or inside the "recursively" method. I can't see a scenario where the
recursion would cause problems with it.

Nick

On Tue, 6 Sept 2022 at 18:08, Nick Telford  wrote:

> Hi Guozhang,
>
> I mentioned this in the "Rejected Alternatives" section. Repartitioning
> gives us several significant advantages over using an explicit topic and
> "to":
>
>- Repartition topics are automatically created and managed by the
>Streams runtime; explicit topics have to be created and managed by the 
> user.
>- Repartitioning topics have no retention criteria and instead purge
>records once consumed, this prevents data loss. Explicit topics need
>retention criteria, which have to be set large enough to avoid data loss,
>often wasting considerable resources.
>- The "recursively" method requires significantly less code than
>recursion via an explicit topic, and is significantly easier to understand.
>
> Ultimately, I don't think repartitioning inside the unary operator adds
> much complexity to the implementation. Certainly no more than other DSL
> operations.
>
> Regards,
> Nick
>
> On Tue, 6 Sept 2022 at 17:28, Guozhang Wang  wrote:
>
>> Hello Nick,
>>
>> Thanks for the re-written KIP! I read through it again, and so far have
>> just one quick question on top of my head regarding repartitioning: it
>> seems to me that when there's an intermediate topic inside the recursion
>> step, then using this new API would basically give us the same behavior as
>> using the existing `to` APIs. Of course, with the new API the user can
>> make
>> it more explicit that it is supposed to be recursive, but efficiency wise
>> it provides no further optimizations. Is my understanding correct? If yes,
>> I'm wondering if it's worthy the complexity to allow repartitioning inside
>> the unary operator, or should we just restrict the recursion inside a
>> single sub-topology.
>>
>>
>> Guozhang
>>
>> On Tue, Sep 6, 2022 at 9:05 AM Nick Telford 
>> wrote:
>>
>> > Hi everyone,
>> >
>> > I've re-written the KIP, with a new design that I think resolves the
>> issues
>> > you highlighted, and also simplifies usage.
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
>> >
>> > Note: I'm still working out the "automatic repartitioning" in my head,
>> as I
>> > don't think it's quite right. It may turn out that the additional
>> overload
>> > (with the Produced argument) is not necessary.
>> >
>> > Thanks for all your feedback so far. Let me know what you think!
>> >
>> > Regards,
>> >
>> > Nick
>> >
>> > On Thu, 25 Aug 2022 at 17:46, Nick Telford 
>> wrote:
>> >
>> > > Hi Sophie,
>> > >
>> > > The reason I chose to add a new overload of "to", instead of creating
>> a
>> > > new method, is simply because I felt that "to" was about sending
>> records
>> > > "to" somewhere, and that "somewhere" just happens to currently be
>> > > exclusively topics. By re-using "to", we can send records *to other
>> > > KStreams*, including a KStream from an earlier point in the current
>> > > KStreams' pipeline, which would facilitate recursion. Sending records
>> to
>> > a
>> > > completely different KStream would be essentially a merge.
>> > >
>> > > However, I'm happy to reduce the scope of this method to focus
>> > exclusively
>> > > on recursion: we'd simply need to add a check in to the method that
>> > ensures
>> > > the target is an ancestor node of the current KStream node.
>> > >
>> > > Which brings me to your first query...
>> > >
>> > > My argument is simply that a 0-ary method isn't enough to facilitate
>> > > recursive streaming, because you need to be able to communicate which
>> > point
>> > > in the process graph you want to feed your records back in to.
>> > >
>> > > Consider my example from the KIP, but re-written with a 0-ary
>> > > "recursively" method:
>> > >
>> > > updates
>> > > .join(parents, (count, parent) -> { KeyValue(parent, count) })
>> > > .recursively()
>> > >
>> > > Where does the join output get fed to?
>> > >
>> > >1. The "updates" (source) node?
>> > >2. The "join" node itself?
>> > >
>> > > It would probably be most intuitive if it simply caused the last step
>> to
>> > > be recursive, but that won't always be what you want. Consider if we
>> add
>> > > some more steps in to the above:
>> > >
>> > > updates
>> > > .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't
>> make
>> > > sense in this algorithm, but let's pretend it does
>> > > .join(parents, (count, parent) -> { KeyValue(parent, count) })
>> > > .recursively()
>> > >
>> > > If "recursively" just feeds records back into the "join", it misses
>> out
>> > on
>> 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1200

2022-09-06 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 507466 lines...]
[2022-09-06T18:58:20.400Z] [INFO] 
[2022-09-06T18:58:20.400Z] [INFO] --- maven-archetype-plugin:2.2:jar 
(default-jar) @ streams-quickstart-java ---
[2022-09-06T18:58:20.400Z] [INFO] Building archetype jar: 
/home/jenkins/workspace/Kafka_kafka_trunk_2/streams/quickstart/java/target/streams-quickstart-java-3.4.0-SNAPSHOT
[2022-09-06T18:58:20.400Z] [INFO] 
[2022-09-06T18:58:20.400Z] [INFO] --- maven-site-plugin:3.5.1:attach-descriptor 
(attach-descriptor) @ streams-quickstart-java ---
[2022-09-06T18:58:20.400Z] [INFO] 
[2022-09-06T18:58:20.400Z] [INFO] --- 
maven-archetype-plugin:2.2:integration-test (default-integration-test) @ 
streams-quickstart-java ---
[2022-09-06T18:58:20.400Z] [INFO] 
[2022-09-06T18:58:20.400Z] [INFO] --- maven-gpg-plugin:1.6:sign 
(sign-artifacts) @ streams-quickstart-java ---
[2022-09-06T18:58:20.400Z] [INFO] 
[2022-09-06T18:58:20.400Z] [INFO] --- maven-install-plugin:2.5.2:install 
(default-install) @ streams-quickstart-java ---
[2022-09-06T18:58:20.400Z] [INFO] Installing 
/home/jenkins/workspace/Kafka_kafka_trunk_2/streams/quickstart/java/target/streams-quickstart-java-3.4.0-SNAPSHOT.jar
 to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart-java/3.4.0-SNAPSHOT/streams-quickstart-java-3.4.0-SNAPSHOT.jar
[2022-09-06T18:58:20.400Z] [INFO] Installing 
/home/jenkins/workspace/Kafka_kafka_trunk_2/streams/quickstart/java/pom.xml to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart-java/3.4.0-SNAPSHOT/streams-quickstart-java-3.4.0-SNAPSHOT.pom
[2022-09-06T18:58:20.400Z] [INFO] 
[2022-09-06T18:58:20.400Z] [INFO] --- 
maven-archetype-plugin:2.2:update-local-catalog (default-update-local-catalog) 
@ streams-quickstart-java ---
[2022-09-06T18:58:20.400Z] [INFO] 

[2022-09-06T18:58:20.400Z] [INFO] Reactor Summary for Kafka Streams :: 
Quickstart 3.4.0-SNAPSHOT:
[2022-09-06T18:58:20.400Z] [INFO] 
[2022-09-06T18:58:20.400Z] [INFO] Kafka Streams :: Quickstart 
 SUCCESS [  1.495 s]
[2022-09-06T18:58:20.400Z] [INFO] streams-quickstart-java 
 SUCCESS [  0.817 s]
[2022-09-06T18:58:20.400Z] [INFO] 

[2022-09-06T18:58:20.400Z] [INFO] BUILD SUCCESS
[2022-09-06T18:58:20.400Z] [INFO] 

[2022-09-06T18:58:20.400Z] [INFO] Total time:  2.552 s
[2022-09-06T18:58:20.400Z] [INFO] Finished at: 2022-09-06T18:58:20Z
[2022-09-06T18:58:20.400Z] [INFO] 

[Pipeline] dir
[2022-09-06T18:58:20.915Z] Running in 
/home/jenkins/workspace/Kafka_kafka_trunk_2/streams/quickstart/test-streams-archetype
[Pipeline] {
[Pipeline] sh
[2022-09-06T18:58:22.429Z] 
[2022-09-06T18:58:22.429Z] IQv2IntegrationTest > shouldFetchFromPartition() 
PASSED
[2022-09-06T18:58:22.429Z] 
[2022-09-06T18:58:22.429Z] IQv2IntegrationTest > 
shouldFetchExplicitlyFromAllPartitions() STARTED
[2022-09-06T18:58:23.234Z] + echo Y
[2022-09-06T18:58:23.234Z] + mvn archetype:generate -DarchetypeCatalog=local 
-DarchetypeGroupId=org.apache.kafka 
-DarchetypeArtifactId=streams-quickstart-java -DarchetypeVersion=3.4.0-SNAPSHOT 
-DgroupId=streams.examples -DartifactId=streams.examples -Dversion=0.1 
-Dpackage=myapps
[2022-09-06T18:58:24.410Z] [INFO] Scanning for projects...
[2022-09-06T18:58:24.410Z] [INFO] 
[2022-09-06T18:58:24.410Z] [INFO] --< 
org.apache.maven:standalone-pom >---
[2022-09-06T18:58:24.410Z] [INFO] Building Maven Stub Project (No POM) 1
[2022-09-06T18:58:24.410Z] [INFO] [ pom 
]-
[2022-09-06T18:58:24.410Z] [INFO] 
[2022-09-06T18:58:24.410Z] [INFO] >>> maven-archetype-plugin:3.2.1:generate 
(default-cli) > generate-sources @ standalone-pom >>>
[2022-09-06T18:58:24.410Z] [INFO] 
[2022-09-06T18:58:24.410Z] [INFO] <<< maven-archetype-plugin:3.2.1:generate 
(default-cli) < generate-sources @ standalone-pom <<<
[2022-09-06T18:58:24.410Z] [INFO] 
[2022-09-06T18:58:24.410Z] [INFO] 
[2022-09-06T18:58:24.410Z] [INFO] --- maven-archetype-plugin:3.2.1:generate 
(default-cli) @ standalone-pom ---
[2022-09-06T18:58:24.582Z] 
[2022-09-06T18:58:24.582Z] IQv2IntegrationTest > 
shouldFetchExplicitlyFromAllPartitions() PASSED
[2022-09-06T18:58:24.582Z] 
[2022-09-06T18:58:24.582Z] IQv2IntegrationTest > shouldFailUnknownStore() 
STARTED
[2022-09-06T18:58:24.582Z] 
[2022-09-06T18:58:24.582Z] IQv2IntegrationTest > shouldFailUnknownStore() PASSED
[2022-09-06T18:58:24.582Z] 
[2022-09-06T18:58:24.582Z] IQv2IntegrationTest > shouldRejectNonRunningActive() 
STARTED
[2022-09-06T18:58:25.504Z] [INFO] Generating project in Interactive mode
[2022-09-

[jira] [Created] (KAFKA-14203) KRaft broker should disable snapshot generation after error replaying the metadata log

2022-09-06 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-14203:
--

 Summary: KRaft broker should disable snapshot generation after 
error replaying the metadata log
 Key: KAFKA-14203
 URL: https://issues.apache.org/jira/browse/KAFKA-14203
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.0
Reporter: Jose Armando Garcia Sancio
 Fix For: 3.3.0


The broker skips records for which there was an error when replaying the log. 
This means that the MetadataImage has diverged from the state persistent in the 
log. The broker should disable snapshot generation else the next time a 
snapshot gets generated it will result in inconsistent data getting persisted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14204) QuorumController must correctly handle overly large batches

2022-09-06 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-14204:


 Summary: QuorumController must correctly handle overly large 
batches
 Key: KAFKA-14204
 URL: https://issues.apache.org/jira/browse/KAFKA-14204
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.3 #59

2022-09-06 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 573767 lines...]
[2022-09-06T20:11:16.653Z] 
[2022-09-06T20:11:16.653Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] STARTED
[2022-09-06T20:11:25.963Z] 
[2022-09-06T20:11:25.963Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = false] PASSED
[2022-09-06T20:11:25.963Z] 
[2022-09-06T20:11:25.963Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = false] STARTED
[2022-09-06T20:11:28.504Z] 
[2022-09-06T20:11:28.505Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] PASSED
[2022-09-06T20:11:28.505Z] 
[2022-09-06T20:11:28.505Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] STARTED
[2022-09-06T20:11:37.802Z] 
[2022-09-06T20:11:37.802Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = false] PASSED
[2022-09-06T20:11:37.802Z] 
[2022-09-06T20:11:37.802Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] STARTED
[2022-09-06T20:11:40.342Z] 
[2022-09-06T20:11:40.343Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] PASSED
[2022-09-06T20:11:40.343Z] 
[2022-09-06T20:11:40.343Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] STARTED
[2022-09-06T20:11:49.643Z] 
[2022-09-06T20:11:49.643Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] PASSED
[2022-09-06T20:11:49.643Z] 
[2022-09-06T20:11:49.643Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] STARTED
[2022-09-06T20:11:52.185Z] 
[2022-09-06T20:11:52.185Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] PASSED
[2022-09-06T20:11:52.185Z] 
[2022-09-06T20:11:52.185Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation STARTED
[2022-09-06T20:11:53.217Z] 
[2022-09-06T20:11:53.217Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation PASSED
[2022-09-06T20:11:53.217Z] 
[2022-09-06T20:11:53.217Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation STARTED
[2022-09-06T20:11:55.457Z] 
[2022-09-06T20:11:55.457Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation PASSED
[2022-09-06T20:11:56.398Z] 
[2022-09-06T20:11:56.398Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted STARTED
[2022-09-06T20:11:58.348Z] 
[2022-09-06T20:11:58.348Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] PASSED
[2022-09-06T20:11:58.348Z] 
[2022-09-06T20:11:58.348Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] STARTED
[2022-09-06T20:12:03.526Z] 
[2022-09-06T20:12:03.526Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted PASSED
[2022-09-06T20:12:04.042Z] 
[2022-09-06T20:12:04.042Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] PASSED
[2022-09-06T20:12:04.042Z] 
[2022-09-06T20:12:04.042Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] STARTED
[2022-09-06T20:12:06.971Z] 
[2022-09-06T20:12:06.972Z] 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest > 
testHighAvailabilityTaskAssignorLargeNumConsumers STARTED
[2022-09-06T20:12:09.821Z] 
[2022-09-06T20:12:09.821Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] PASSED
[2022-09-06T20:12:09.821Z] 
[2022-09-06T20:12:09.821Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] STARTED
[2022-09-06T20:12:16.920Z] 
[2022-09-06T20:12:16.920Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] PASSED
[2022-09-06T20:12:16.920Z] 
[2022-09-06T20:12:16.920Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation STARTED
[2022-09-06T20:12:16.920Z] 
[2022-09-06T20:12:16.920Z] 
org.apache.kafka.streams.integration.TaskMetadataI

[jira] [Resolved] (KAFKA-14197) Kraft broker fails to startup after topic creation failure

2022-09-06 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-14197.
--
Resolution: Duplicate

> Kraft broker fails to startup after topic creation failure
> --
>
> Key: KAFKA-14197
> URL: https://issues.apache.org/jira/browse/KAFKA-14197
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.3.0
>
>
> In the KRaft ControllerWriteEvent, we start by trying to apply the record to the 
> controller's in-memory state, then send out the record via the raft client. But if 
> there is an error while sending the records, there's no way to revert the 
> change to the controller's in-memory state[1].
> The issue happened when creating topics: the controller state is updated with 
> topic and partition metadata (e.g. the broker-to-ISR map), but the record isn't 
> sent out successfully (e.g. RecordBatchTooLargeException). Then, when shutting 
> down the node, the controlled shutdown tries to remove the broker from the ISR 
> by[2]:
> {code:java}
> generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]", 
> brokerId, NO_LEADER, records, 
> brokersToIsrs.partitionsWithBrokerInIsr(brokerId));{code}
>  
> After we append the partitionChangeRecords and send them to the metadata topic 
> successfully, the brokers fail to "replay" these partition 
> changes since these topics/partitions didn't get created successfully 
> previously.
> Even worse, after restarting the node, all the metadata records are replayed 
> again, the same error happens again, and the broker cannot start up 
> successfully.
>  
> The error and call stack is like this, basically, it complains the topic 
> image can't be found
> {code:java}
> [2022-09-02 16:29:16,334] ERROR Encountered metadata loading fault: Error 
> replaying metadata log record at offset 81 
> (org.apache.kafka.server.fault.LoggingFaultHandler)
> java.lang.NullPointerException
>     at org.apache.kafka.image.TopicDelta.replay(TopicDelta.java:69)
>     at org.apache.kafka.image.TopicsDelta.replay(TopicsDelta.java:91)
>     at org.apache.kafka.image.MetadataDelta.replay(MetadataDelta.java:248)
>     at org.apache.kafka.image.MetadataDelta.replay(MetadataDelta.java:186)
>     at 
> kafka.server.metadata.BrokerMetadataListener.$anonfun$loadBatches$3(BrokerMetadataListener.scala:239)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>     at 
> kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$loadBatches(BrokerMetadataListener.scala:232)
>     at 
> kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.run(BrokerMetadataListener.scala:113)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>  
> [1] 
> [https://github.com/apache/kafka/blob/ef65b6e566ef69b2f9b58038c98a5993563d7a68/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L779-L804]
>  
> [2] 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L1270]
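
For readers less familiar with the KRaft write path, here is a deliberately 
simplified, hypothetical sketch of the apply-before-append ordering the 
description above refers to. All class and method names below are invented for 
illustration; this is not the actual QuorumController code.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Invented, simplified model of the failure mode: the in-memory state is
// mutated before the records are durably appended, and a failed append has
// no rollback.
public class ApplyThenAppendSketch {
    // Stands in for the controller's in-memory metadata state.
    private static final List<String> inMemoryState = new ArrayList<>();
    // Stands in for the raft client's batch size limit.
    private static final int MAX_BATCH_SIZE = 2;

    private static void appendToMetadataLog(List<String> records) {
        if (records.size() > MAX_BATCH_SIZE) {
            throw new IllegalStateException("simulated RecordBatchTooLargeException");
        }
        // ... a real implementation would hand the records to the raft client here ...
    }

    public static void main(String[] args) {
        List<String> records = List.of("TopicRecord", "PartitionRecord-0", "PartitionRecord-1");
        inMemoryState.addAll(records);      // 1) in-memory state is updated first
        try {
            appendToMetadataLog(records);   // 2) the append fails ...
        } catch (IllegalStateException e) {
            // 3) ... but nothing rolls the state back, so the in-memory view keeps
            //    topic/partition entries that were never committed, which is what
            //    later breaks replay on the brokers.
            System.out.println("Append failed; state left inconsistent: " + inMemoryState);
        }
    }
}
{code}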





RE: Problem with Kafka KRaft 3.1.X

2022-09-06 Thread Keith Paulson
I've had similar errors caused by mmap counts; try with
vm.max_map_count=262144
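
Not part of the original thread, but as a quick way to sanity-check the mmap 
angle: below is a minimal Java sketch, assuming a Linux host and Java 11+, 
that compares vm.max_map_count against the number of mappings a process 
currently holds. The class name and the reliance on /proc are illustrative, 
not an existing Kafka tool.

import java.nio.file.Files;
import java.nio.file.Path;

// Reads the kernel limit and counts the lines of /proc/<pid>/maps
// (one line per mapping) to see how much headroom the process has.
public class MmapHeadroom {
    public static void main(String[] args) throws Exception {
        long pid = args.length > 0 ? Long.parseLong(args[0]) : ProcessHandle.current().pid();
        long limit = Long.parseLong(Files.readString(Path.of("/proc/sys/vm/max_map_count")).trim());
        long used;
        try (var lines = Files.lines(Path.of("/proc/" + pid + "/maps"))) {
            used = lines.count();
        }
        System.out.printf("pid=%d max_map_count=%d mappings=%d headroom=%d%n",
                pid, limit, used, limit - used);
    }
}

Run it with the broker's pid as the argument; if the headroom is small, 
raising vm.max_map_count as suggested above is the usual remedy.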


On 2022/09/01 23:57:54 Paul Brebner wrote:
> Hi all,
>
> I've been attempting to benchmark the Kafka KRaft version for an ApacheCon
> talk and have identified 2 problems:
>
> 1 - it's still impossible to create a large number of partitions/topics - I
> can create more than with the comparable ZooKeeper version, but still not
> "millions" - this is with RF=1 only (as anything higher needs huge clusters
> to cope with the replication CPU overhead), and no load on the clusters
> yet (i.e. purely a topic/partition creation experiment).
>
> 2 - eventually the topic/partition creation command causes the Kafka
> process to fail - looks like a memory error -
>
> java.lang.OutOfMemoryError: Metaspace
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7f4f554f9000, 65536, 1) failed; error='Not enough
> space' (errno=12)
>
> or similar error
>
> It seems to happen consistently around 30,000+ partitions - this is on a
> test EC2 instance with 32GB RAM, 500,000 file descriptors (increased from
> the default) and 64GB disk (plenty spare). I'm not an OS expert, but the
> Kafka process and the OS both seem to have plenty of RAM when this error
> occurs.
>
> So there are really 3 questions: What's going wrong exactly? How can I
> achieve more partitions? And should the topic create command (just using
> the CLI at present to create topics) really be capable of killing the Kafka
> instance, or should it fail with an error while the Kafka instance
> continues working...
>
> Regards, Paul Brebner
>


[jira] [Created] (KAFKA-14205) Document how to recover from kraft controller disk failure

2022-09-06 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-14205:
--

 Summary: Document how to recover from kraft controller disk failure
 Key: KAFKA-14205
 URL: https://issues.apache.org/jira/browse/KAFKA-14205
 Project: Kafka
  Issue Type: Sub-task
  Components: documentation
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.3.0








Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1201

2022-09-06 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 504777 lines...]
[2022-09-07T05:40:03.402Z] > Task :connect:api:testClasses UP-TO-DATE
[2022-09-07T05:40:03.402Z] > Task :connect:api:testJar
[2022-09-07T05:40:04.445Z] > Task :connect:api:testSrcJar
[2022-09-07T05:40:04.445Z] > Task 
:connect:api:publishMavenJavaPublicationToMavenLocal
[2022-09-07T05:40:04.445Z] > Task :connect:api:publishToMavenLocal
[2022-09-07T05:40:05.664Z] 
[2022-09-07T05:40:05.664Z] > Task :streams:javadoc
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:890:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:919:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:939:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:854:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:890:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:919:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:939:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java:84:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java:136:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java:147:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java:101:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java:167:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:58:
 warning - Tag @link: missing '#': "org.apache.kafka.streams.StreamsBuilder()"
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:58:
 warning - Tag @link: can't find org.apache.kafka.streams.StreamsBuilder() in 
org.apache.kafka.streams.TopologyConfig
[2022-09-07T05:40:05.664Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java:38:
 warning - Tag @link: reference not found: ProcessorContext#forward(Object, 
Object) forwards
[2022-09-07T05:40:06.708Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/query/Position.java:44:
 warning - Tag @link: can't find query(Query,
[2022-09-07T05:40:06.708Z]  PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-09-07T05:40:06.708Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:109:
 warning - Tag @link: reference not found: this#getResult()
[2022-09-07T05:40:06.708Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:116:
 warning - Tag @link: reference not found: this#getFailureReason()
[2022-09-07T05:40:06.708Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk_2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:116:
 warning - Tag @link: reference not found: this#getFailureM

RE: Last sprint to finish line: Replace EasyMock/Powermock with Mockito

2022-09-06 Thread Christo Lolov
Hello!

This is the (roughly) bi-weekly update on the Mockito migration.

Firstly, the following PRs have been merged since the last email, so thank you 
to the writers (Yash and Divij) and reviewers (Dalibor, Mickael, Yash, Bruno 
and Chris):

https://github.com/apache/kafka/pull/12459
https://github.com/apache/kafka/pull/12473
https://github.com/apache/kafka/pull/12509

Secondly, this is the latest list of PRs that are in need of a review to get 
them over the line:

https://github.com/apache/kafka/pull/12409
https://github.com/apache/kafka/pull/12418 (I need to respond to the comments 
on this one, so the first action is on me)
https://github.com/apache/kafka/pull/12465
https://github.com/apache/kafka/pull/12492
https://github.com/apache/kafka/pull/12505 (I need to respond to Dalibor’s 
comment on this one, but the overall PR could use some more eyes)
https://github.com/apache/kafka/pull/12524
https://github.com/apache/kafka/pull/12527

Lastly, I am keeping https://issues.apache.org/jira/browse/KAFKA-14133 and 
https://issues.apache.org/jira/browse/KAFKA-14132 up to date, so if anyone has 
spare bandwidth and would like to assign themselves some of the unassigned 
tests, their contributions would be welcome :)

Best,
Christo
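
For anyone picking up one of the unassigned tests, here is an illustrative, 
self-contained sketch (not taken from any of the PRs above, and assuming 
mockito-core is on the classpath) of the basic EasyMock-to-Mockito conversion 
pattern these migrations apply: stubbing with when(...).thenReturn(...) 
replaces expect(...).andReturn(...) plus replay(), and interactions are 
verified individually rather than via a global verify pass.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;

// EasyMock style being replaced (shown only as a comment):
//   List<String> m = EasyMock.createMock(List.class);
//   EasyMock.expect(m.size()).andReturn(1);
//   EasyMock.replay(m);
//   ... exercise the code under test ...
//   EasyMock.verify(m);
public class MockitoMigrationSketch {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<String> mock = mock(List.class);  // no replay() phase in Mockito
        when(mock.size()).thenReturn(1);       // stubbing replaces expect(...).andReturn(...)

        int size = mock.size();                // stands in for the code under test

        verify(mock).size();                   // verify individual interactions explicitly
        System.out.println("stubbed size = " + size);
    }
}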