[jira] [Created] (KAFKA-14342) KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

2022-10-31 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14342:
--

 Summary: KafkaOffsetBackingStore should clear offsets for source 
partitions on tombstone messages
 Key: KAFKA-14342
 URL: https://issues.apache.org/jira/browse/KAFKA-14342
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[KafkaOffsetBackingStore|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L70]
 is used to track source connector offsets using a backing Kafka topic. It 
implements interface methods to get and set offsets using a 
[KafkaBasedLog|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L80].
 It also maintains an in-memory map containing {partition, offset} entries for 
source connectors (which is populated via the consumer callback mechanism from 
the KafkaBasedLog). When a tombstone offset (i.e. Kafka message with a null 
value) is encountered for a source partition, the map is simply updated to make 
the value null for the corresponding partition key. For certain source 
connectors which have a lot of source partitions that are "closed" frequently, 
this can be very problematic. Imagine a file source connector which reads data 
from all files in a directory line-by-line (and where file appends are not 
tracked) - each file corresponds to a source partition here, and the offset 
would be the line number in the file. If there are millions of files being 
read, this can bring down the Connect worker due to JVM heap exhaustion (OOM) 
caused by the in-memory map in KafkaOffsetBackingStore growing too large. Even 
if the connector writes tombstone offsets for the last record in a source 
partition, this doesn't help completely since we don't currently remove entries 
from KafkaOffsetBackingStore's in-memory offset map (so the source partition 
keys will stick around) - even though we indicate 
[here|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java#L37]
 that tombstones can be used to "delete" offsets.
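A minimal sketch of the kind of change this implies (the field and callback names below are assumptions based on the current code, not an actual patch): when the consumer callback sees a tombstone for a source partition key, remove the entry from the in-memory map instead of storing a null value.

{code:java}
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Sketch only -- names are illustrative, not the actual KafkaOffsetBackingStore code.
class OffsetMapSketch {
    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();

    // Invoked for every record read back from the offsets topic via the KafkaBasedLog.
    void onOffsetRecord(ByteBuffer key, ByteBuffer value) {
        if (value == null) {
            // Tombstone: drop the source partition entry entirely so the map
            // cannot grow without bound as partitions are "closed".
            data.remove(key);
        } else {
            data.put(key, value);
        }
    }
}
{code}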





[jira] [Created] (KAFKA-14343) Write upgrade/downgrade tests for enabling the state updater

2022-10-31 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-14343:
--

 Summary: Write upgrade/downgrade tests for enabling the state 
updater 
 Key: KAFKA-14343
 URL: https://issues.apache.org/jira/browse/KAFKA-14343
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Lucas Brutschy
Assignee: Lucas Brutschy


Write a test that verifies the upgrade from a version of Streams with state 
updater disabled to a version with state updater enabled and vice versa, so 
that we can offer a safe upgrade path.
 * upgrade test from a version of Streams with state updater disabled to a 
version with state updater enabled (probably a system test since the old code 
path will be removed from the code base)

 * downgrade test from a version of Streams with state updater enabled to a 
version with state updater disabled (probably a system test since the old code 
path will be removed from the code base)





Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1328

2022-10-31 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-31 Thread David Arthur
Happy Monday, everyone! I've updated the KIP with the following changes:

* Clarified MetadataType metric usages (broker vs controller)
* Added ZkMigrationReady tagged field to ApiVersionsResponse (for use by
KRaft controller quorum)
* Added MigrationRecord with two states: Started and Finished
* Documented ZK configs for KRaft controller
* Simplified state machine description (internally, more states will exist,
but only the four documented are interesting to operators)
* Clarified some things in Controller Migration section
* Removed KRaft -> ZK parts of Broker Registration
* Added Misconfigurations section to Failure Modes

Let me know if I've missed anything from the past two weeks of discussion.

Thanks again to everyone who has reviewed this KIP so far!

-David

On Fri, Oct 28, 2022 at 2:26 PM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the reply.
>
> 20/21. Sounds good.
>
> Could you update the doc with all the changes being discussed?
>
> Thanks,
>
> Jun
>
> On Fri, Oct 28, 2022 at 10:11 AM David Arthur
>  wrote:
>
> > Jun,
> >
> > 20/21. I was also wondering about a "migration" record. In addition to
> the
> > scenario you mentioned, we also need a way to prevent the cluster from
> > re-entering the dual write mode after the migration has been finalized. I
> > could see this happening inadvertently via a change in some configuration
> > management system. How about we add a record that marks the beginning and
> > end of the dual-write mode. The first occurrence of the record could be
> > included in the metadata transaction when we migrate data from ZK.
> >
> > With this, the active controller would decide whether to enter dual write
> > mode, finalize the migration, or fail, based on:
> >
> > * Metadata log state
> > * Its own configuration ("kafka.metadata.migration.enable",
> > "zookeeper.connect", etc)
> > * The other controllers' configuration (via ApiVersionsResponse)
> >
> > WDYT?
> >
> > 22. Since we will need the fencing anyways as a safe-guard, then I agree
> > we could skip the registration of KRaft brokers in ZK to simplify things a
> > bit.
> >
> > Thanks,
> > David
> >
> >
> >
> > On Thu, Oct 27, 2022 at 5:11 PM Jun Rao 
> wrote:
> >
> > > Hi, David,
> > >
> > > Thanks for the reply.
> > >
> > > 20/21. Relying upon the presence of ZK configs to determine whether the
> > > KRaft controller is in a dual write mode seems a bit error prone. If
> > > someone accidentally adds a ZK configuration to a brand new KRaft
> > cluster,
> > > ideally it shouldn't cause the controller to get into a weird state.
> Have
> > > we considered storing the migration state in a metadata record?
> > >
> > > 22. If we have the broker fencing logic, do we need to write the broker
> > > registration path in ZK for KRaft brokers at all?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Oct 27, 2022 at 1:02 PM David Arthur
> > >  wrote:
> > >
> > > > Jun,
> > > >
> > > > 20/21. A KRaft controller will recover the migration state by reading
> > the
> > > > "/migration" ZNode. If the migration enable config is set, and the ZK
> > > > migration is complete, it will enter the dual-write mode. Before an
> > > > operator can decommission ZK, they will need to finalize the
> migration
> > > > which involves removing the migration config and the ZK config. I'll
> > > > clarify this in the KIP.
> > > >
> > > > 22. Yea, we could see an incorrect broker ID during that window.  If
> we
> > > > ended up with a state where we saw a ZK broker ID that conflicted
> with
> > a
> > > > KRaft broker ID, we would need to fence one of them. I would probably
> > opt
> > > > to fence the KRaft broker in that case since broker registration and
> > > > fencing is more robust in KRaft. Hopefully this is a rare case.
> > > >
> > > > 26. Sounds good.
> > > >
> > > > Thanks!
> > > > David
> > > >
> > > >
> > > > On Thu, Oct 27, 2022 at 1:34 PM Jun Rao 
> > > wrote:
> > > >
> > > > > Hi, David,
> > > > >
> > > > > Thanks for the reply. A few more comments.
> > > > >
> > > > > 20/21. Using a tagged field in ApiVersionRequest could work.
> Related
> > to
> > > > > this, how does a KRaft controller know that it's in the dual write
> > > mode?
> > > > > Does it need to read the /controller path from ZK? After the
> > migration,
> > > > > people may have the ZK cluster decommissioned, but still have the
> ZK
> > > > > configs left in the KRaft controller. Will this cause the KRaft
> > > > controller
> > > > > to be stuck because it doesn't know which mode it is in?
> > > > >
> > > > > 22. Using the ephemeral node matches the current ZK-based broker
> > > behavior
> > > > > better. However, it leaves a window for incorrect broker
> registration
> > > to
> > > > > sneak in during KRaft controller failover.
> > > > >
> > > > > 26. Then, we could just remove Broker Registration in that section.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Oct 26, 2022 at 2:21 PM David Arthur
> > > > >  wrote:
> >

[jira] [Created] (KAFKA-14344) Build EmbeddedKafkaCluster with common configs used for all clients

2022-10-31 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-14344:
-

 Summary: Build EmbeddedKafkaCluster with common configs used for 
all clients
 Key: KAFKA-14344
 URL: https://issues.apache.org/jira/browse/KAFKA-14344
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, unit tests
Reporter: Omnia Ibrahim
Assignee: Omnia Ibrahim


Connect and MirrorMaker's integration tests use `EmbeddedKafkaCluster` and 
`EmbeddedConnectCluster` to set up a Connect cluster during testing. Both classes 
are easy to set up if the test needs vanilla clusters; however, it's a lot of 
work to set them up with more advanced config (for example authentication 
and authorization) where the admin, consumer and producer clients need more 
configuration.

1. I am proposing adding an extra parameter `additionalClientConfigs` to the 
`EmbeddedKafkaCluster` constructor. The new parameter will be used to:
 - Set up the producer client in `EmbeddedKafkaCluster.doStart`, which 
initializes the `producer` client used in `EmbeddedKafkaCluster.produce`

 - Set up the producer client in `EmbeddedKafkaCluster.createProducer`, used in 
`EmbeddedKafkaCluster.transactionalProducer`

 - Set up the admin client in `EmbeddedKafkaCluster.createAdminClient`, used in 
`EmbeddedKafkaCluster.createTopic`, `EmbeddedKafkaCluster.consumeAll`, 
`EmbeddedKafkaCluster.describeTopics` and `EmbeddedKafkaCluster.deleteTopic`

 - Set up the consumer client in `EmbeddedKafkaCluster.createConsumer`, used in 
`EmbeddedKafkaCluster.createConsumerAndSubscribeTo` and 
`EmbeddedKafkaCluster.consumeAll`

2. And add `EmbeddedConnectCluster.Builder.additionalKafkaClusterClientConfigs` 
(see the usage sketch below).
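To illustrate, a test that needs SASL-enabled clients could then look roughly 
like the following. The constructor and builder signatures are assumptions for 
illustration, not the final API.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical usage sketch of the proposed additional client configs.
Map<String, Object> additionalClientConfigs = new HashMap<>();
additionalClientConfigs.put("security.protocol", "SASL_PLAINTEXT");
additionalClientConfigs.put("sasl.mechanism", "PLAIN");

Properties brokerProps = new Properties();
// ... broker-side listener/SASL configs ...

// EmbeddedKafkaCluster would apply these configs to the producer, admin and
// consumer clients it creates internally (doStart, createProducer,
// createAdminClient, createConsumer).
EmbeddedKafkaCluster kafka = new EmbeddedKafkaCluster(1, brokerProps, additionalClientConfigs);

// EmbeddedConnectCluster would forward the same configs to its backing Kafka cluster.
EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
        .name("connect-cluster")
        .additionalKafkaClusterClientConfigs(additionalClientConfigs)
        .build();
{code}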

 

Tests impacted by this 
- MirrorMaker integration tests
- `org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest`





Re: [DISCUSS] KIP-876: Time based cluster metadata snapshots

2022-10-31 Thread José Armando García Sancio
Hi all,

During the implementation of this KIP and some offline discussion with
Jason Gustafson, I realized that since Kafka generates a snapshot
every hour and the default metadata retention deletes snapshots
after 7 days, every cluster metadata partition will have 168 (1
snapshot per hour * 24 hours per day * 7 days) snapshots. If we assume
that in most cases the size of the snapshot is determined by the
number of partitions in a cluster, a cluster with 100K partitions will
have a snapshot size of roughly 10MB (100 bytes per partition * 100K
partitions). For this kind of cluster the cluster metadata partition
will always consume around 1.7GB.

We should change the KIP and the default value for
`metadata.max.retention.bytes` to 100MB. This should limit the size of
the cluster metadata partition for large clusters but keep 7 days
worth of snapshots for small clusters.
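
A quick back-of-the-envelope check of the numbers above, with the stated
assumptions (1 snapshot per hour, 100 bytes per partition, 100K partitions):

    int snapshotsRetained = 24 * 7;                      // 168 snapshots over 7 days
    long snapshotBytes = 100L * 100_000L;                // ~10MB per snapshot
    long totalBytes = snapshotBytes * snapshotsRetained; // ~1.7GB of snapshots
    long newDefault = 100L * 1024 * 1024;                // metadata.max.retention.bytes = 100MB
    // 100MB of retention keeps roughly the 10 most recent ~10MB snapshots for a
    // 100K-partition cluster, while small clusters still keep 7 days' worth.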

Thanks,
-- 
-José


Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-31 Thread Bruno Cadonna

Hi Sophie,

Thank you for the KIP!

1.
I do not understand how autoscaling should work with a Streams topology 
with a stateful sub-topology that reads from the input topics. The 
simplest example is a topology that consists of only one stateful 
sub-topology. As far as I understand the upstream producer would route 
existing keys to different partitions after the partition expansion than 
before the expansion. That means Streams would -- in general -- not read 
the same keys with the same stream thread after the expansion. I think 
you proposed the solution to this in your last e-mail with the following:



Essentially whoever is responsible for calculating how many partitions 
are needed should also be responsible for directing whichever new keys 
are supposed to go into those new partitions, then pass it along to the 
upstream producer to encode in the record itself.



But I am not 100% sure whether that is really what you meant. If I 
understand it correctly, you propose that the user is responsible for 
producing the records with existing keys to the same partitions as before 
the expansion upstream. I think that is important information that 
should be pointed out in the KIP.



2.
I would log an error and shutdown the Streams application if a custom 
partitioner is used anywhere in the topology. I think that would make 
the limitations clearer and would reduce perceived unexpected behavior 
by the users. Are there any specific reasons you propose to ignore it 
and log a warning?


Best,
Bruno

On 28.10.22 04:51, Sophie Blee-Goldman wrote:

Thanks all! I'll try to address everything but don't hesitate to call me
out if anything is missed

Colt/Lucas:

Thanks for clarifying, I think I understand your example now. Something I
didn't think to mention
earlier but hopefully clears up how this would be used in practice is that
the partitioning decision/
logic doesn't need to -- and perhaps explicitly should not be -- internal
to the StaticStreamPartitioner
interface alone. I would imagine a realistic scenario would have the
partition essentially determined
upstream of the actual application, specifically integrated with whatever
system (or person) is
making the decision to add new partition(s) in the first place. Then the
partitioner is just reading out
some field in the record key/value, possibly doing some translation to
derive the final partition number
from something like a userId if it's not encoded directly, and not actually
computing anything itself.
Does that make sense? Essentially whoever is responsible for calculating
how many partitions are
needed should also be responsible for directing whichever new keys are
supposed to go into those
new partitions, then pass it along to the upstream producer to encode in
the record itself.

In sum, I second what Lucas said about your scenario actually being a good
example of one way
to approach implementing static partitioning, ie based on time. It's just
that the semantics/logic to
interpret the target partition based on time would be external to the
application and not isolated in
the actual StaticStreamPartitioner class. Imo this makes perfect sense, as
something like IQ is
also going to be situated outside of the Streams application itself, so
presumably it can talk to
the system that is responsible for the partitioning logic for any partition
information it needs.
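
To make that concrete, here is a rough sketch of the kind of partitioner being
described. The interface and method name are the ones proposed in the KIP, but
the exact signature and the key type below are just for illustration:

    // Hypothetical key type: the upstream system that decides to add partitions
    // also decides where each key should go and encodes it in the key itself.
    record UserKey(String userId, int assignedPartition) { }

    // The partitioner just reads the pre-assigned partition back out; it does
    // not compute anything itself.
    class EncodedPartitionPartitioner implements StaticStreamPartitioner<UserKey, Object> {
        @Override
        public Integer staticPartition(String topic, UserKey key) {
            return key.assignedPartition();
        }
    }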

Bill/Sagar:

I've been going back and forth a lot on whether to open this feature up to
stateless applications or
even stateful ones as well, but feel like I've settled on having it
targeted towards both (but only) the
stateless and statically partitioned cases. Bill, my only concern about the
stateless apps was the
possibility for trouble when repartitioning a stateless application that
feeds into a stateful application
downstream. But now that I think about it, users would actually need to
ensure that any/all apps
in that pipeline could handle partition increases, so it would be
impossible for someone to mess up
something downstream with corrupted partitioning because any changes to the
output topics would
of course mean changing the input topics of those downstream apps, and they
would just shut down
if not set up to handle this -- that's the whole point of this KIP. So
I'm +1 on including the stateless folks

As for stateful applications, I feel pretty strongly that we should
discourage users from trying to use
the autoscaling feature when state is involved. However, as I touch on
again briefly in the API discussion
below, there's no way to truly prevent someone from abusing this feature if
they are determined to. So
the idea is really for us to stress and heavily document which kinds of
applications can and cannot
enable autoscaling and/or be repartitioned without resulting in significant
corruption of the results.

As for key skew, technically anything is possible -- but (a) we're
entrusting users to make smart choices
throughout this KIP, which includes being careful with the partiti

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2022-10-31 Thread Omnia Ibrahim
Hi Luke and Justine,

> For (3), you said:
> > - I have some concerns about the impact of this option on the
> transactional
> producers, for example, what will happen to an ongoing transaction
> associated with an expired PID? Would this leave the transactions in a
> "hanging" state?
>
> - How will we notify the client that the transaction can't continue due to
> an expired PID?
>
> - If PID got marked as `expired` this will mean that
> `admin.DescribeProducers` will not list them which will make
> *`kafka-transactions.sh
> --list`* a bit tricky as we can't identify if there are transactions linked
> to this expired PID or not. The same concern applies to
> *`kafka-transactions.sh
> --find-hanging`*.
>
> --> Yes, you're right. Those are also concerns for this solution.
> Currently, there's no way to notify clients about the expiration.
> Also, the ongoing transactions will be hanging. For the admin cli, we've
> never thought about it. Good point.
> In summary, to adopt this solution, there are many issues needed to get
> fixed.
>

Justine already clarified that if a PID is attached to an ongoing transaction it
will not expire, so identifying the transactions shouldn't be a concern anymore.
The only concern here will be that this solution will not solve the problem
if the rogue producer is a transactional producer with hanging
transactions.
If anyone faces this situation, they will need to abort the hanging
transactions manually, and then the solution to expire a PID can work.

--> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
> Yes, We were thinking about throttling by KafkaPrinciple. Client Id is also
> workable.
> It's just these 2 attributes are not required.
> That is, it's possible we take all clients as the same one: {default
> KafkaPrinciple + default clientID}, and apply throttling on it.
> Do you have any thoughts about it?
> Maybe skip throttling for {default KafkaPrinciple + default clientID}
>

Throttling for the default KafkaPrincipal and default clientID is useful when
we need to have a hard limit on the whole cluster and whoever is running
the cluster doesn't know the client IDs, or if a KafkaPrincipal is reused
between different producer applications.
I usually find it helpful to have a way to apply throttling only on the
rogue clients once I identify them, without punishing everyone on the
cluster. However, there are two problems with this:
- There's no easy way at the moment to link PIDs to a clientId or
KafkaPrincipal. This needs to be addressed first.
- As Justine commented on the throttling, it will mean we either block all
requests or have to store the requests in memory, which in both cases has
downsides for the producer experience.

I recently had another discussion with my team and it does seem like there
> should be a way to make it more clear to the clients what is going on. A
> lot of this protocol is implicit. I'm wondering if maybe there is a way to
> improve the story for newer clients. (Ie if we choose to expire based on a
> size limit, we should include a response indicating the ID has expired.) We
> also discussed ways to redefine the guarantees so that users who have
> stronger idempotency requirements can ensure them (over availability/memory
> concerns). Let me know if you have any ideas here.
>

It may be easier to improve the experience for new clients. However, if we
improve only the new clients, we may need a way to help teams who run Kafka
with rogue clients on old versions by at least giving them an easy way to
identify the clientId or KafkaPrincipal that generated these PIDs.

For context, it's very tricky to even identify which clientId is creating
all these PIDs that caused the OOM, which is a contributing part of the issue
at the moment. So maybe one option here could be adding a new metric that
tracks the number of generated PIDs per clientId (a rough sketch follows
below). This will help the team who runs the Kafka cluster to
- contact these rogue clients and ask them to fix their clients or upgrade
to a new client if the new client version has a better experience.
- or, if we end up with a throttling solution, this may help identify which
clientId needs to be throttled.
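
A rough sketch of what such a per-clientId metric could look like with the
existing org.apache.kafka.common.metrics API (the metric/group names and the
placement are assumptions, not a concrete proposal):

    import java.util.Collections;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.CumulativeSum;

    Metrics metrics = new Metrics();
    String clientId = "some-client";  // the clientId seen when a new PID is allocated

    // One sensor per clientId, counting newly allocated producer IDs.
    Sensor pidSensor = metrics.sensor("producer-id-count-" + clientId);
    pidSensor.add(
        metrics.metricName("producer-id-count", "producer-id-metrics",
            "Number of producer IDs allocated for this client",
            Collections.singletonMap("client-id", clientId)),
        new CumulativeSum());

    // Recorded wherever the broker hands out a new PID for this clientId.
    pidSensor.record();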

Maybe we can start with a solution for identifying the rogue clients first
and keep looking for a solution to limit them. What do you think?

Thanks

On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan 
wrote:

> Oops.  I realized I just replied to Omnia 🤦‍♀️
>
> Here was my response for the mailing thread:
>
> Hey Omnia,
> Sorry to hear this is a problem for you as well. :(
> > * I have some concerns about the impact of this option on the
> transactional producers, for example, what will happen to an ongoing
> transaction associated with an expired PID? Would this leave the
> transactions in a "hanging" state?*
> We currently check if a transaction is ongoing and do not expire the
> producer ID if it has an ongoing transaction. I suspect we will continue to
> do this with any solution we pick.
>
> My team members and I looked 

Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-31 Thread Jun Rao
Hi, David,

Thanks for the updated KIP. A few more comments.

30. LeaderAndIsrRequest/StopReplicaRequest both have a controllerId field.
Should we add a KRaftControllerId field like UpdateMetadata?

31. "If a migration has been finalized, but the KRaft quroum comes up with
kafka.metadata.migration.enable, we must not re-enter the migration mode.
In this case, while replaying the log, the controller can see the second
MigrationRecord and know that the migration is finalized and should not be
resumed. " Hmm, do we want to keep the MigrationRecord in the snapshot and
the metadata log forever after migration is finalized? If not, we can't
know for sure whether a migration has happened or not. Also, it might be
useful to support switching back to ZK mode after the migration is
finalized, with the understanding of potential metadata loss. In that case,
we could just trim all metadata log and recopy the ZK metadata back.

32. The /migration node in ZK: Do we need last_update_time_ms since ZK
Stats already has an MTime? Also, how do we plan to use the
kraft_controller_id and kraft_controller_epoch fields?

33. Controller migration: We will force a write to the "/controller" and
"/controller_epoch" ZNodes before copying ZK data, right?

34. "Operator can remove the persistent "/controller" and
"/controller_epoch" nodes allowing for ZK controller election to take
place". I guess the operator only needs to remove the /controller path?

Thanks,

Jun

On Mon, Oct 31, 2022 at 7:17 AM David Arthur
 wrote:

> Happy Monday, everyone! I've updated the KIP with the following changes:
>
> * Clarified MetadataType metric usages (broker vs controller)
> * Added ZkMigrationReady tagged field to ApiVersionsResponse (for use by
> KRaft controller quorum)
> * Added MigrationRecord with two states: Started and Finished
> * Documented ZK configs for KRaft controller
> * Simplified state machine description (internally, more states will exist,
> but only the four documented are interesting to operators)
> * Clarified some things in Controller Migration section
> * Removed KRaft -> ZK parts of Broker Registration
> * Added Misconfigurations section to Failure Modes
>
> Let me know if I've missed anything from the past two weeks of discussion.
>
> Thanks again to everyone who has reviewed this KIP so far!
>
> -David
>
> On Fri, Oct 28, 2022 at 2:26 PM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Thanks for the reply.
> >
> > 20/21. Sounds good.
> >
> > Could you update the doc with all the changes being discussed?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Oct 28, 2022 at 10:11 AM David Arthur
> >  wrote:
> >
> > > Jun,
> > >
> > > 20/21. I was also wondering about a "migration" record. In addition to
> > the
> > > scenario you mentioned, we also need a way to prevent the cluster from
> > > re-entering the dual write mode after the migration has been
> finalized. I
> > > could see this happening inadvertently via a change in some
> configuration
> > > management system. How about we add a record that marks the beginning
> and
> > > end of the dual-write mode. The first occurrence of the record could be
> > > included in the metadata transaction when we migrate data from ZK.
> > >
> > > With this, the active controller would decide whether to enter dual
> write
> > > mode, finalize the migration, or fail, based on:
> > >
> > > * Metadata log state
> > > * Its own configuration ("kafka.metadata.migration.enable",
> > > "zookeeper.connect", etc)
> > > * The other controllers' configuration (via ApiVersionsResponse)
> > >
> > > WDYT?
> > >
> > > 22. Since we will need the fencing anyways as a safe-guard, then I agree
> > > we could skip the registration of KRaft brokers in ZK to simplify things
> > > a bit.
> > >
> > > Thanks,
> > > David
> > >
> > >
> > >
> > > On Thu, Oct 27, 2022 at 5:11 PM Jun Rao 
> > wrote:
> > >
> > > > Hi, David,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 20/21. Relying upon the presence of ZK configs to determine whether
> the
> > > > KRaft controller is in a dual write mode seems a bit error prone. If
> > > > someone accidentally adds a ZK configuration to a brand new KRaft
> > > cluster,
> > > > ideally it shouldn't cause the controller to get into a weird state.
> > Have
> > > > we considered storing the migration state in a metadata record?
> > > >
> > > > 22. If we have the broker fencing logic, do we need to write the
> broker
> > > > registration path in ZK for KRaft brokers at all?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Oct 27, 2022 at 1:02 PM David Arthur
> > > >  wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > 20/21. A KRaft controller will recover the migration state by
> reading
> > > the
> > > > > "/migration" ZNode. If the migration enable config is set, and the
> ZK
> > > > > migration is complete, it will enter the dual-write mode. Before an
> > > > > operator can decommission ZK, they will need to finalize the
> > migration
> > > > > which 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1329

2022-10-31 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-31 Thread David Arthur
30. I think we can keep the single ControllerId field in those requests
since they are only used for fencing (as far as I know). Internally, the
broker components that handle those requests will compare the ControllerId
with that of MetadataCache (which is updated via UMR).

The reason we need the separate KRaftControllerId in the UpdateMetadata
code path is so that we can have different connection behavior for a KRaft
controller vs a ZK controller.

31. It seems reasonable to keep the MigrationRecord in the snapshot. I was
thinking the same thing in terms of understanding the loss for a
migration-after-finalization. However, once a snapshot has been taken that
includes the final MigrationRecord, we can't easily see which records came
after it.

32. You're correct, we can just use the modify time from the Stat. The
other two fields are primarily informational and are there for operators
who want to inspect the state of the migration. They aren't required for
correctness

33. Yes that's right. I detail that in "Controller Leadership" section

34. Right, I'll fix that.

Thanks,
David

On Mon, Oct 31, 2022 at 2:55 PM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the updated KIP. A few more comments.
>
> 30. LeaderAndIsrRequest/StopReplicaRequest both have a controllerId field.
> Should we add a KRaftControllerId field like UpdateMetadata?
>
> 31. "If a migration has been finalized, but the KRaft quroum comes up with
> kafka.metadata.migration.enable, we must not re-enter the migration mode.
> In this case, while replaying the log, the controller can see the second
> MigrationRecord and know that the migration is finalized and should not be
> resumed. " Hmm, do we want to keep the MigrationRecord in the snapshot and
> the metadata log forever after migration is finalized? If not, we can't
> know for sure whether a migration has happened or not. Also, it might be
> useful to support switching back to ZK mode after the migration is
> finalized, with the understanding of potential metadata loss. In that case,
> we could just trim all metadata log and recopy the ZK metadata back.
>
> 32. The /migration node in ZK: Do we need last_update_time_ms since ZK
> Stats already has an MTime? Also, how do we plan to use the
> kraft_controller_id and kraft_controller_epoch fields?
>
> 33. Controller migration: We will force a write to the "/controller" and
> "/controller_epoch" ZNodes before copying ZK data, right?
>
> 34. "Operator can remove the persistent "/controller" and
> "/controller_epoch" nodes allowing for ZK controller election to take
> place". I guess the operator only needs to remove the /controller path?
>
> Thanks,
>
> Jun
>
> On Mon, Oct 31, 2022 at 7:17 AM David Arthur
>  wrote:
>
> > Happy Monday, everyone! I've updated the KIP with the following changes:
> >
> > * Clarified MetadataType metric usages (broker vs controller)
> > * Added ZkMigrationReady tagged field to ApiVersionsResponse (for use by
> > KRaft controller quorum)
> > * Added MigrationRecord with two states: Started and Finished
> > * Documented ZK configs for KRaft controller
> > * Simplified state machine description (internally, more states will
> exist,
> > but only the four documented are interesting to operators)
> > * Clarified some things in Controller Migration section
> > * Removed KRaft -> ZK parts of Broker Registration
> > * Added Misconfigurations section to Failure Modes
> >
> > Let me know if I've missed anything from the past two weeks of
> discussion.
> >
> > Thanks again to everyone who has reviewed this KIP so far!
> >
> > -David
> >
> > On Fri, Oct 28, 2022 at 2:26 PM Jun Rao 
> wrote:
> >
> > > Hi, David,
> > >
> > > Thanks for the reply.
> > >
> > > 20/21. Sounds good.
> > >
> > > Could you update the doc with all the changes being discussed?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Oct 28, 2022 at 10:11 AM David Arthur
> > >  wrote:
> > >
> > > > Jun,
> > > >
> > > > 20/21. I was also wondering about a "migration" record. In addition
> to
> > > the
> > > > scenario you mentioned, we also need a way to prevent the cluster
> from
> > > > re-entering the dual write mode after the migration has been
> > finalized. I
> > > > could see this happening inadvertently via a change in some
> > configuration
> > > > management system. How about we add a record that marks the beginning
> > and
> > > > end of the dual-write mode. The first occurrence of the record could
> be
> > > > included in the metadata transaction when we migrate data from ZK.
> > > >
> > > > With this, the active controller would decide whether to enter dual
> > write
> > > > mode, finalize the migration, or fail, based on:
> > > >
> > > > * Metadata log state
> > > > * Its own configuration ("kafka.metadata.migration.enable",
> > > > "zookeeper.connect", etc)
> > > > * The other controllers' configuration (via ApiVersionsResponse)
> > > >
> > > > WDYT?
> > > >
> > > > 22. Since we will need the fencing anyways as a safe-guard, then I
> > agree
>

[jira] [Created] (KAFKA-14345) Flakey DynamicConnectionQuotaTest should use correct error bounds

2022-10-31 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14345:
---

 Summary: Flakey DynamicConnectionQuotaTest should use correct 
error bounds
 Key: KAFKA-14345
 URL: https://issues.apache.org/jira/browse/KAFKA-14345
 Project: Kafka
  Issue Type: Test
  Components: network
Reporter: Greg Harris
Assignee: Greg Harris


The DynamicConnectionQuotaTest is an integration test targeting the throttling 
behavior of listeners' accept thread. This test has been flaking out recently 
with errors such as the following:
{noformat}
Caused by: org.opentest4j.AssertionFailedError: Listener PLAINTEXT connection 
rate 14.558271396827829 must be below 14.399 ==> expected: <true> 
but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
at 
app//kafka.network.DynamicConnectionQuotaTest.verifyConnectionRate(DynamicConnectionQuotaTest.scala:412)
at 
app//kafka.network.DynamicConnectionQuotaTest.$anonfun$testDynamicListenerConnectionCreationRateQuota$4(DynamicConnectionQuotaTest.scala:227){noformat}
The test appears to be using a hard-coded error bound of 1.2f, which does not 
appear to be correct given the windowed rate algorithm. Instead of a hard-coded 
value, the bound should be derived from the actual test execution so that the 
assertion uses a more accurate limit.
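
For illustration only, the failing check effectively amounts to the following 
(the quota value is inferred from the failure message above, and the names are 
assumed rather than taken from the test source):

{code:java}
// Reconstructed sketch of the current assertion, not the actual test code.
double quota = 12.0;          // configured listener connection creation rate (assumed)
double errorFactor = 1.2;     // hard-coded tolerance used by the test today
double measuredRate = 14.558271396827829;

org.junit.jupiter.api.Assertions.assertTrue(measuredRate < quota * errorFactor,
        "Listener PLAINTEXT connection rate " + measuredRate
                + " must be below " + (quota * errorFactor));

// A less flaky bound would be computed from the actual sampling window of the
// quota's Rate metric rather than from a fixed 20% margin.
{code}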





Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1331

2022-10-31 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 418782 lines...]
[2022-11-01T05:13:22.359Z] 
[2022-11-01T05:13:22.359Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > AdjustStreamThreadCountTest > 
testConcurrentlyAccessThreads() PASSED
[2022-11-01T05:13:22.359Z] 
[2022-11-01T05:13:22.359Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThreadReplacement() STARTED
[2022-11-01T05:13:24.994Z] 
[2022-11-01T05:13:24.994Z] > Task :core:integrationTest
[2022-11-01T05:13:24.994Z] 
[2022-11-01T05:13:24.994Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > EpochDrivenReplicationProtocolAcceptanceTest > 
offsetsShouldNotGoBackwards() PASSED
[2022-11-01T05:13:24.994Z] 
[2022-11-01T05:13:24.994Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > EpochDrivenReplicationProtocolAcceptanceTest > 
shouldFollowLeaderEpochBasicWorkflow() STARTED
[2022-11-01T05:13:29.616Z] 
[2022-11-01T05:13:29.616Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > EpochDrivenReplicationProtocolAcceptanceTest > 
shouldFollowLeaderEpochBasicWorkflow() PASSED
[2022-11-01T05:13:29.616Z] 
[2022-11-01T05:13:29.616Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > EpochDrivenReplicationProtocolAcceptanceTest > 
shouldNotAllowDivergentLogs() STARTED
[2022-11-01T05:13:34.409Z] 
[2022-11-01T05:13:34.409Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > EpochDrivenReplicationProtocolAcceptanceTest > 
shouldNotAllowDivergentLogs() PASSED
[2022-11-01T05:13:34.409Z] 
[2022-11-01T05:13:34.409Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > EpochDrivenReplicationProtocolAcceptanceTest > 
logsShouldNotDivergeOnUncleanLeaderElections() STARTED
[2022-11-01T05:13:40.304Z] 
[2022-11-01T05:13:40.304Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > EpochDrivenReplicationProtocolAcceptanceTest > 
logsShouldNotDivergeOnUncleanLeaderElections() PASSED
[2022-11-01T05:13:40.304Z] 
[2022-11-01T05:13:40.304Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts() STARTED
[2022-11-01T05:13:43.890Z] 
[2022-11-01T05:13:43.890Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts() PASSED
[2022-11-01T05:13:43.890Z] 
[2022-11-01T05:13:43.890Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() STARTED
[2022-11-01T05:13:47.568Z] 
[2022-11-01T05:13:47.568Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() PASSED
[2022-11-01T05:13:47.568Z] 
[2022-11-01T05:13:47.568Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse() STARTED
[2022-11-01T05:13:48.508Z] 
[2022-11-01T05:13:48.508Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse() PASSED
[2022-11-01T05:13:48.508Z] 
[2022-11-01T05:13:48.508Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > ReplicationUtilsTest > testUpdateLeaderAndIsr() STARTED
[2022-11-01T05:13:49.448Z] 
[2022-11-01T05:13:49.448Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > ReplicationUtilsTest > testUpdateLeaderAndIsr() PASSED
[2022-11-01T05:13:49.448Z] 
[2022-11-01T05:13:49.448Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > AllocateProducerIdsRequestTest > 
testAllocateProducersIdSentToController() > 
unit.kafka.server.AllocateProducerIdsRequestTest.testAllocateProducersIdSentToController()[1]
 STARTED
[2022-11-01T05:13:50.386Z] 
[2022-11-01T05:13:50.386Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > AllocateProducerIdsRequestTest > 
testAllocateProducersIdSentToController() > 
unit.kafka.server.AllocateProducerIdsRequestTest.testAllocateProducersIdSentToController()[1]
 PASSED
[2022-11-01T05:13:50.386Z] 
[2022-11-01T05:13:50.386Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 165 > AllocateProducerIdsRequestTest > 
testAllocateProducersIdSentToNonController() > 
unit.kafka.server.AllocateProducerIdsRequestTest.testAllocateProducersIdSentToNonController()[1]
 STARTED
[2022-11-01T05:13:53.361Z] 
[2022-11-01T05:13:53.361Z] > Task :streams:integrationTest
[2022-11-01T05:13:53.361Z] 
[2022-11-01T05:13:53.361Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 168 > AdjustStreamThreadCountTest > 
shouldResizeCacheAfterThr

Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-31 Thread Luke Chen
Hi Sophie,

Thanks for the KIP. A very useful proposal!
Some questions:

1. The staticPartition method in the interface is commented out.

2. For error handling, as you can imagine, there could be errors happening
during partition expansion. That means the operation could (1) take a long
time to complete, or (2) get stuck somewhere with a fatal error. I'd like to
know how we handle these 2 situations. For (1), I'm thinking whether we should
expose some metrics for monitoring, e.g. state, topics to be autoscaled, ...
etc. For (2), I'm not sure if some partitions getting expanded and some not
will cause any weird issues. If not, maybe just expose a metric for the
autoscaling state, and have a state that says "failed", something like that.

3. Could this operation get aborted? I don't think so. Maybe there should
be a note in the KIP

Thank you.
Luke


On Tue, Nov 1, 2022 at 2:15 AM Bruno Cadonna  wrote:

> Hi Sophie,
>
> Thank you for the KIP!
>
> 1.
> I do not understand how autoscaling should work with a Streams topology
> with a stateful sub-topology that reads from the input topics. The
> simplest example is a topology that consists of only one stateful
> sub-topology. As far as I understand the upstream producer would route
> existing keys to different partitions after the partition expansion than
> before the expansion. That means Streams would -- in general -- not read
> the same keys with the same stream thread after the expansion. I think
> you proposed the solution to this in your last e-mail with the following:
>
> 
> Essentially whoever is responsible for calculating how many partitions
> are needed should also be responsible for directing whichever new keys
> are supposed to go into those new partitions, then pass it along to the
> upstream producer to encode in the record itself.
> 
>
> But I am not 100% sure if you really meant what I understand. If I
> understand it correctly, you propose that the user is responsible to
> produce the records with existing keys to the same partitions as before
> the expansion upstream. I think that is an important information that
> should be pointed out in the KIP.
>
>
> 2.
> I would log an error and shutdown the Streams application if a custom
> partitioner is used anywhere in the topology. I think that would make
> the limitations clearer and would reduce perceived unexpected behavior
> by the users. Are there any specific reasons you propose to ignore it
> and log a warning?
>
> Best,
> Bruno
>
> On 28.10.22 04:51, Sophie Blee-Goldman wrote:
> > Thanks all! I'll try to address everything but don't hesitate to call me
> > out if anything is missed
> >
> > Colt/Lucas:
> >
> > Thanks for clarifying, I think I understand your example now. Something I
> > didn't think to mention
> > earlier but hopefully clears up how this would be used in practice is
> that
> > the partitioning decision/
> > logic doesn't need to -- and perhaps explicitly should not be -- internal
> > to the StaticStreamPartitioner
> > interface alone. I would imagine a realistic scenario would have the
> > partition essentially determined
> > upstream of the actual application, specifically integrated with whatever
> > system (or person) is
> > making the decision to add new partition(s) in the first place. Then the
> > partitioner is just reading out
> > some field in the record key/value, possibly doing some translation to
> > derive the final partition number
> > from something like a userId if it's not encoded directly, and not
> actually
> > computing anything itself.
> > Does that make sense? Essentially whoever is responsible for calculating
> > how many partitions are
> > needed should also be responsible for directing whichever new keys are
> > supposed to go into those
> > new partitions, then pass it along to the upstream producer to encode in
> > the record itself.
> >
> > In sum, I second what Lucas said about your scenario actually being a
> good
> > example of one way
> > to approach implementing static partitioning, ie based on time. It's just
> > that the semantics/logic to
> > interpret the target partition based on time would be external to the
> > application and not isolated in
> > the actual StaticStreamPartitioner class. Imo this makes perfect sense,
> as
> > something like IQ is
> > also going to be situated outside of the Streams application itself, so
> > presumably it can talk to
> > the system that is responsible for the partitioning logic for any
> partition
> > information it needs.
> >
> > Bill/Sagar:
> >
> > I've been going back and forth a lot on whether to open this feature up
> to
> > stateless applications or
> > even stateful ones as well, but feel like I've settled on having it
> > targeted towards both (but only) the
> > stateless and statically partitioned cases. Bill, my only concern about
> the
> > stateless apps was the
> > possibility for trouble when repartitioning a stateless application that
> > feeds into a stateful application
> > downstream. But now that I think about