Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-03 Thread Greg Harris
Frank,

The inconsistentConnectors method is related to an extremely specific
inconsistency that can happen when a worker writes some task
configurations, and then disconnects without writing a following "commit
tasks record" to the config topic.
This is a hold-over from the early days of Connect, before Kafka's
transactions support, and is mostly an implementation detail.
See the `KafkaConfigBackingStore::putTaskConfigs` and
`KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code.
It is not expected that this method is in regular use, and is primarily for
diagnostic purposes.

What the Strimzi issue seems to describe (and probably the issue you are
facing) occurs at a higher level, when a worker is deciding whether to
write new task configs at all.
The relevant code is here:
https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
In that snippet, new task configs generated by the connector are only
written to the config topic if they differ from the current contents of the
config topic. And this comparison is done on the post-transformation
configurations, after ConfigProviders have been resolved.
And critically for this bug, that resolution is done twice in quick
succession, so the old and new configurations can evaluate to the same
final result.
The code snippet also shows why your workaround works: the other condition
for writing all of the task configs to the config topic is that the number
of configurations has changed.

I believe this bug is captured in
https://issues.apache.org/jira/browse/KAFKA-9228 but it has not
progressed in some time.
There is a potentially lower-impact workaround that involves adding a nonce
to your connector configuration that changes each time you apply a new
configuration to the connector, which most connectors will pass directly to
their tasks.
But this unfortunately does not work in general, as connectors could
exclude the nonce when generating task configurations.
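
For illustration, a config update with such a nonce might look like the
following (the connector name, port, and property name here are all
placeholders; any otherwise-unused property works as long as its value
changes on every submission):

  # hypothetical example: bump "update.nonce" on every config change
  curl -X PUT -H "Content-Type: application/json" \
    http://localhost:8083/connectors/my-connector/config \
    -d '{"connector.class": "...", "tasks.max": "4", "update.nonce": "2"}'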

I hope this gives some more insight to the behavior you're seeing.

Thanks,
Greg Harris

On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes 
wrote:

> Hi, we're investigating an issue where occasionally config changes don't
> propagate to connectors/tasks.
>
> When this occurs, the only way to ensure that the configuration takes
> effect is to resize the number of tasks back down to 1 and then resize back
> up to the original number of tasks.
> In searching for others who have been bitten by this scenario we found the
> following thread on the Strimzi discussions pages:
> https://github.com/strimzi/strimzi-kafka-operator/discussions/7738
> Both the symptoms and workaround described there match what we've seen.
> We've been doing some digging into the Kafka Connect codebase to better
> understand how config.storage.topic is consumed.
> In the interest of brevity I won't repeat that entire thread of discussion
> here.
> However, I was wondering if anyone knows whether the JavaDoc suggestion on
> ClusterConfigState.inconsistentConnectors() is actually implemented in the
> clustered Worker code, i.e. "When a worker detects a connector in this
> state, it should request that the connector regenerate its task
> configurations."
> The reason I ask is because I couldn't find any references to that API
> call anywhere but in the KafkaConfigBackingStoreTest unit test cases.
> Thanks!
>


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-03 Thread Greg Harris
Frank,

I realized I didn't respond to the title directly, sorry about that.
The reason that `ClusterConfigState::inconsistentConnectors` is not used is
that the effect of an inconsistent connector is applied via
`ClusterConfigState::tasks`.
If a connector is inconsistent, then the tasks method will not return any
task configurations.
This will cause the outer logic to believe that there are 0 tasks defined,
so when a connector requests a task reconfiguration, whatever task configs
the connector generates will be written.

A task reconfiguration occurs on each connector start, and each time a
connector requests one.
If a reconfiguration failed (which is how the connector became
inconsistent) then it will be retried.
If the worker that had the reconfiguration fail then leaves the cluster,
then the rebalance algorithm will start the connector somewhere else, which
will trigger another task reconfiguration.

Given the above, there does not appear to be any way to have long-term
inconsistent connectors without a reconfiguration consistently failing.
If you are seeing the symptoms of long-term inconsistency (no tasks at all
for a connector) then I'd be very interested in a reproduction case for
that.

Thanks!
Greg Harris



Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-06 Thread Greg Harris
Frank,

I think you're right that the KAFKA-9228 ticket doesn't capture every
possible reconfiguration that might result in a dropped restart.
The ticket calls out the FileStream connectors, which generate their
configurations by dropping unknown config values, which is relatively
uncommon.
This means that even changes to non-externalized configurations may not
trigger a restart.

We now know that dropped restarts can happen to non-FileStream connectors
with externalized config values, but a fix for one should also fix the
other.
If you're interested in contributing a fix, we would welcome the
contribution. Otherwise, I'll look into this and see what we can do about
it.
Please keep in mind the known workarounds for this bug that can improve the
behavior before a fix lands.

Thanks!
Greg

On Mon, Feb 6, 2023 at 8:50 AM Frank Grimes 
wrote:

>  Hi Greg,
> The "long-term inconsistency" we have observed is not with no tasks at
> all, but instead with all the previously running tasks remaining in a
> running state but with a previous config.
> If I'm understanding the original bug report correctly, the scope of the
> problem was thought to only affect the following built-in connectors:
> FileStreamSourceConnector and the FileStreamSinkConnector
> see
> https://issues.apache.org/jira/browse/KAFKA-9228?focusedCommentId=16993990&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16993990
> However, we are seeing this issue with a number of 3rd-party connectors
> not provided as part of the Kafka project as well, e.g.:
> - Confluent's kafka-connect-s3 connector:
>   https://github.com/confluentinc/kafka-connect-storage-cloud
> - Aerospike's connector:
>   https://docs.aerospike.com/connect/kafka/to-asdb/from-kafka-to-asdb-overview
> We're wondering if it would be possible to re-evaluate the impact of this
> bug and look at addressing it either with the pre-existing PR (
> https://github.com/apache/kafka/pull/7823) or a new one.
> Thanks!

Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-06 Thread Greg Harris
Frank,

I don't think that the fix needs to necessarily follow the #12450 PR, we
can choose to start from scratch now that we know more about the issue.
If that PR is useful as a starting point, we can also include it, that is
up to you.

Greg

On Mon, Feb 6, 2023 at 10:21 AM Frank Grimes
 wrote:

>
> Hi Greg,
> I actually just found the following comment on this PR for
> https://issues.apache.org/jira/browse/KAFKA-13809:
> https://github.com/apache/kafka/pull/12450
> > we get the same behavior (KAFKA-9228 notwithstanding) by passing the
> original properties through to tasks transparently
> It seems we're not the first to notice that the issue isn't limited to
> connectors that selectively propagate properties to the task configs.
> FWIW, the kafka-connect-s3 connector also does not seem to prune any
> configs from the tasks:
> https://github.com/confluentinc/kafka-connect-storage-cloud/blob/d21662e78286d79b938e7b9affa418b863a6299f/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnector.java#L57-L77
> Is the patch provided on https://github.com/apache/kafka/pull/7823 still
> the right approach?
> Thanks!
>
>
>
>


Re: Mirror maker worker can't issue with REST uri

2023-02-07 Thread Greg Harris
Anup,

This is the expected behavior of the MirrorMaker2 application when a
connector attempts to reconfigure its tasks.
It is a limitation of the MirrorMaker2 distributed mode, and has an
improvement in-progress that I don't believe has been released yet. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters
for more details.
As a workaround, I believe you can restart the MirrorMaker2 connectors to
force a reconfiguration.
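
If your MirrorMaker2 connectors happen to be running on a Connect cluster
with the REST API enabled, that restart is a single call per connector
(host and connector name below are placeholders):

  curl -X POST http://localhost:8083/connectors/MirrorSourceConnector/restart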

I hope this helps,
Greg Harris

On Tue, Feb 7, 2023, 10:47 PM Shirolkar, Anup
 wrote:

> Hi,
>
> I have deployed a 3-node mirror maker cluster version 3.2.1
> I have configured the connect-mirror-maker.properties file and started the
> mirror service using connect-mirror-maker.sh
>
> It runs fine but one of the three workers always gets below exception.
> If I restart the connect worker with the error, another worker gets the
> same error.
>
>
> [2023-02-08 06:11:03,509] ERROR [Worker clientId=connect-2,
> groupId=ruh-mm2] Request to leader to reconfigure connector tasks failed
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1610)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Error
> trying to forward REST request: Invalid URI host: null (authority: null)
>         at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:147)
>         at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$reconfigureConnector$32(DistributedHerder.java:1607)
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.IllegalArgumentException: Invalid URI host: null (authority: null)
>         at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:521)
>         at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:506)
>         at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:464)
>         at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:453)
>         at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:107)
>         ... 6 more
>
> What could be wrong here? Can you please advise?
>
>
> Thanks,
> Anup Shirolkar.
>
>
>
>


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-08 Thread Greg Harris
Frank,

> I'm operating on the assumption that the connectors in question get stuck
in an inconsistent state
> Another thought... if an API exists to list all connectors in such a
state, then at least some monitoring/alerting could be put in place, right?

There are two different inconsistencies relevant to this discussion.
Inconsistent state (A) is when a new connector config (generation 2) has
been written to the config topic, but a full set of old task configs
(generation 1) exist in the config topic.
Inconsistent state (B) is when a new connector config (generation 2) has
been written to the config topic, and there is an incomplete set of new
task configs (generation 2) in the config topic.
Since this is a reconfiguration, it could also be a mix of generation 1 and
generation 2 task configs in the config topic.

Inconsistent state (A) is a normal part of updating a connector config; the
cluster writes the connector config to the config topic durably, and then
begins to try to asynchronously regenerate task configs.
Inconsistent state (B) is not normal, and happens when a worker is unable
to atomically write a full set of task configurations to the config topic.
The inconsistentConnectors method will report connectors in state (B), but
will not report connectors in state (A).
In state (A), you will see tasks running stale configurations. In state (B)
you will see no tasks running, as the framework will not start tasks for a
connector whose task configs are inconsistent.
As you're not seeing the no-tasks symptom, I would put state (B) out of
mind and assume that the KafkaConfigBackingStore will give you atomic read
and write semantics for a full set of task configs at once.

> I see on KafkaConfigBackingStore.putTaskConfigs that if the JavaDoc is to
be believed, a ConnectException is thrown "if the task configurations do
not resolve inconsistencies found in the existing root and task
configurations."

It looks like ConnectExceptions are thrown after a failure to read or write
to the config topic, which is a pretty broad class of errors.
But if any such error occurs, the code cannot be guaranteed that a full set
of task configs + task commit message was written, and the topic may be in
state (A) or state (B).
This method is called specifically to resolve state (A) or state (B), and
the exception is just indicating that whatever inconsistency was present
before the call may still be there.

> Am I right that there is only one retry on that exception in
DistributedHerder.reconfigureConnectorTasksWithRetry?

No, the implementation of `reconfigureConnectorTasksWithRetry` calls itself
in its error callback, and calls `reconfigureConnector` to retry the
operation.
This is a recursive loop, but isn't obvious because it's inside a callback.

This is the condition which is causing the issue:
https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
The DistributedHerder is comparing the generation 1 and generation 2
configurations, and erroneously believes that they are equal, when in fact
the underlying secrets have changed.
This prevents the DistributedHerder from writing generation 2 task configs
to the KafkaConfigBackingStore entirely, leaving it in state (A) without
realizing it.

Thanks for working on this issue,
Greg

On Wed, Feb 8, 2023 at 8:38 AM Frank Grimes 
wrote:

> Another thought... if an API exists to list all connectors in such a
> state, then at least some monitoring/alerting could be put in place, right?


Re: Mirror maker worker can't issue with REST uri

2023-02-08 Thread Greg Harris
Anup,

Here's the best workaround I can think of:

I think you can reconfigure the mechanisms which trigger task
reconfiguration with:
* `refresh.topics.enabled`
* `refresh.topics.interval.seconds`
* `refresh.groups.enabled`
* `refresh.groups.interval.seconds`

Disabling these mechanisms will prevent the errors entirely, but will also
disable these features operating in the background.
As an additional workaround, you can have an external process periodically
restart the MirrorSourceConnector and MirrorCheckpointConnector instances
to force refresh the topics and groups respectively.
This will replace the functionality of the MM2 feature, while still
avoiding the errors.
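
As a sketch, in connect-mirror-maker.properties that could look like the
following (values are illustrative, and depending on your setup these may
need a source->target replication-flow prefix):

  # disable the background refresh mechanisms entirely
  refresh.topics.enabled = false
  refresh.groups.enabled = false
  # or instead, just make them fire less often (interval values are placeholders)
  # refresh.topics.interval.seconds = 3600
  # refresh.groups.interval.seconds = 3600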

A lower effort but generally worse/more dangerous option would be to
temporarily disable logging of errors for the DistributedHerder.

You can find details on how to do this here:
https://rmoff.net/2019/01/29/kafka-connect-change-log-level-and-write-log-to-file/
and here
https://rmoff.net/2020/01/16/changing-the-logging-level-for-kafka-connect-dynamically/
This is not a very good solution, as this will mask other more important
logs from the DistributedHerder that you will certainly be interested in.
But if you're an hour away from a disk filling up, any solution is better
than no solution.
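
For reference, with the dynamic approach from the second link the call is
roughly the following (worker address is a placeholder, and this only
applies where the REST API is actually running; note the caveat above about
masking important logs):

  curl -X PUT -H "Content-Type: application/json" \
    http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.distributed.DistributedHerder \
    -d '{"level": "FATAL"}'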

Good luck!
Greg

On Wed, Feb 8, 2023 at 3:39 AM Shirolkar, Anup
 wrote:

> Yes, that makes sense thanks.
> But the side effect of this is there is enormous amount of log generated.
> Is there a quick solution possible to slow down the logs.
>
> Cheers.


Re: Kafka Mirror maker stops replicating

2023-02-08 Thread Greg Harris
Arpit,

Unfortunately from that description nothing specific is coming to mind.
The max.poll.interval indicates that the consumer is losing contact with
the Kafka cluster, but that may be caused by the replication application
hanging somewhere else.

Some clarifying questions, and things you can look into:
1. Are you using MirrorMaker, or MirrorMaker 2.0?
2. What version of MM or MM2 are you using, and with what Kafka broker
version?
3. How is your replication flow configured?
4. What is the frequency and duration of these interruptions?
5. When did the interruptions start?
6. Has anything changed in your environment recently, such as new
partitions or an upgrade?
7. Are you seeing any ERROR logs or other unique logs from the replication
flow?
8. Have you tried enabling more detailed logs and watched the progress of
the replication flow around the time it stops replicating?

Thanks,
Greg Harris


On Tue, Feb 7, 2023 at 6:27 AM Arpit Jain  wrote:

> Hi,
>
> Hope this is the right forum to ask for Kafka mirror maker issues.
> We are facing an issue where the mirror maker replicates the trades and
> then doesn't work for long time and again replicates.
> Also seeing the warning message to increase the poll interval or decrease
> the maximum batch size (max.poll.records).
>
> I have tried reducing max.poll.records to 250 but still same issues.
>
> Could anyone suggest what could be wrong?
>
> Thanks
>


Re: Kafka Mirror maker stops replicating

2023-02-08 Thread Greg Harris
Arpit,

I am not very familiar with MirrorMaker unfortunately so I won't be able to
give you any specific advice.
I also don't see any MirrorMaker-specific changes that would be relevant,
except for some minor argument changes and the deprecation landing in 3.0.

> It's very random. It replicates for a couple of hours fine and then stops
for a day.

Hopefully logging will help you to understand why the replication flow
starts and stops.
Do you have any very long timeouts which would correspond to the day-long
downtime?
Looking at MirrorMaker options, there's an `abort.on.send.failure`
configuration that may cause MirrorMaker to fail fast in some cases,
allowing you to debug it and possibly auto-restart it.

> Could you tell me how can I enable more logging ?

I believe you can configure the logging by changing the KAFKA_LOG4J_OPTS
environment variable before running the mirror maker script.
For example, you could copy and modify the existing tools config:
https://github.com/apache/kafka/blob/trunk/config/tools-log4j.properties
and provide the copy to the MirrorMaker tool with:

export
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/path/to/your/tools-log4j.properties"
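
In the copied file, the modification is just raising the root logger level,
something like this (based on the stock tools-log4j.properties):

  log4j.rootLogger=DEBUG, stderr
  log4j.appender.stderr=org.apache.log4j.ConsoleAppender
  log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
  log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
  log4j.appender.stderr.Target=System.err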

One additional thing you might be able to do if you catch it when it's
stalled is to take a heap dump or stacktrace of the containing JVM.
This will let you see what the process is doing, and maybe see if there are
stuck threads, excess memory, or other things preventing the replication
from progressing.
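
For example (assuming the JDK tools are on your path, and that the pgrep
pattern matches how your process is named):

  # dump all thread stacks of the MirrorMaker JVM to a file
  jstack -l $(pgrep -f MirrorMaker) > mm-threads.txt
  # or take a heap dump for memory analysis
  jmap -dump:live,format=b,file=mm-heap.hprof $(pgrep -f MirrorMaker)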

Good luck with your investigation!
Greg


On Wed, Feb 8, 2023 at 2:20 PM Arpit Jain  wrote:

> Hi Greg,
>
> Thanks for getting back to me. Please find more details below
>
> 1. Are you using MirrorMaker, or MirrorMaker 2.0?
> Mirror maker
> 2. What version of MM or MM2 are you using, and with what Kafka broker
> version?
> 3.2.3
> 3. How is your replication flow configured?
> We have upstream brokers (a 3-node Kafka cluster) and one Kafka consumer
> for each lower environment, producing messages to that lower environment's
> Kafka cluster
> 4. What is the frequency and duration of these interruptions?
> It's very random. It replicates for a couple of hours fine and then stops
> for a day.
> 5. When did the interruptions start?
> Not sure about that. It could be after we moved to 3.2.3
> 6. Has anything changed in your environment recently, such as new
> partitions or an upgrade?
> No
> 7. Are you seeing any ERROR logs or other unique logs from the replication
> flow?
> Only the warning to increase max.poll.interval or decrease poll.records
> 8. Have you tried enabling more detailed logs and watched the progress of
> the replication flow around the time it stops replicating?
> Could you tell me how can I enable more logging ?
>
> Thanks,
> Arpit
>


Re: Mirror maker worker can't issue with REST uri

2023-02-08 Thread Greg Harris
Anup,

I realized I forgot to mention this in the previous message, sorry about
that:

The additional workarounds to restart the connectors or dynamically
reconfigure the log level will only work for MirrorMaker 2.0 running on a
regular Connect cluster which has the REST API enabled.
The MirrorMaker2 nodes started by the entry script connect-mirror-maker.sh
will not have the REST API enabled, and cannot perform these operations.

Greg


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-09 Thread Greg Harris
Frank,

The configs are being compared after ConfigProviders have been resolved.
This is happening both as a Connector config (by
ClusterConfigState::connectorConfig) and as task configs (by
ClusterConfigState::taskConfig).
This means that two configurations that have different direct contents (the
path to a secret changed) can resolve to the same value if both paths
produce the same value after resolving the config provider.
This also means that if you change the secret on disk and re-submit the
config, the new secret will be resolved in each of the ClusterConfigState
calls, and also end up looking equivalent.
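
As a concrete (entirely made-up) illustration of the first case, using the
FileConfigProvider placeholder syntax:

  # generation 1 connector config, as submitted:
  connection.password = ${file:/etc/secrets/v1.properties:password}
  # generation 2 connector config, as submitted:
  connection.password = ${file:/etc/secrets/v2.properties:password}
  # if both files happen to contain password=hunter2, the two configs
  # resolve to the same value and compare as equal post-transformation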

> Would capturing a new generation value within the config itself on every
submitted change be a possible fix/workaround?

This is the workaround I proposed earlier in this conversation: external
users can force updates by adding a nonce to their connector configuration.
I don't think it's reasonable for the framework to do this unconditionally,
so maybe we need to find an alternative if we want to fix this for everyone
by default.

Greg

On Thu, Feb 9, 2023 at 8:26 AM Frank Grimes 
wrote:

> I'm still having trouble understanding how the configs could match in the
> code you highlighted when we change connector and/or task config values
> when no keys are being pruned by the connector implementations in question.
> Would capturing a new generation value within the config itself on every
> submitted change be a possible fix/workaround? The possible slightly
> negative consequence of that change would be that re-submitting the same
> config, which would effectively be a no-op in the current implementation,
> would now force task reconfigures/restarts?
> On Wednesday, February 8, 2023, 12:47:19 PM EST, Greg Harris wrote:
> > This is the condition which is causing the issue:
> > https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
> > The DistributedHerder is comparing the generation 1 and generation 2
> > configurations, and erroneously believes that they are equal, when in
> > fact the underlying secrets have changed.
> > This prevents the DistributedHerder from writing generation 2 task
> > configs to the KafkaConfigBackingStore entirely, leaving it in state (A)
> > without realizing it.
>
>


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-15 Thread Greg Harris
Frank,

> I don't think forcing the API users to introduce the nonce is desirable.

I agree. That is why the nonce is a workaround, and not a proper solution.
It's something to alleviate the symptoms in the short-term until a bugfix &
upgrade can fix it.

> Have you had any ideas on how this can be implemented within Kafka
Connect itself so that it works as expected for all users?

I have not looked into solutions in enough depth to recommend one. If I
had, the PR would be open :)

> We tried adding tasks to trigger a propagation of the task configs
(increased from 36 to 40 tasks), however that did not unblock it. So
triggering this code path did not seem to work:

You may be affected by _another_ bug which is preventing tasks from being
reconfigured which is specific to MM2:
https://issues.apache.org/jira/browse/KAFKA-10586
You can see evidence for this in the DistributedHerder ERROR log "Request
to leader to reconfigure connector tasks failed". A fix for this is
in-flight already.
It appears that Strimzi is using the kafka-mirror-maker.sh script, which is
affected:
https://github.com/strimzi/strimzi-kafka-operator/blob/97b48461d724a9c59505a9ad31b3d184476a83d7/docker-images/kafka-based/kafka/scripts/kafka_mirror_maker_run.sh#L121

> Are there any other (not overly-verbose) classes you recommend we enable
DEBUG logging on

I think you've covered the interesting ones. You can also look and see if
the Mirror*Connector classes are behaving themselves, but it doesn't appear
that the reconfiguration code path has any relevant logs.

> Also, would making the inconsistent connectors (assuming they're being
identified as such by Kafka Connect when this happens) visible through an
API call also make sense so that this can be detected/monitored more easily?

Unless we have evidence that the config topic being in inconsistent state
(B) is a common problem, I don't think adding monitorability for it has a
high enough ROI to be implemented.
If you feel strongly about it, then you can consider opening a KIP to
describe the public interface changes and how those interfaces would be
used for monitoring.

Inconsistent state (A) however, seems very common. I've seen it in
production myself, it's implicated in KAFKA-9228, KAFKA-10586, and is
clearly causing disturbance to real users.
Fixing the conditions which lead to state (A) is what I'm most interested
in seeing, and should be prioritized first because it's what is going to
have the highest ROI.

Right now you can find connectors in inconsistent state (A) with the
following:
* You can hand-inspect the task configs with the `GET
/connectors/{connector}/tasks-config` endpoint since 2.8.0 (example after
this list). This does not work for dedicated MM2 (right now) for precisely
the same reason that KAFKA-10586 occurs: the REST API isn't running.
* For mechanical alerting, you can read the config topic and track the
offset for the most recent connector config and compare it with the offset
for the most recent task config. This depends on internal implementation
though, and isn't supported across releases.
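
For the hand-inspection option above, the call is along these lines (worker
address and connector name are placeholders):

  curl http://localhost:8083/connectors/my-connector/tasks-config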

I think a way of detecting state (A) via the REST API would be a valuable
addition that could get accepted, if someone is willing to do the legwork
to design, propose and implement it.
It would be valuable even without any bugs present, as the connectors have
to transit through state (A) on each reconfiguration. We can look into this
after getting some tactical fixes in place to avoid the long-term state (A).

Thanks,
Greg Harris



On Wed, Feb 15, 2023 at 9:34 AM Frank Grimes
 wrote:

> So we've just hit this issue again, this time with the MM2 connector,
> trying to add a new mirrored topic. We're running MirrorMaker 2 in
> Strimzi, i.e. "connector.class":
> "org.apache.kafka.connect.mirror.MirrorSourceConnector".
> We have 6 worker nodes. We changed the config to add a new mirror topic,
> i.e. appended a new topic to the MirrorSourceConnector's "topics" config.
> The MM2 config topic reflects the change, as does viewing the config using
> Kowl UI. However, no tasks run to mirror the newly-added topic. We also do
> not see any updates on the MM2 status topic for the mirroring of that
> newly-added topic.
> We tried adding tasks to trigger a propagation of the task configs
> (increased from 36 to 40 tasks), however that did not unblock it. So
> triggering this code path did not seem to work:
> https://github.com/apache/kafka/blob/8cb0a5e9d3441962896b79163d141607e94d9b54/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1569-L1572
> Only restarting the workers seemed to unblock the propagation of the new
> task config for the new mirrored topic.
> Hopefully this can help us narrow things down a bit...
> In the meantime we've s

Re: where is KafkaYammerMetrics.java moved to?

2023-03-22 Thread Greg Harris
Zhou,

It appears that this class was moved most recently in this PR:
https://github.com/apache/kafka/pull/11970
Checking the build on trunk, the class does appear to be in the
server-common JAR:

$ jar -tf server-common/build/libs/kafka-server-common-3.5.0-SNAPSHOT.jar |
grep Yammer
org/apache/kafka/server/metrics/KafkaYammerMetrics.class

KafkaYammerMetrics does not appear to be part of the documented API in the
most recent release https://kafka.apache.org/34/javadoc/index.html or in
prior releases.
I believe this means that you are depending on internal classes of Kafka,
and are subject to breakages like this when updating, even in patch
releases.
This is not recommended, and you should look into a long-term replacement
for the KafkaYammerMetrics class.

I hope this helps,
Greg Harris

On Wed, Mar 22, 2023 at 6:27 AM Rui  wrote:

> Hi Kafka group:
> recently I upgrade these libs to:
> kafka-clients 3.4.0
> kafka_2.13:3.4.0
> spring-kafka-test:3.0.4
>
> but I got a Class Not found issue:(stacktrace)
>
> Java.lang.NoClassDefFoundError:
> org/apache/kafka/server/metrics/KafkaYammerMetrics
> KafkaServer
> org.springframework.kafka.test.EmbeddedKafkaBroker..
>
> It seems this file was removed from kafka_2.13 lib after checking the
> github,
>
> https://github.com/apache/kafka/blob/dd62ef2eda571576a222d757beabcd7690dd8c16/core/src/main/java/kafka/metrics/KafkaYammerMetrics.java
>
> So my question is which lib I should include as the dependency?
>
> thanks
> Zhou Rui
>


Re: Sudden imbalance between partitions

2023-03-24 Thread Greg Harris
Meg,

What version are your clients, and what partitioner are you using for these
records?

If you're using the DefaultPartitioner from 2.4.0+, it has a known
imbalance flaw that is described and addressed by this KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
which was released in 3.3.0.
In order to make sure you're using the patched partitioner, the clients jar
should be on 3.3.0+ and your application should not set the
`partitioner.class` configuration, to let the producer choose the behavior.
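
In other words, a producer config along these lines (the serializers are
placeholders; the point is the deliberate absence of any partitioner.class
entry):

  bootstrap.servers=broker1:9092
  key.serializer=org.apache.kafka.common.serialization.StringSerializer
  value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
  # no partitioner.class set: the 3.3.0+ producer picks the fixed behavior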

In the short term, pausing, throttling, or restarting producers may help
resolve the imbalance, since the poor balance is caused by the state of the
producer buffers.
Adding nodes to the cluster and spreading partitions thinner may also help
increase the tolerance of each broker before it becomes unbalanced.
However, this will not solve the problem on its own, and may make it
temporarily worse while partitions are being replicated to the added nodes.
If you're already running the patched version of the partitioner, then a
more detailed investigation will be necessary.

I hope some of this helps!
Greg Harris

On Fri, Mar 24, 2023 at 11:57 AM Margaret Figura
 wrote:

> Hi,
>
> We have a 22-node Kafka 3.3.1 cluster on K8s. All data is sent with null
> partitionId and null key from 20 Java producers, so it should be
> distributed evenly across partitions. All was good for days, but a couple
> hours ago, broker 21 started receiving about 2x the data of the other
> brokers for a few topics (but not all). These topics are all 1x replicated
> and the 96 partitions are distributed evenly across brokers (each broker
> has 4 or 5 partitions). This was detected in Grafana, but I can also see
> the offsets increasing much faster for the partitions owned by broker 21 in
> KafkaOffsetsShell. What could cause this? I didn't see anything unusual in
> the broker 21 logs or the controller logs.
>
> Looking back, I noticed that broker 11 also becomes a bit unbalanced each
> day at the time when we are processing the most data, but it is only 10-15%
> higher than the others. All other brokers are quite even, including broker
> 21 until today.
>
> Any ideas on what I can check? Unfortunately we'll probably have to
> restart Kafka and/or the producers pretty soon.
>
> Thanks a lot!
> Meg
>


Re: MM2 -- failed to copy old data

2023-05-02 Thread Greg Harris
Andrew,

The broker appears to be rejecting the timestamp of the replicated record,
which is the same as the source record timestamp in MM2. I think you will
need to relax the timestamp validation, which is controlled by these
configurations:

https://kafka.apache.org/documentation/#topicconfigs_message.timestamp.difference.max.ms
https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.difference.max.ms

I believe that if you want 5-day old timestamps to be preserved, that this
configuration must be at least 5 days either globally or just for your
destination topic.
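
For just the destination topic, that would be something like the following
(broker address and topic name are placeholders; 5 days = 432000000 ms):

  kafka-configs.sh --bootstrap-server localhost:9092 --alter \
    --entity-type topics --entity-name my.replicated.topic \
    --add-config message.timestamp.difference.max.ms=432000000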

I hope this helps!
Greg Harris

On Tue, May 2, 2023 at 12:49 PM An, Hongguo (CORP)
 wrote:

> Hi:
> When I am running MM2 (3.4.0), any messages older than 1 day are not
> copied; the topic has a retention period of 5 days.
> Got error:
>
> org.apache.kafka.common.errors.InvalidTimestampException: Timestamp
> 1682664968019 of message with offset 0 is out of range. The timestamp
> should be within [1682965965321, 1683138765321]
>
> Please help
>
> Thanks
> Andrew


Re: Kafka connect process listens to an unknown port

2023-05-19 Thread Greg Harris
Hey Jorge,

I looked into it, and can reproduce the second LISTEN port in a
vanilla Kafka Connect cluster without any connectors running.

Using jstack, I see that there are two threads that appear to be
waiting in the corresponding accept methods:

"RMI TCP Accept-0" #15 daemon prio=5 os_prio=31 cpu=0.37ms
elapsed=790.61s tid=0x0001370f5a00 nid=0x6d03 runnable
[0x00017255e000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.accept(java.base@17.0.4.1/Native Method)

"qtp1530870688-33-acceptor-0@45f75f75-http_8083@43826ec{HTTP/1.1,
(http/1.1)}{0.0.0.0:8083}" #33 prio=3 os_prio=31 cpu=0.17ms
elapsed=789.45s tid=0x00012712e400 nid=0x8707 runnable
[0x0001737ca000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.accept(java.base@17.0.4.1/Native Method)

The latter appears to be the 8083 port which is serving the REST API,
and is expected.
The former appears to be a Java Remote Method Invocation socket on
"port 0" which ends up selecting a random open port on each startup.
I do see that RMI is often used to export JMX metrics, and it appears
that kafka-run-class.sh (which is used when starting Kafka Connect) by
default enables JMX metrics.
I was able to disable the RMI port by setting the
KAFKA_JMX_OPTS="-Dkey=value" environment variable before running
bin/connect-distributed.sh. This isn't a recommendation for you, but
proves that the default KAFKA_JMX_OPTS settings from the
kafka-run-class.sh are relevant.
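
If you want JMX but on a predictable port rather than a random one, I
believe kafka-run-class.sh also honors a JMX_PORT environment variable
(the port number below is a placeholder):

  # pin the JMX registry port instead of letting it float
  export JMX_PORT=9999
  bin/connect-distributed.sh config/connect-distributed.properties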

Thanks for the question, I learned something new!
Greg Harris

On Fri, May 19, 2023 at 4:45 AM Jorge Martin Cristobal
 wrote:
>
> Hi all,
>
> I'm testing apache kafka connect for a project and I found that the main
> process listens to two different ports, the one to provide REST api, 8083
> by default, and a different unprivileged port that changes its number each
> restart. For instance, this is fragment of the output from netstat command:
> tcp 0 0 0.0.0.0:8083 0.0.0.0:* LISTEN 28646/java
> tcp 0 0 0.0.0.0:42859 0.0.0.0:* LISTEN 28646/java <--THIS
>
> What's the purpose of that port? Is there any public definition of that
> interface? I haven't found any documentation wrt that port number.
> Kindly regards,
>
> Jorge M.


Re: Kafka 3.2.1 performance issue with JDK 11

2023-05-21 Thread Greg Harris
Vic,

I found an open JIRA issue that previously reported this problem:
https://issues.apache.org/jira/browse/KAFKA-10877 .
I believe one workaround is to use log4j 1.x, such as reload4j. Kafka
still relies on log4j 1.x until the planned upgrade to log4j 2.x in
kafka 4.0 https://issues.apache.org/jira/browse/KAFKA-9366 .
I will look into reviving or replacing the performance patch for 3.x.

Hope this helps,
Greg Harris

On Sun, May 21, 2023 at 6:31 AM Vic Xu  wrote:
>
> Hello all, I have a Kafka cluster deployed with version 3.2.1, JDK 11 and
> log4j 2.18.0. I built my own Kafka image. One of my Kafka brokers is
> experiencing CPU issues, and based on the jstack information, it seems that
> log4j is causing the problem due to its usage of StackWalker. How can I
> solve this issue?
>
> Here is jstack information:
> "data-plane-kafka-request-handler-6" #59 daemon prio=5 os_prio=0 
> cpu=86381259.23ms elapsed=1948787.21s tid=0x7f8939c04800 nid=0x190 
> runnable  [0x7f883f6f5000]
>java.lang.Thread.State: RUNNABLE
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.fetchStackFrames(java.base@11.0.9/Native
>  Method)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.fetchStackFrames(java.base@11.0.9/Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.getNextBatch(java.base@11.0.9/Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.peekFrame(java.base@11.0.9/Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.hasNext(java.base@11.0.9/Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$StackFrameTraverser.tryAdvance(java.base@11.0.9/Unknown
>  Source)
> at 
> java.util.stream.ReferencePipeline.forEachWithCancel(java.base@11.0.9/Unknown 
> Source)
> at 
> java.util.stream.AbstractPipeline.copyIntoWithCancel(java.base@11.0.9/Unknown 
> Source)
> at 
> java.util.stream.AbstractPipeline.copyInto(java.base@11.0.9/Unknown Source)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@11.0.9/Unknown 
> Source)
> at 
> java.util.stream.FindOps$FindOp.evaluateSequential(java.base@11.0.9/Unknown 
> Source)
> at 
> java.util.stream.AbstractPipeline.evaluate(java.base@11.0.9/Unknown Source)
> at 
> java.util.stream.ReferencePipeline.findFirst(java.base@11.0.9/Unknown Source)
> at 
> org.apache.logging.log4j.util.StackLocator.lambda$getCallerClass$2(StackLocator.java:57)
> at 
> org.apache.logging.log4j.util.StackLocator$$Lambda$117/0x0008001a6c40.apply(Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$StackFrameTraverser.consumeFrames(java.base@11.0.9/Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.doStackWalk(java.base@11.0.9/Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.callStackWalk(java.base@11.0.9/Native
>  Method)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.beginStackWalk(java.base@11.0.9/Unknown
>  Source)
> at 
> java.lang.StackStreamFactory$AbstractStackWalker.walk(java.base@11.0.9/Unknown
>  Source)
> at java.lang.StackWalker.walk(java.base@11.0.9/Unknown Source)
> at 
> org.apache.logging.log4j.util.StackLocator.getCallerClass(StackLocator.java:51)
> at 
> org.apache.logging.log4j.util.StackLocatorUtil.getCallerClass(StackLocatorUtil.java:104)
> at 
> org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:50)
> at 
> org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:47)
> at 
> org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:33)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:363)
> at kafka.utils.Logging.logger(Logging.scala:43)
> at kafka.utils.Logging.logger$(Logging.scala:43)
> at 
> kafka.server.SessionlessFetchContext.logger$lzycompute(FetchSession.scala:364)
> - locked <0x0007fa037e58> (a kafka.server.SessionlessFetchContext)
> at kafka.server.SessionlessFetchContext.logger(FetchSession.scala:364)
> at kafka.utils.Logging.debug(Logging.scala:62)
> at kafka.utils.Logging.debug$(Logging.scala:62)
> at kafka.server.SessionlessFetchContext.debug(FetchSession.scala:364)
> at 
> kafka.server.SessionlessFetchContext.updateAndGenerateResponseData(FetchSession.scala:377)
> at 
> kafka.server.KafkaApis.processResponseCallback$1(KafkaApis.scala:932)
> at 
> kafka.server.KafkaApis.$anonfun$handleFetchRequest$33(KafkaApis.scala:965)
> at 
> kafka.server.KafkaApis.$anonfun$handleFetchRequest$33$adapted(KafkaApis.scala:965)
> at 
> kafka.server.KafkaApis$$Lambda$1241/0x0008007e4040.apply(Unknown Source)


Re: [VOTE] 3.4.1 RC1

2023-05-22 Thread Greg Harris
Hi Luke,

I performed a test upgrade of MM2 from 3.4.0 to 3.4.1-RC1, and
verified that the new offset translation logic worked as intended.

Steps I took to verify:
- Downloaded 3.4.0_2.13 from the Kafka website
- Formatted and started two 3.4.0 1-node clusters, and configured MM2
to mirror data and consumer group offsets between those clusters.
- I used the console producer to produce data in batches to the source
cluster, and verified that similar amounts of data were present in the
destination.
- Verified that pre-upgrade, offset translation occurs smoothly.
- Stopped MM2 3.4.0 and started 3.4.1-RC1 with the same configuration.
- Verified that post-upgrade, offset translation occurs in steps and
can be performed prior to the latest offset sync.

Thanks!
Greg

On Mon, May 22, 2023 at 11:37 AM Josep Prat  wrote:
>
> Hi Luke,
>
> I can confirm the
> *org.apache.kafka.common.utils.UtilsTest.testToLogDateTimeFormat*
> consistent failure when not running on a system set on UTC-0 timezone.
>
> I verified:
> - Built from source with Java 11 and Scala 2.13
> - Checked signatures and hashes
> - Checked Javadoc and verified that links to JDK javadoc are functional
> - Run the unit tests (works on UTC-0 environments)
> - Run integration tests
> - Run the quickstart in KRaft and Zookeeper mode
>
> Best,
>
> On Mon, May 22, 2023 at 7:04 PM Chris Egerton 
> wrote:
>
> > Hi Luke,
> >
> > Thanks for running the release!
> >
> > Steps I took to verify:
> >
> > - Built from source with Java 11
> > - Checked signatures and checksums
> > - Ran the quickstart with Java 11 in KRaft mode
> > - Ran all unit tests
> > - - The org.apache.kafka.common.utils.UtilsTest.testToLogDateTimeFormat
> > test case failed consistently. It appears this is because we haven't
> > backported a fix for KAFKA-14836 onto the 3.4 branch; after applying those
> > changes to my local copy of the RC's source code, the test began to pass. I
> > don't know if we want to count this as a blocker since the test failure is
> > not indicative of actual issues with the main code base, but it does seem
> > like a smooth backport is possible and would fix this test failure if we
> > want to generate a new RC.
> > - Ran all integration tests for Connect and MM2
> >
> > Aside from the noted unit test failure, everything else looks good.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, May 22, 2023, 10:50 Federico Valeri  wrote:
> >
> > > Hi Luke,
> > >
> > > - Source signature and checksum
> > > - Build from source with Java 17 and Scala 2.13
> > > - Full unit and integration test suite
> > > - Java app with staging Maven artifacts
> > >
> > > +1 (non binding)
> > >
> > > Thanks
> > > Fede
> > >
> > > PS: Links still point to RC0, but I checked and RC1 artifacts are
> > > there, including Maven. The only risk I see is that you may actually
> > > test with the wrong artifacts. To avoid any confusion, I would suggest
> > > resending them on this thread.
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, May 22, 2023 at 2:53 PM Luke Chen  wrote:
> > > >
> > > > Hello Kafka users, developers and client-developers,
> > > >
> > > > This is the 2nd candidate for release of Apache Kafka 3.4.1.
> > > >
> > > > This is a bugfix release with several fixes since the release of
> > 3.4.0. A
> > > > few of the major issues include:
> > > > - core
> > > > KAFKA-14644 
> > Process
> > > > should stop after failure in raft IO thread
> > > > KAFKA-14946  KRaft
> > > > controller node shutting down while renouncing leadership
> > > > KAFKA-14887  ZK
> > > session
> > > > timeout can cause broker to shutdown
> > > > - client
> > > > KAFKA-14639  Kafka
> > > > CooperativeStickyAssignor revokes/assigns partition in one rebalance
> > > cycle
> > > > - connect
> > > > KAFKA-12558  MM2
> > may
> > > not
> > > > sync partition offsets correctly
> > > > KAFKA-14666  MM2
> > > should
> > > > translate consumer group offsets behind replication flow
> > > > - stream
> > > > KAFKA-14172  bug:
> > > State
> > > > stores lose state when tasks are reassigned under EOS
> > > >
> > > > Release notes for the 3.4.1 release:
> > > > https://home.apache.org/~showuon/kafka-3.4.1-rc0/RELEASE_NOTES.html
> > > >
> > > > *** Please download, test and vote by May 29, 2023
> > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > https://kafka.apache.org/KEYS
> > > >
> > > > * Release artifacts to be voted upon (source and binary):
> > > > https://home.apache.org/~showuon/kafka-3.4.1-rc0/
> > > >
> > > > * Maven artifacts to be voted upon:
> > > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > >
> > > > * Javadoc:
> > > 

Re: Re: Kafka 3.2.1 performance issue with JDK 11

2023-05-22 Thread Greg Harris
Vic,

While you can certainly try that, I don't know if that will solve the problem.
The reason why JDK11 appears relevant in this context is that a class
was removed between JDK8 and JDK11. I don't know if a replacement
stack inspector with better performance was added to JDK17 and used
within log4j2.
If you were to try to solve this with a JDK version change, a
downgrade to 8 may solve the problem, since the log4j library would
use a different stack inspector.
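
If you want to confirm the hot path before changing anything, a rough
check against a thread dump can help (the PID and class path here are
illustrative):

    jstack 12345 | grep -c 'org.apache.logging.log4j.util.StackLocator'

A consistently high count across several dumps suggests the request
handler threads are spending their time in the log4j2 stack walker.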

Greg

On Sun, May 21, 2023 at 11:30 PM Vic Xu  wrote:
>
> Hi Greg,
>
> I found another possible solution, which is to upgrade the JDK from 11 to 17.
> Do you recommend this solution?
>
> On 2023/05/21 17:58:42 Greg Harris wrote:
> > Vic,
> >
> > I found an open JIRA issue that previously reported this problem:
> > https://issues.apache.org/jira/browse/KAFKA-10877 .
> > I believe one workaround is to use log4j 1.x, such as reload4j. Kafka
> > still relies on log4j 1.x until the planned upgrade to log4j 2.x in
> > kafka 4.0 https://issues.apache.org/jira/browse/KAFKA-9366 .
> > I will look into reviving or replacing the performance patch for 3.x.
> >
> > Hope this helps,
> > Greg Harris
> >
> > On Sun, May 21, 2023 at 6:31 AM Vic Xu  wrote:
> > >
> > > Hello all, I have a Kafka cluster deployed with version 3.2.1, JDK 11
> > > and log4j 2.18.0. I built my own Kafka image. One of my Kafka brokers is
> > > experiencing CPU issues, and based on the jstack information, it seems
> > > that log4j is causing the problem due to its usage of StackWalker. How can
> > > I solve this issue?
> > >
> > > Here is jstack information:
> > > "data-plane-kafka-request-handler-6" #59 daemon prio=5 os_prio=0 
> > > cpu=86381259.23ms elapsed=1948787.21s tid=0x7f8939c04800 nid=0x190 
> > > runnable  [0x7f883f6f5000]
> > >java.lang.Thread.State: RUNNABLE
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.fetchStackFrames(java.base@11.0.9/Native
> > >  Method)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.fetchStackFrames(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.getNextBatch(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.peekFrame(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.hasNext(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$StackFrameTraverser.tryAdvance(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.util.stream.ReferencePipeline.forEachWithCancel(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.util.stream.AbstractPipeline.copyIntoWithCancel(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.util.stream.AbstractPipeline.copyInto(java.base@11.0.9/Unknown 
> > > Source)
> > > at 
> > > java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.util.stream.FindOps$FindOp.evaluateSequential(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.util.stream.AbstractPipeline.evaluate(java.base@11.0.9/Unknown 
> > > Source)
> > > at 
> > > java.util.stream.ReferencePipeline.findFirst(java.base@11.0.9/Unknown 
> > > Source)
> > > at 
> > > org.apache.logging.log4j.util.StackLocator.lambda$getCallerClass$2(StackLocator.java:57)
> > > at 
> > > org.apache.logging.log4j.util.StackLocator$$Lambda$117/0x0008001a6c40.apply(Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$StackFrameTraverser.consumeFrames(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.doStackWalk(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.callStackWalk(java.base@11.0.9/Native
> > >  Method)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.beginStackWalk(java.base@11.0.9/Unknown
> > >  Source)
> > > at 
> > > java.lang.StackStreamFactory$AbstractStackWalker.walk(java.base@11.0.9/Unknown
> > >  Source)

Re: [VOTE] 3.4.1 RC1

2023-05-22 Thread Greg Harris
Also, I'll point it out here so that everyone is aware, but I don't
think it necessarily warrants a new RC on its own:

This patch: https://github.com/apache/kafka/pull/13005 is present on
trunk/3.5.0, but is not present in 3.4.0/3.4.1-RC1.
It is not strictly a regression, since it was already a problem in 3.4.0.

Impact: when any MM2 connector starts, it attempts to create a topic.
If that topic exists (as it will on every connector restart), it
prints an ERROR log with a TopicExistsException.
This is annoying and causes ERROR log noise, but does not otherwise
affect the operation or correctness of the connectors.

If something else forces a new RC, perhaps consider rolling this into
the next RC.

Thanks,
Greg

On Mon, May 22, 2023 at 1:47 PM Greg Harris  wrote:
>
> Hi Luke,
>
> I performed a test upgrade of MM2 from 3.4.0 to 3.4.1-RC1, and
> verified that the new offset translation logic worked as intended.
>
> Steps I took to verify:
> - Downloaded 3.4.0_2.13 from the Kafka website
> - Formatted and started two 3.4.0 1-node clusters, and configured MM2
> to mirror data and consumer group offsets between those clusters.
> - I used the console producer to produce data in batches to the source
> cluster, and verified that similar amounts of data were present in the
> destination.
> - Verified that pre-upgrade, offset translation occurs smoothly.
> - Stopped MM2 3.4.0 and started 3.4.1-RC1 with the same configuration.
> - Verified that post-upgrade, offset translation occurs in steps and
> can be performed prior to the latest offset sync.
>
> Thanks!
> Greg
>
> On Mon, May 22, 2023 at 11:37 AM Josep Prat  
> wrote:
> >
> > Hi Luke,
> >
> > I can confirm the
> > *org.apache.kafka.common.utils.UtilsTest.testToLogDateTimeFormat*
> > consistent failure when not running on a system set on UTC-0 timezone.
> >
> > I verified:
> > - Built from source with Java 11 and Scala 2.13
> > - Checked signatures and hashes
> > - Checked Javadoc and verified that links to JDK javadoc are functional
> > - Run the unit tests (works on UTC-0 environments)
> > - Run integration tests
> > - Run the quickstart in KRaft and Zookeeper mode
> >
> > Best,
> >
> > On Mon, May 22, 2023 at 7:04 PM Chris Egerton 
> > wrote:
> >
> > > Hi Luke,
> > >
> > > Thanks for running the release!
> > >
> > > Steps I took to verify:
> > >
> > > - Built from source with Java 11
> > > - Checked signatures and checksums
> > > - Ran the quickstart with Java 11 in KRaft mode
> > > - Ran all unit tests
> > > - - The org.apache.kafka.common.utils.UtilsTest.testToLogDateTimeFormat
> > > test case failed consistently. It appears this is because we haven't
> > > backported a fix for KAFKA-14836 onto the 3.4 branch; after applying those
> > > changes to my local copy of the RC's source code, the test began to pass. 
> > > I
> > > don't know if we want to count this as a blocker since the test failure is
> > > not indicative of actual issues with the main code base, but it does seem
> > > like a smooth backport is possible and would fix this test failure if we
> > > want to generate a new RC.
> > > - Ran all integration tests for Connect and MM2
> > >
> > > Aside from the noted unit test failure, everything else looks good.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, May 22, 2023, 10:50 Federico Valeri  wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > - Source signature and checksum
> > > > - Build from source with Java 17 and Scala 2.13
> > > > - Full unit and integration test suite
> > > > - Java app with staging Maven artifacts
> > > >
> > > > +1 (non binding)
> > > >
> > > > Thanks
> > > > Fede
> > > >
> > > > PS: Links still point to RC0, but I checked and RC1 artifacts are
> > > > there, including Maven. The only risk I see is that you may actually
> > > > test with the wrong artifacts. To avoid any confusion, I would suggest
> > > > resending them on this thread.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, May 22, 2023 at 2:53 PM Luke Chen  wrote:
> > > > >
> > > > > Hello Kafka users, developers and client-developers,
> > > > >
> > > > > This is the 2nd candidate for release of Apache Kafka 3.4.1.
> > > > >
> >

Re: Kafka Connect Rest Extension Question

2023-07-31 Thread Greg Harris
Hello Yang Hyung Wook,

In your post I do not see anything obviously wrong, so you may need to
do some more debugging.

1. Are you using the same jar for both the classpath and plugin.path
tests? If not, do they both contain the service loader manifest file?
You can test this with
https://docs.oracle.com/javase/tutorial/deployment/jar/view.html
2. Do you see either of these errors
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L267-L269
for filesystem-specific problems?
3. This log line
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L275
should be printed multiple times when you're using plugin.path. Are
you seeing a log including a directory containing your jar file, the
jar file itself, or only other locations?
4. If there is a dependency-related error, it will appear with this
error log: 
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L431

If the above doesn't help, can you please provide your redacted worker
startup logs after
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java#L120
and before the worker starts printing configurations?
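
For point 1, a quick way to check for the manifest (the jar name is
illustrative):

    jar tf my-rest-extension.jar \
      | grep META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension

If nothing is printed, the ServiceLoader manifest is missing from that jar.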

Thanks,
Greg Harris

On Mon, Jul 31, 2023 at 5:38 AM 양형욱  wrote:
>
> https://stackoverflow.com/questions/76797743/how-can-i-solve-connectrestextension-error
>
> There is an issue with this link where the ConnectRestExtension 
> implementation is not registered. I've done everything the KIP official 
> documentation says, but can you tell me why it doesn't work?
>
> 양형욱 Yang Hyung Wook
> Global Platform Dev
>
> 경기도 성남시 분당구 분당내곡로 131 판교테크원 타워1 (우)13529
> Tel Mobile 010-2815-2145
> Email hyungwooky...@navercorp.com


Re: Kafka connect Graceful stop of task failed

2023-08-16 Thread Greg Harris
Hi Robson,

Thank you for the detailed bug report.

I believe the behavior that you're describing is caused by this flaw:
https://issues.apache.org/jira/browse/KAFKA-15090 which is still under
discussion. Since the above flaw was introduced in 3.0, source
connectors need to return from poll() before the graceful shutdown
timeout to avoid the error.
You may be able to mitigate the error if the connector allows you to
reduce its poll timeout/interval to something less than the graceful
timeout, but that will depend on the specific connector
implementation, so check the documentation for your connector. I know
some implementations have received patches to compensate for this
behavior in the framework, so also consider upgrading or checking
release notes for your connectors.
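
As an illustration only (task.shutdown.graceful.timeout.ms is a real
worker property; poll.interval.ms is specific to the JDBC source
connector, and other connectors will have different knobs):

    # worker config: how long Connect waits for a task to stop gracefully
    task.shutdown.graceful.timeout.ms=10000
    # JDBC source connector config: keep poll() shorter than the timeout above
    poll.interval.ms=5000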

As for the effects of this error: whenever a non-graceful stop occurs,
the runtime will immediately close the producer so that the task will
not be able to write any further records. However, it will still leave
resources for that task (threads, in-memory records, database
connections, etc) allocated, until the task does finally return from
poll(). While this is not desirable behavior, it seems to be treated
as just a nuisance error by most operators.

I hope this gives some context for the error message you're seeing.

Thanks,
Greg


Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread Greg Harris
Hey Robson,

Thanks for opening an issue on the JDBC repo, I think this is
certainly relevant feedback for the connector developers. I commented
on the issue with a potential regression that I saw, you can try
downgrading your connector to see if the behavior improves.
I also know that kafka-connect-jdbc received a patch to improve this
behavior when no data is being emitted:
https://github.com/confluentinc/kafka-connect-jdbc/pull/947 but I'm
not sure if that is relevant to your situation.

Thanks!
Greg

On Mon, Aug 21, 2023 at 6:53 AM Robson Hermes  wrote:
>
> No, it stops them also.
> The problem is precisely what Greg described, now the stop signal comes
> from the same thread. So any source task which is running in a blocking way
> will not process the stop signal until the current poll finishes.
> So we would need to patch the source JDBC connector.
>
> On Mon, 21 Aug 2023 at 15:48, sunil chaudhari 
> wrote:
>
> > I think when you delete a connector it removes the task, and the workers
> > continue to run.
> > When you stop it actually stops the worker.
> > Both different things.
> > Point to be noted is Worker has connector.
> > So connector should be removed before stopping the worker.
> >
> > Though I am not expert in this.
> >
> > On Mon, 21 Aug 2023 at 7:10 PM, Robson Hermes 
> > wrote:
> >
> > > Hello Sunil
> > >
> > > I'm not calling a stop, I'm straight deleting the connectors with the
> > > DELETE. Stopping the connector is done internally during deletion.
> > >
> > > Regards
> > > Robson
> > >
> > > On Mon, 21 Aug 2023 at 15:36, sunil chaudhari <
> > sunilmchaudhar...@gmail.com
> > > >
> > > wrote:
> > >
> > > > You have to remove connectors first using delete api
> > > > and then stop the connector
> > > >
> > > > On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes 
> > > > wrote:
> > > >
> > > > > Hello
> > > > >
> > > > > I'm using kafka connect 7.4.0 to read data from Postgres views and
> > > write
> > > > to
> > > > > another Postgres tables. So using JDBC source and sink connectors.
> > > > > All works good, but whenever I stop the source connectors via the
> > rest
> > > > api:
> > > > >
> > > > > DEL http://kafka-connect:8083/connectors/connector_name_here
> > > > >
> > > > > The connector stops fine, but not the task:
> > > > >
> > > > >
> > > > > Graceful stop of connector (connector-name-here) succeeded.
> > > > >
> > > > > Graceful stop of task (task-name-here) failed.
> > > > >
> > > > >
> > > > > It only happens with the *source* connector tasks. The sink connector
> > > > > and tasks shutdown gracefully and fine.
> > > > >
> > > > > The timeout for task shutdown has been increased, but didn't help:
> > > > >
> > > > > task.shutdown.graceful.timeout.ms=6
> > > > >
> > > > >
> > > > >
> > > > > The connectors are running once per day (during the night) to load a
> > > > > lot of data, and the error happens when I try to delete the
> > connectors
> > > > > in the middle of the day. That is, they are not actually
> > > > > executing/loading any data, it has finished already.
> > > > >
> > > > > offset.flush.interval.ms=1 in development and integration
> > > > > environments.
> > > > >
> > > > >  offset.flush.interval.ms=6 in production and uat.
> > > > >
> > > > >
> > > > > The rest of the config is pretty much the default.
> > > > >
> > > > > What could be the issue?
> > > > >
> > > > > The errors of the graceful stop of the tasks are triggering our alert
> > > > > system, so trying to get rid of those.
> > > > >
> > > > >
> > > > > Thanks a lot
> > > > >
> > > > > Robson
> > > > >
> > > >
> > >
> >


Re: Consumer group offset translation in Mirror Maker 2

2023-08-29 Thread Greg Harris
Hey Hemanth!

Thank you for asking about Mirror Maker 2! Offset translation is not
so simple, so I'll summarize the main functionality and leave some
pointers into the code for you to examine yourself.

1. After MirrorSourceTask writes a record, it receives a commitRecord
callback [1] with information about what offset the record has in the
destination.
2. The MirrorSourceTask sends this information (an OffsetSync) [2] to
a persistent topic (the Offset Syncs topic.)
3. The MirrorCheckpointTask uses an OffsetSyncStore to read the Offset
Syncs from the Offset Syncs topic [3] and store them in memory.
4. The OffsetSyncStore provides a translateDownstream method that
reads from memory and translates the offset [4].
5. The translation picks an offset sync before the offset being
translated, to obtain a downstream offset which must be earlier than
where the requested offset could be written. This changed recently in
KAFKA-12468.
6. The MirrorCheckpointTask uses the translateDownstream method when
computing the checkpoint records [5] which it then emits to the
checkpoint topic.
7. The MirrorCheckpointTask stores the last checkpoint record it
emitted and writes [6] the checkpoint downstream offset to the
consumer group offset periodically.

[1]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L184
[2]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L251
[3]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L180
[4]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L131
[5]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L169
[6]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L302
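
If you want to watch this happen end-to-end, recent releases ship
console formatters for the internal topics. A sketch, assuming a 3.5+
distribution and illustrative cluster aliases/addresses:

    bin/kafka-console-consumer.sh --bootstrap-server source:9092 \
      --topic mm2-offset-syncs.target.internal --from-beginning \
      --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter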

I hope the above can get you started in this area, please let me know
if you have any more questions about offset translation.

Thanks,
Greg

On Mon, Aug 28, 2023 at 9:14 PM Hemanth Savasere
 wrote:
>
> Hi,
> We're using Mirror Maker 2 for replicating messages from our primary to
> secondary cluster, it's one way replication. We're also replicating by
> the consumer group offsets by adding the below properties
>
> sync.group.offsets.enabled=true
> sync.group.offsets.interval.seconds=5
> emit.checkpoints.interval.seconds=5
>
> Wanted to know how the translation of consumer group offsets happens from
> source to destination cluster, especially "how current offset and log
> end offset translation happens from source to destination Kafka cluster"
>
> Thanks in advance.
>
> --
> Regards,
> Hemanth Savasere


Re: Consumer group offset translation in Mirror Maker 2

2023-09-03 Thread Greg Harris
Hey Hemanth,

Thanks for your questions!

> I'm trying to understand the specific differences between syncing offsets 
> using RemoteClusterUtils and the inbuilt MM2 consumer offset sync feature.

The RemoteClusterUtils reads the checkpoints topic, which is the
output of the MirrorCheckpointConnector that I mentioned in step (6)
before. This means that RemoteClusterUtils does not do any translation
itself, it just gives you visibility into the translation that the
MirrorCheckpointConnector is performing. If the
MirrorCheckpointConnector is computing checkpoints incorrectly, then
the output of RemoteClusterUtils will be incorrect.
If you need the checkpoint record for a certain consumer group for
something other than syncing it to the downstream consumer group, the
RemoteClusterUtils/MirrorClient is the recommended way to get that
information. If you just want the checkpoint record in order to sync
it to the downstream consumer group, then enabling the sync feature in
MirrorMaker2 itself is simpler than building yourself an application
to do the same.
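
For reference, a minimal sketch of the client-side call (the cluster
alias, group name, and bootstrap address are illustrative; the
properties configure a client against the cluster that hosts the
checkpoints topic):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class TranslateExample {
        public static void main(String[] args) throws Exception {
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", "target-cluster:9092");
            // "source" is the alias of the remote (upstream) cluster
            Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                    props, "source", "my-group", Duration.ofSeconds(30));
            translated.forEach((tp, oam) ->
                System.out.println(tp + " -> " + oam.offset()));
        }
    }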

> Additionally, would it still be advisable to use 
> RemoteClusterUtils.translateOffsets() given that we are on Kafka version 
> 2.8.2, which has some known bugs related to negative consumer offsets? For 
> reference, here is the related bug report: 
> https://issues.apache.org/jira/browse/KAFKA-12635.

I would certainly recommend an upgrade to the latest MirrorMaker2. The
negative offsets symptom turns out to be caused by a very significant
data loss bug, as data is replicated but the translated offsets would
not cause you to read the replicated data:
https://issues.apache.org/jira/browse/KAFKA-12468. There is one
improvement scheduled for release shortly in 3.6.0 that should make
MM2 offset translation the best it's ever been:
https://issues.apache.org/jira/browse/KAFKA-15202 so I would recommend
using that version when it is released.

Hope this helps!
Greg

On Sun, Sep 3, 2023 at 5:51 AM Hemanth Savasere
 wrote:
>
> Hi Greg,
>
> Thank you for providing the resources to understand offset translation. I'm
> currently going through them and am gaining a better understanding of the
> process.
>
> Before delving into these resources, I was examining the source code of
> RemoteClusterUtils.translateOffsets(). I'm trying to understand the
> specific differences between syncing offsets using RemoteClusterUtils and
> the inbuilt MM2 consumer offset sync feature. Although I read through KIP
> 545 regarding Mirror Maker 2, I didn't find much context to explain why
> this approach is preferable.
>
> Additionally, would it still be advisable to use
> RemoteClusterUtils.translateOffsets() given that we are on Kafka version
> 2.8.2, which has some known bugs related to negative consumer offsets? For
> reference, here is the related bug report:
> https://issues.apache.org/jira/browse/KAFKA-12635.
>
> On Sun, Sep 3, 2023 at 6:17 PM Hemanth Savasere 
> wrote:
>
> > Hi,
> > Thanks a lot Greg for the resources to understand the offset translation,
> > I'm going through them and understanding the process.
> >
> > But before that I was also going through the source code of
> > RemoteClusterUtils.translateOffsets() and wanted to know that I did not get
> > much context on what's the difference when we sync the offsets using the
> > RemoteClusterUtils instead of using the inbuilt MM2 consumer offset sync
> > feature.
> >
> > I read through the KIP 545 regarding the Mirror Maker 2 but could not get
> > the context more on why this was being done, and can we still use the
> > RemoteClusterUtils.translateOffsets() as we are using the Kafka 2.8.2
> > version which has some bugs related to negative consumer offsets
> > https://issues.apache.org/jira/browse/KAFKA-12635
> >
> > Thanks in advance,
> >
> >
> >
> > On Tue, Aug 29, 2023 at 9:29 PM Greg Harris 
> > wrote:
> >
> >> Hey Hemanth!
> >>
> >> Thank you for asking about Mirror Maker 2! Offset translation is not
> >> so simple, so I'll summarize the main functionality and leave some
> >> pointers into the code for you to examine yourself.
> >>
> >> 1. After MirrorSourceTask writes a record, it receives a commitRecord
> >> callback [1] with information about what offset the record has in the
> >> destination.
> >> 2. The MirrorSourceTask sends this information (an OffsetSync) [2] to
> >> a persistent topic (the Offset Syncs topic.)
> >> 3. The MirrorCheckpointTask uses an OffsetSyncStore to read the Offset
> >> Syncs from the Offset Syncs topic [3] and store them in memory.
> >> 4. The OffsetSyncStore provides a translateDownstream method that

Re: [kafka-clients] [VOTE] 3.6.0 RC1

2023-09-20 Thread Greg Harris
Hi all,

I verified the functionality of KIP-898 and the recent fix for
KAFKA-15473 with the following steps:

1. I started a 3.5.1 broker, and a 3.5.1 worker with most (>400)
publicly available plugins installed
2. I captured the output of /connector-plugins
3. I upgraded the worker to 3.6.0-rc1
4. I captured the output of /connector-plugins with various settings
of plugin.discovery
5. I ran the migration script to add manifests to my plugins
6. I captured the output of /connector-plugins with various settings
of plugin.discovery
7. I downgraded the worker to 3.5.1
8. I diffed the output of /connector-plugins across the different
cases and observed the expected changes.
a. When plugins are migrated for 3.6.0, all modes produce identical results.
b. When plugins are not migrated for 3.6.0, only_scan and
hybrid_warn produce identical results, hybrid_fail crashes, and
service_load is missing plugins
c. When upgrading from 3.5.1 I see that plugins with invalid
constructors are hidden, AK plugins now have versions, multi-interface
plugins now show each interface type, and plugins using AppInfoParser
change versions.
d. The startup logs now include descriptive errors for invalid
plugins that otherwise would have been thrown at runtime.
e. The fix for KAFKA-15473 prevents duplicates.
f. The output for 3.5.1 after downgrading is identical to before.
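
For anyone reproducing step 5: the migration used the new
connect-plugin-path.sh tool from KIP-898 (flags here are from memory,
so double-check against the 3.6 docs):

    bin/connect-plugin-path.sh sync-manifests --plugin-path /opt/connect/plugins
    # then opt in on the worker:
    plugin.discovery=service_load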

+1 (non-binding)

Thanks Satish for running the release!

On Wed, Sep 20, 2023 at 8:36 AM Divij Vaidya  wrote:
>
> Hey Satish
>
> My comments about documentation misses from RC0 vote thread [1] are
> still not addressed (such as missing metric documentation, formatting
> problems etc). Could you please mention why we shouldn't consider them
> as blockers to make RC1 as the final release?
>
> [1] https://lists.apache.org/thread/cokoxzd0jtgjtrlxoq7kkzmvpm75381t
>
> On Wed, Sep 20, 2023 at 4:53 PM Satish Duggana  
> wrote:
> >
> > Hello Kafka users, developers and client-developers,
> >
> > This is the second candidate for the release of Apache Kafka 3.6.0. Some of 
> > the major features include:
> >
> > * KIP-405 : Kafka Tiered Storage
> > * KIP-868 : KRaft Metadata Transactions
> > * KIP-875: First-class offsets support in Kafka Connect
> > * KIP-898: Modernize Connect plugin discovery
> > * KIP-938: Add more metrics for measuring KRaft performance
> > * KIP-902: Upgrade Zookeeper to 3.8.1
> > * KIP-917: Additional custom metadata for remote log segment
> >
> > Release notes for the 3.6.0 release:
> > https://home.apache.org/~satishd/kafka-3.6.0-rc1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Saturday, September 23, 8am PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > https://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~satishd/kafka-3.6.0-rc1/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > https://home.apache.org/~satishd/kafka-3.6.0-rc1/javadoc/
> >
> > * Tag to be voted upon (off 3.6 branch) is the 3.6.0 tag:
> > https://github.com/apache/kafka/releases/tag/3.6.0-rc1
> >
> > * Documentation:
> > https://kafka.apache.org/36/documentation.html
> >
> > * Protocol:
> > https://kafka.apache.org/36/protocol.html
> >
> > * Successful Jenkins builds for the 3.6 branch:
> > There are a few runs of unit/integration tests. You can see the latest at 
> > https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.6/. We will continue 
> > running a few more iterations.
> > System tests:
> > We will send an update once we have the results.
> >
> > Thanks,
> > Satish.
> >
> > --
> > You received this message because you are subscribed to the Google Groups 
> > "kafka-clients" group.
> > To unsubscribe from this group and stop receiving emails from it, send an 
> > email to kafka-clients+unsubscr...@googlegroups.com.
> > To view this discussion on the web visit 
> > https://groups.google.com/d/msgid/kafka-clients/CAM-aUZ%3DuJ-SKeVFtBZwBjhLHKw4CbxF_ws%2BvQqaymGHFsC%2Bmdg%40mail.gmail.com.


Re: The Plan To Introduce Virtual Threads To Kafka Connect

2023-10-16 Thread Greg Harris
Hi Boyee,

Thanks for the suggestion, Virtual threads look like they may be
helpful for Connect, particularly in Connector plugins.

There are currently no plans to use virtual threads in the Connect
framework, maybe because we need to maintain compatibility with JDK 8
until 4.0. See https://kafka.apache.org/36/documentation.html#java for
supported Java versions.
I've opened a tracking ticket here:
https://issues.apache.org/jira/browse/KAFKA-15611 but I'll leave it
unassigned for anyone interested to spec out the work. I also see a
similar ticket for the Producer:
https://issues.apache.org/jira/browse/KAFKA-14606 .

Thanks!
Greg Harris

On Mon, Oct 16, 2023 at 6:20 AM Boyee  wrote:
>
> Kafka Connect, as a thread-intensive program, can benefit a lot from the
> use of virtual threads.
> As of JDK 21, released last month, virtual threads are a standard feature of
> the JDK.
> I would like to ask if any plans exist to bring virtual threads into Kafka 
> Connect.
> Thank you.


Re: Mirror Maker 2 - offset sync from source to target

2023-10-23 Thread Greg Harris
Hi Alexander,

Thanks for using MirrorMaker2!

If you turn on `sync.group.offsets.enabled`, then the
MirrorCheckpointTask will sync the offsets if all of the following is
true:
1. The source group exists
2. The source group name matches the configured group filter
(group.filter.class, groups, groups.exclude)
3. The source group has an offset for a topic which matches the
configured topic filter (topic.filter.class, topics, topics.exclude)
4. The target group does not exist, or has no active consumers
5. The target group has no offset for a specified partition, or the
offset is earlier than the translated offset
6. MirrorCheckpointTask restarted after replication happened but
before the offset could be translated.

If one of these isn't true, you won't see translation happening.
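
For reference, a sketch of the relevant knobs in an MM2 properties
file (values are illustrative, and the exclude defaults are from
memory, so check the docs for your version):

    A->B.sync.group.offsets.enabled = true
    A->B.emit.checkpoints.interval.seconds = 5
    A->B.groups = .*
    A->B.groups.exclude = console-consumer-.*, connect-.*, __.*
    A->B.topics = .*
    A->B.topics.exclude = .*[\-\.]internal, .*\.replica, __.*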

Are you having a problem with too many consumer groups being created?
You can restrict the group or topic filters, as they're very
permissive by default.
Or is the problem that you expect the consumer groups to be created,
but they aren't? One of the above conditions may not be satisfied, or
your MirrorCheckpointConnector/Task may be malfunctioning and you'll
need to inspect the logs to debug it.

Best,
Greg Harris

On Sun, Oct 22, 2023 at 5:36 AM Alexander Shapiro (ashapiro)
 wrote:
>
> Hi
> Can someone advise please
> if sync.group.offsets.enabled : true to sync offset from source to target for 
> particular consumer group
> That group must be created on target, even if no actual consumption will be 
> done ?
>
>
>


Re: Mirror Maker 2 - offset sync from source to target

2023-10-23 Thread Greg Harris
Hi Alexander,

Sorry I noticed an inconsistency in my last email. For point 6:
If the MirrorCheckpointTask restarts after replication but before
offset is translated, then it may not be able to perform a
translation.
If the MirrorCheckpointTask does not restart, it should be able to
perform translation.

So if your MirrorMaker2 is restarting frequently, that may prevent
consistent translation.

Thanks,
Greg

On Mon, Oct 23, 2023 at 10:46 AM Alexander Shapiro (ashapiro)
 wrote:
>
> Hi Greg,
> Thank you very much,
> it is the most detailed answer I would expect.
>
> -Original Message-
> From: Greg Harris 
> Sent: Monday, October 23, 2023 8:42 PM
> To: users@kafka.apache.org
> Subject: Re: Mirror Maker 2 - offset sync from source to target
>
> Hi Alexander,
>
> Thanks for using MirrorMaker2!
>
> If you turn on `sync.group.offsets.enabled`, then the MirrorCheckpointTask 
> will sync the offsets if all of the following is
> true:
> 1. The source group exists
> 2. The source group name matches the configured group filter
> (group.filter.class, groups, groups.exclude)
> 3. The source group has an offset for a topic which matches the
> configured topic filter (topic.filter.class, topics, topics.exclude)
> 4. The target group does not exist, or has no active consumers
> 5. The target group has no offset for a specified partition, or the
> offset is earlier than the translated offset
> 6. MirrorCheckpointTask restarted after replication happened but before
> the offset could be translated.
>
> If one of these isn't true, you won't see translation happening.
>
> Are you having a problem with too many consumer groups being created?
> You can restrict the group or topic filters, as they're very permissive by 
> default.
> Or is the problem that you expect the consumer groups to be created, but they 
> aren't? One of the above conditions may not be satisfied, or your 
> MirrorCheckpointConnector/Task may be malfunctioning and you'll need to 
> inspect the logs to debug it.
>
> Best,
> Greg Harris
>
> On Sun, Oct 22, 2023 at 5:36 AM Alexander Shapiro (ashapiro) 
>  wrote:
> >
> > Hi
> > Can someone advise please
> > if sync.group.offsets.enabled : true to sync offset from source to
> > target for particular consumer group That group must be created on target, 
> > even if no actual consumption will be done ?
> >
> >
> >


Re: Mirror Maker 2 - offset sync from source to target

2023-10-23 Thread Greg Harris
Andrew,

Yes, there isn't an explicit "create consumer group" operation; the group
should be created when MM2 emits a sync for it.

Best,
Greg

On Mon, Oct 23, 2023 at 1:15 PM Alexander Shapiro (ashapiro)
 wrote:
>
> Thanks, one clarification plz
>
> In bullet for You mention "4. The target group does not exist, or has no 
> active consumers"
> If group on target does not exist, will it be created without active 
> consumers ?
>
> -Original Message-
> From: Greg Harris 
> Sent: Monday, October 23, 2023 8:56 PM
> To: users@kafka.apache.org
> Subject: Re: Mirror Maker 2 - offset sync from source to target
>
> Hi Alexander,
>
> Sorry I noticed an inconsistency in my last email. For point 6:
> If the MirrorCheckpointTask restarts after replication but before offset is 
> translated, then it may not be able to perform a translation.
> If the MirrorCheckpointTask does not restart, it should be able to perform 
> translation.
>
> So if your MirrorMaker2 is restarting frequently, that may prevent consistent 
> translation.
>
> Thanks,
> Greg
>
> On Mon, Oct 23, 2023 at 10:46 AM Alexander Shapiro (ashapiro) 
>  wrote:
> >
> > Hi Greg,
> > Thank you very much,
> > it is the most detailed answer I would expect.
> >
> > -Original Message-
> > From: Greg Harris 
> > Sent: Monday, October 23, 2023 8:42 PM
> > To: users@kafka.apache.org
> > Subject: Re: Mirror Maker 2 - offset sync from source to target
> >
> > Hi Alexander,
> >
> > Thanks for using MirrorMaker2!
> >
> > If you turn on `sync.group.offsets.enabled`, then the
> > MirrorCheckpointTask will sync the offsets if all of the following is
> > true:
> > 1. The source group exists
> > 2. The source group name matches the configured group filter
> > (group.filter.class, groups, groups.exclude)
> > 3. The source group has an offset for a topic which matches the
> > configured topic filter (topic.filter.class, topics, topics.exclude)
> > 4. The target group does not exist, or has no active consumers
> > 5. The target group has no offset for a specified partition, or the
> > offset is earlier than the translated offset
> > 6. MirrorCheckpointTask restarted after replication happened but before
> > the offset could be translated.
> >
> > If one of these isn't true, you won't see translation happening.
> >
> > Are you having a problem with too many consumer groups being created?
> > You can restrict the group or topic filters, as they're very permissive by 
> > default.
> > Or is the problem that you expect the consumer groups to be created, but 
> > they aren't? One of the above conditions may not be satisfied, or your 
> > MirrorCheckpointConnector/Task may be malfunctioning and you'll need to 
> > inspect the logs to debug it.
> >
> > Best,
> > Greg Harris
> >
> > On Sun, Oct 22, 2023 at 5:36 AM Alexander Shapiro (ashapiro) 
> >  wrote:
> > >
> > > Hi
> > > Can someone advise please
> > > if sync.group.offsets.enabled : true to sync offset from source to
> > > target for particular consumer group That group must be created on 
> > > target, even if no actual consumption will be done ?
> > >
> > >
> > >


Re: Mirror Maker 2 - offset sync from source to target

2023-10-23 Thread Greg Harris
Alexander,

My apologies for calling you Andrew.

Greg

On Mon, Oct 23, 2023 at 1:22 PM Greg Harris  wrote:
>
> Andrew,
>
> Yes, there isn't an explicit "create consumer group" operation, it
> should be created when MM2 emits a sync for it.
>
> Best,
> Greg
>
> On Mon, Oct 23, 2023 at 1:15 PM Alexander Shapiro (ashapiro)
>  wrote:
> >
> > Thanks, one clarification plz
> >
> > In bullet for You mention "4. The target group does not exist, or has no 
> > active consumers"
> > If group on target does not exist, will it be created without active 
> > consumers ?
> >
> > -Original Message-
> > From: Greg Harris 
> > Sent: Monday, October 23, 2023 8:56 PM
> > To: users@kafka.apache.org
> > Subject: Re: Mirror Maker 2 - offset sync from source to target
> >
> > Hi Alexander,
> >
> > Sorry I noticed an inconsistency in my last email. For point 6:
> > If the MirrorCheckpointTask restarts after replication but before offset is 
> > translated, then it may not be able to perform a translation.
> > If the MirrorCheckpointTask does not restart, it should be able to perform 
> > translation.
> >
> > So if your MirrorMaker2 is restarting frequently, that may prevent 
> > consistent translation.
> >
> > Thanks,
> > Greg
> >
> > On Mon, Oct 23, 2023 at 10:46 AM Alexander Shapiro (ashapiro) 
> >  wrote:
> > >
> > > Hi Greg,
> > > Thank you very much,
> > > it is the most detailed answer I would expect.
> > >
> > > -Original Message-
> > > From: Greg Harris 
> > > Sent: Monday, October 23, 2023 8:42 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: Mirror Maker 2 - offset sync from source to target
> > >
> > > Hi Alexander,
> > >
> > > Thanks for using MirrorMaker2!
> > >
> > > If you turn on `sync.group.offsets.enabled`, then the
> > > MirrorCheckpointTask will sync the offsets if all of the following is
> > > true:
> > > 1. The source group exists
> > > 2. The source group name matches the configured group filter
> > > (group.filter.class, groups, groups.exclude)
> > > 3. The source group has an offset for a topic which matches the
> > > configured topic filter (topic.filter.class, topics, topics.exclude)
> > > 4. The target group does not exist, or has no active consumers
> > > 5. The target group has no offset for a specified partition, or the
> > > offset is earlier than the translated offset
> > > 6. MirrorCheckpointTask restarted after replication happened but before
> > > the offset could be translated.
> > >
> > > If one of these isn't true, you won't see translation happening.
> > >
> > > Are you having a problem with too many consumer groups being created?
> > > You can restrict the group or topic filters, as they're very permissive 
> > > by default.
> > > Or is the problem that you expect the consumer groups to be created, but 
> > > they aren't? One of the above conditions may not be satisfied, or your 
> > > MirrorCheckpointConnector/Task may be malfunctioning and you'll need to 
> > > inspect the logs to debug it.
> > >
> > > Best,
> > > Greg Harris
> > >
> > > On Sun, Oct 22, 2023 at 5:36 AM Alexander Shapiro (ashapiro) 
> > >  wrote:
> > > >
> > > > Hi
> > > > Can someone advise please
> > > > if sync.group.offsets.enabled : true to sync offset from source to
> > > > target for particular consumer group That group must be created on 
> > > > target, even if no actual consumption will be done ?
> > > >
> > > >
> > > >


Re: Mirror Maker 2 - offset sync from source to target

2023-10-24 Thread Greg Harris
Alexander,

Looking at the MM2 documentation
https://kafka.apache.org/documentation/#georeplication it appears that
there's no "summary" of the auto-sync functionality, only the
documentation strings for the individual configurations.
Point (5) above is not covered in the public documentation, and you
would need to read the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
to learn about it.
Point (6) above is also not covered in the public documentation, but
has been a hidden limitation of MirrorMaker2 for a long time that has
recently become more apparent. We still have some work to do
in order to make MirrorMaker2 more resilient to restarts.

Thanks!
Greg

On Mon, Oct 23, 2023 at 9:00 PM Alexander Shapiro (ashapiro)
 wrote:
>
> Not a problem Greg.
>
> Is there some documentation explaining the same ?
> I tried to find a lot in the past
>
> ____
> From: Greg Harris 
> Sent: Monday, October 23, 2023 11:23:26 PM
> To: users@kafka.apache.org 
> Subject: Re: Mirror Maker 2 - offset sync from source to target
>
> Alexander,
>
> My apologies for calling you Andrew.
>
> Greg
>
> On Mon, Oct 23, 2023 at 1:22 PM Greg Harris  wrote:
> >
> > Andrew,
> >
> > Yes, there isn't an explicit "create consumer group" operation, it
> > should be created when MM2 emits a sync for it.
> >
> > Best,
> > Greg
> >
> > On Mon, Oct 23, 2023 at 1:15 PM Alexander Shapiro (ashapiro)
> >  wrote:
> > >
> > > Thanks, one clarification plz
> > >
> > > In bullet for You mention "4. The target group does not exist, or has no 
> > > active consumers"
> > > If group on target does not exist, will it be created without active 
> > > consumers ?
> > >
> > > -Original Message-
> > > From: Greg Harris 
> > > Sent: Monday, October 23, 2023 8:56 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: Mirror Maker 2 - offset sync from source to target
> > >
> > > Hi Alexander,
> > >
> > > Sorry I noticed an inconsistency in my last email. For point 6:
> > > If the MirrorCheckpointTask restarts after replication but before offset 
> > > is translated, then it may not be able to perform a translation.
> > > If the MirrorCheckpointTask does not restart, it should be able to 
> > > perform translation.
> > >
> > > So if your MirrorMaker2 is restarting frequently, that may prevent 
> > > consistent translation.
> > >
> > > Thanks,
> > > Greg
> > >
> > > On Mon, Oct 23, 2023 at 10:46 AM Alexander Shapiro (ashapiro) 
> > >  wrote:
> > > >
> > > > Hi Greg,
> > > > Thank you very much,
> > > > it is the most detailed answer I would expect.
> > > >
> > > > -Original Message-
> > > > From: Greg Harris 
> > > > Sent: Monday, October 23, 2023 8:42 PM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: Mirror Maker 2 - offset sync from source to target
> > > >
> > > > Hi Alexander,
> > > >
> > > > Thanks for using MirrorMaker2!
> > > >
> > > > If you turn on `sync.group.offsets.enabled`, then the
> > > > MirrorCheckpointTask will sync the offsets if all of the following is
> > > > true:
> > > > 1. The source group exists
> > > > 2. The source group name matches the configured group filter
> > > > (group.filter.class, groups, groups.exclude)
> > > > 3. The source group has an offset for a topic which matches the
> > > > configured topic filter (topic.filter.class, topics, topics.exclude)
> > > > 4. The target group does not exist, or has no active consu

Re: where to capture a failed task's exception

2023-12-26 Thread Greg Harris
Hey Akash,

Thanks for the question! For a direct answer, no: throwing exceptions
from poll() is only one of many ways that a task can fail.

If you look at the AK source, every failure ultimately uses the
AbstractStatus.State.FAILED enum [1]. You can trace the usages of this
enum back to see all of the ways that a connector or task can fail.
The failure trace is also exposed in the REST API, which is a stable
public API you can depend on.

[1]: 
https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java#L27C13-L27C13
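
As an example, a sketch of pulling failure traces from the status
endpoint (the connector name, host, and jq filter are illustrative):

    curl -s http://localhost:8083/connectors/my-connector/status \
      | jq '.tasks[] | select(.state == "FAILED") | .trace'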

Happy to help,
Greg

On Tue, Dec 26, 2023 at 2:41 PM Akash Dhiman  wrote:
>
> Hello,
>
> I have a requirement where I need to detect failed tasks based on the
> specific errors and emit a metric only when it doesn't fail based on these
> specific errors (these include unknown_topic_or_partition, specific cases
> of ConfigException etc)
>
> I know about a similar metric accessible via Prometheus, but that gives me
> the count of failed tasks for any reason.
>
> I was thinking that wrapping the poll method of the task in a try/catch
> block would be sufficient, where I detect the error details in the catch
> block and emit the metric when they don't match the ones I want to exclude.
>
> but I am unsure if this captures all possible scenarios for which a task
> might fail.
>
> Is it guaranteed that all the exceptions/errors for which a task might fail
> get emitted via the poll method?


Re: Mirror Maker bidirectional offset sync

2024-01-08 Thread Greg Harris
Hi Jeroen,

Thanks for looking into MM2 for your use-case!

I believe the "active-active" model that MM2 uses is really more of a
pair of opposing "active-passive" replication flows, and this extends
to the offset sync mechanism too.
MM2 doesn't change the state of the source topic or consumer groups to
avoid conflicting with existing producers & consumers.

The functionality you're proposing could exist, but it has to resolve
a couple of complications:
1. If there are additional producers to the downstream topic,
downstream consumer offsets include records which don't exist
upstream. This is technically already a problem with source->target
offset replication, but if you're in a failover scenario it is
possible that a new producer has started and the downstream topic has
diverged from the source topic significantly.
2. Syncing to a consumer group requires that all consumers in that
group be offline. If you were to sync the exact same group in both
directions continuously, both groups would need to have all consumers
offline.

The closest supported solution for you would probably be to do a full
round-trip replication (A -> B -> A and B -> A -> B) and write a
ForwardingAdminClient plugin which transparently renames the consumer
groups.
And the best unsupported solution I could think of would be to fork
the MirrorClient (which reads from the offset-syncs topic emitted by
the plugin) and perform back-translation once during the fail-back
with a script, when you know the original consumer group will be
offline.
Neither of those seem very good to me, so you may need to design your
application around this design constraint or handle the "fail-back"
state management with some other mechanism.
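
To make the first option concrete, a minimal sketch of a renaming
ForwardingAdmin (KIP-787); the "b." prefix scheme is purely
illustrative, and you would point MM2 at it via the
forwarding.admin.class property:

    import java.util.Map;
    import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
    import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
    import org.apache.kafka.clients.admin.ForwardingAdmin;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RenamingAdmin extends ForwardingAdmin {
        public RenamingAdmin(Map<String, Object> configs) {
            super(configs);
        }

        @Override
        public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(
                String groupId,
                Map<TopicPartition, OffsetAndMetadata> offsets,
                AlterConsumerGroupOffsetsOptions options) {
            // rewrite the group name so the round-tripped offsets land on
            // the original group (the scheme is up to your application)
            return super.alterConsumerGroupOffsets("b." + groupId, offsets, options);
        }
    }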

Hope this helps,
Greg Harris


On Mon, Jan 8, 2024 at 7:10 AM Jeroen Schutrup
 wrote:
>
> Hi all,
> I'm exploring using the MirrorSourceConnector and MirrorCheckpointConnector
> on Kafka Connect to setup active/active replication between two Kafka
> clusters. Using the DefaultReplicationPolicy replication policy class,
> messages originating from the source cluster get replicated as expected to
> the cluster-prefixed topic in the target cluster. Consumergroup offsets
> from the source to target cluster are replicated likewise. However, once
> the consumer group migrates from the source to the target cluster, its
> offsets are not replicated from the target back to the source cluster.
> For an active/active setup I'd want consumer group offsets for topic
> <cluster>.<topic> in the target cluster to be replicated
> back to <topic> in the source cluster. This would allow consumers to
> failover & failback between clusters with minimal duplicate message
> consumption.
>
> To clarify my setup a bit; I'm running two single-broker Kafka clusters in
> Docker (cluster A & B), along with a single Connect instance on which I've
> provisioned four source connectors:
> - A MirrorSourceConnector replicating topics from cluster A to cluster B
> - A MirrorSourceConnector replicating topics from cluster B to cluster A
> - A MirrorCheckpointConnector translating & replicating offsets from
> cluster A to cluster B
> - A MirrorCheckpointConnector translating & replicating offsets from
> cluster B to cluster A
>
> I'm not sure whether this is by design, or maybe I'm missing something.
> I've seen a similar question posted to KAFKA-9076 [1] without a resolution.
>
> Regards,
> Jeroen
>
> [1]
> https://issues.apache.org/jira/browse/KAFKA-9076?focusedCommentId=17268908&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17268908


Re: MM2 Question

2024-01-08 Thread Greg Harris
Hi Vinay,

I am sorry to hear about your difficulties with MirrorMaker2. Are you
using the MirrorMaker2 dedicated mode, or do you have the MirrorMaker2
connectors running in a separate Connect cluster?

If you're using the dedicated mode, that is a known problem on <3.5
and is the motivation for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters
which was released in 3.5.0. See the KIP for how to enable the
feature.
The executive summary is: if MM2 does not start a REST server
(behavior <3.5 and default for >=3.5) then changes to the set of
topics cannot be communicated between nodes, so the task configs are
stuck with an outdated set of topics.
A workaround without upgrading or reconfiguring the cluster is to
fully stop and restart MirrorMaker2, to allow the first node joining
the cluster to refresh the set of topics and reconfigure the
replication tasks.
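For reference, a sketch of how the KIP-710 flag might look in a
dedicated-mode config, rendered here as Java Properties; the addresses
are placeholders, and please double-check the property names against the
KIP itself:

    import java.util.Properties;

    public class Kip710ConfigSketch {
        public static void main(String[] args) {
            Properties mm2 = new Properties();
            mm2.setProperty("clusters", "a, b");
            mm2.setProperty("a.bootstrap.servers", "a-broker:9092"); // placeholder
            mm2.setProperty("b.bootstrap.servers", "b-broker:9092"); // placeholder
            mm2.setProperty("a->b.enabled", "true");
            // The KIP-710 flag (>=3.5): start the internal REST server so that
            // nodes can communicate task config changes to the leader.
            mm2.setProperty("dedicated.mode.enable.internal.rest", "true");
            mm2.setProperty("listeners", "http://0.0.0.0:8083"); // placeholder
        }
    }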

Otherwise, you may be interested to look for WARN logs, or particular
INFO logs such as this one [1].
If you're not using the dedicated mode, you can examine at the task
configs via the REST API [2] to see what the tasks are currently
configured to do.

[1]: 
https://github.com/apache/kafka/blob/a618f2095f2ce8bf345a8faa884c98ddc822c16b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L260-L265
[2]: https://kafka.apache.org/documentation/#connect_rest

I hope this helps in your investigation and remediation!

Thanks,
Greg

On Mon, Jan 8, 2024 at 3:03 PM Vinay Bagare  wrote:
>
> Hi Team,
>
> I am fairly new to Kafka, but one issue that we are currently battling is in
> MM2. MM2, for no apparent reason, is missing new topics that are present in
> one of the source clusters (no obvious error messages in the MM2 logs).
> I heard from our internal team that this MM2 instance has 10k topics to take
> care of across a cluster of brokers (a legacy design per application needs).
>
> Does anyone know if we are hitting any undocumented limits in MM2?
> I will try to get some additional information from our SRE team.
>
> Best,
> Vinay Bagare
>


Re: Mirror Maker bidirectional offset sync

2024-01-10 Thread Greg Harris
Hi Jeroen,

I'm glad you're experimenting with MM2, and I hope we can give you
some more context to explain what you're seeing.

> I wrote a small program to produce these offset syncs for the prefixed
> topic, and this successfully triggers the Checkpoint connector to start
> replicating the consumer offsets back to the primary cluster.

This is interesting, and I wouldn't have expected it to work.

To rewind, each flow Source->Target has a MirrorSourceConnector, an
Offset Syncs Topic, and a MirrorCheckpointConnector. With both
directions enabled, there are two separate flows each with Source,
Syncs topic, and Checkpoint.
With offset-syncs.topic.location=source, the
mm2-offset-syncs.b.internal topic on the A cluster is used for the A -> B
replication flow. It contains topic names from cluster A, and the
corresponding offsets those records were written to on the B cluster.
When translation is performed, the consumer groups from A are
replicated to the B cluster, and the replication mapping (cluster
prefix) is added.
Using your syncs topic as an example,
OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
downstreamOffset=28} will be used to write offsets for
"a.replicate-me-0" for the equivalent group on the B cluster.

When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
upstreamOffset=29, downstreamOffset=29} is processed, it should be
used to write offsets for "a.a.replicate-me-0" but it actually writes
offsets to "replicate-me-0" due to this function that I've never
encountered before: [1].
I think you could get those sorts of syncs into the syncs-topic if you
had A->B configured with offset-syncs.topic.location=source, and B->A
with offset-syncs.topic.location=target, and configured the topic
filter to do A -> B -> A round trip replication.

This appears to work as expected if there are no failures or restarts,
but as soon as a record is re-delivered in either flow, I think the
offsets should end up constantly advancing in an infinite loop. Maybe
you can try that: Before starting the replication, insert a few
records into `a.replicate-me` to force replicate-me-0's offset n to
replicate to a.replicate-me-0's offset n+k.

Ryanne, do you recall the purpose of the renameTopicPartition
function? To me it looks like it could only be harmful, as it renames
checkpoints to target topics that MirrorMaker2 isn't writing. It also
looks like it isn't active in a typical MM2 setup.

Thanks!
Greg

[1]: 
https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267

On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
 wrote:
>
> Thank you both for your swift responses!
>
> Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> replication in cases where the producer has migrated to the secondary cluster
> as well and starts feeding messages into the non-prefixed topic, which are
> subsequently consumed by the consumer on the secondary cluster. After the
> fallback, it asserts that the consumer offsets on the non-prefixed topic in the
> secondary cluster are translated and replicated to the consumer offsets of
> the prefixed topic in the primary cluster.
> In my example, the producer keeps producing in the primary cluster whereas
> only the consumer fails over to the secondary cluster and, after some time
> fails back to the primary cluster. This consumer will then consume messages
> from the prefixed topic in the secondary cluster, and I'd like to have
> those offsets replicated back to the non-prefixed topic in the primary
> cluster. I can provide an illustration if that would help clarify
> this use case.
>
> To add some context: the reason I'd like to have this is to retain loose
> coupling between producers and consumers, so we're able to test failovers
> for individual applications without the need for all producers/consumers to
> failover and failback at once.
>
> Digging through the Connect debug logs I found the offset-syncs of the
> prefixed topic not being pushed to mm2-offset-syncs.b.internal is likely
> the reason the checkpoint connector doesn't replicate consumer offsets:
> DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped (offset
> sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore)
>
> I wrote a small program to produce these offset syncs for the prefixed
> topic, and this successfully triggers the Checkpoint connector to start
> replicating the consumer offsets back to the primary cluster.
> OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> downstreamOffset=28}
> OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29,
> downstreamOffset=29}
> OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29,
> downstreamOffset=29} <-- the artificially generated offset-sync
>
> At this point it goes a bit beyond my understanding of the MM2 internals of
> whether this is a wise thing to do and if it would have any negative side

Re: Mirror Maker bidirectional offset sync

2024-01-11 Thread Greg Harris
Hey Ryanne,

Thanks for the context, but I still don't see the situation where this
function is helpful.

Also "A's topic1 and B's a.topic1 should be the same data (minus
replication lag)." isn't true in the presence of failures/hard
restarts, compaction, and transaction markers.

Thanks,
Greg

On Wed, Jan 10, 2024 at 8:00 PM Ryanne Dolan  wrote:
>
> > do you recall the purpose of [...] renameTopicPartition [?]
>
> A's topic1 and B's a.topic1 should be the same data (minus replication
> lag). You can't consume a record in a.topic1 that hasn't been replicated
> yet -- a remote topic by definition does not have any records that MM2
> didn't put there. So an offset for a consumer consuming from B's a.topic1
> can be translated back to an offset in A's topic1, where the data came from.
>
> Ryanne
>
> On Wed, Jan 10, 2024, 6:07 PM Greg Harris 
> wrote:
>
> > Hi Jeroen,
> >
> > I'm glad you're experimenting with MM2, and I hope we can give you
> > some more context to explain what you're seeing.
> >
> > > I wrote a small program to produce these offset syncs for the prefixed
> > > topic, and this successfully triggers the Checkpoint connector to start
> > > replicating the consumer offsets back to the primary cluster.
> >
> > This is interesting, and I wouldn't have expected it to work.
> >
> > To rewind, each flow Source->Target has a MirrorSourceConnector, an
> > Offset Syncs Topic, and a MirrorCheckpointConnector. With both
> > directions enabled, there are two separate flows each with Source,
> > Syncs topic, and Checkpoint.
> > With offset-syncs.topic.location=source, the
> > mm2-offset-syncs.b.internal on the A cluster is used for the A -> B
> > replication flow. It contains topic names from cluster A, and the
> > corresponding offsets those records were written to on the B cluster.
> > When translation is performed, the consumer groups from A are
> > replicated to the B cluster, and the replication mapping (cluster
> > prefix) is added.
> > Using your syncs topic as an example,
> > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > downstreamOffset=28} will be used to write offsets for
> > "a.replicate-me-0" for the equivalent group on the B cluster.
> >
> > When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
> > upstreamOffset=29, downstreamOffset=29} is processed, it should be
> > used to write offsets for "a.a.replicate-me-0" but it actually writes
> > offsets to "replicate-me-0" due to this function that I've never
> > encountered before: [1].
> > I think you could get those sorts of syncs into the syncs-topic if you
> > had A->B configured with offset-syncs.topic.location=source, and B->A
> > with offset-syncs.topic.location=target, and configured the topic
> > filter to do A -> B -> A round trip replication.
> >
> > This appears to work as expected if there are no failures or restarts,
> > but as soon as a record is re-delivered in either flow, I think the
> > offsets should end up constantly advancing in an infinite loop. Maybe
> > you can try that: Before starting the replication, insert a few
> > records into `a.replicate-me` to force replicate-me-0's offset n to
> > replicate to a.replicate-me-0's offset n+k.
> >
> > Ryanne, do you recall the purpose of the renameTopicPartition
> > function? To me it looks like it could only be harmful, as it renames
> > checkpoints to target topics that MirrorMaker2 isn't writing. It also
> > looks like it isn't active in a typical MM2 setup.
> >
> > Thanks!
> > Greg
> >
> > [1]:
> > https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267
> >
> > On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
> >  wrote:
> > >
> > > Thank you both for your swift responses!
> > >
> > > Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> > > replication in cases where the producer migrated to the secondary cluster
> > > as well, starts feeding messages into the non-prefixed topic which are
> > > subsequently consumed by the consumer on the secondary cluster. After the
> > > fallback, it asserts the consumer offsets on the non-prefixed topic in
> > the
> > > secondary cluster are translated and replicated to the consumer offsets
> > of
> > > the prefixed topic in the prim

Re: Mirror Maker bidirectional offset sync

2024-01-11 Thread Greg Harris
Hey Jeroen,

Thanks for sharing your prototype! It is very interesting!

> I couldn't reproduce your hypothesis.

I think my hypothesis was for another setup which didn't involve code
changes, and instead relied on A->B->A round trip replication to
produce the "backwards" offset syncs.
I believe this would replicate data from "replicate-me-0" to
"b.a.replicate-me-0", and then possibly take the offsets intended for
"b.a.replicate-me-0" and apply them to "replicate-me-0" creating the
infinite cycle.
I would not expect your implementation to suffer from this failure
mode, because it's using the offset in "replicate-me-0" as the
downstream offset, not the offset of "b.a.replicate-me-0".

With your prototype, do you experience "collisions" in the
offset-syncs topic? Since you're sharing a single offset-syncs topic
between both replication flows, I would expect offsets for topics with
the same names on both clusters to conflict, and cause the translation
to happen using the opposite topic's offsets.
It would also be visible in the state of the OffsetSyncStore here:
[1], you can compare the normal A->B behavior before and after
starting the B -> A source connector to see if the concurrent flows
causes more syncs to be cleared, or the wrong syncs to be present.

I think it is normal for every MM2 connector to have the same
offset-syncs.topic.location to avoid these sorts of conflicts, so that
each syncs topic is only used by one of the MM2 replication flows.
I think that turning on bidirectional offset syncs will probably
require a second producer in the MirrorSourceTask to contact the
opposite cluster, or a second admin client in the
MirrorCheckpointTask.

> Do you think it'd be worthwhile proceeding with this?

This is certainly a capability that MM2 is missing right now, and
seems like it would be a natural component of failing consumers back
and forth. If you see value in it, and are interested in driving the
feature, you can open a KIP [2] to discuss the interface and design
with the rest of the community.

Thanks!
Greg

[1] 
https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L194
[2] 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

On Thu, Jan 11, 2024 at 9:27 AM Jeroen Schutrup
 wrote:
>
> I see, makes complete sense to me. I built a custom version [1] based off
> of Kafka 3.5.1 with bidirectional offset replication enabled so I could do
> some more testing. Offset translation back upstream works well; I think
> that's because of the reason Ryanne pointed out: both topics contain identical
> data. I tested this by truncating the upstream topic before starting
> replication (so the downstream/upstream topics have different offsets).
> Truncating the upstream topic while replication is running doesn't result
> in any weirdness either.
>
> > Before starting the replication, insert a few records into
> `a.replicate-me` to force replicate-me-0's offset n to replicate to
> a.replicate-me-0's offset n+k.
> I couldn't reproduce your hypothesis. After doing the above and then
> starting replication I didn't see any offset replication loops. Once I
> started producing data into the upstream topic and subscribing a
> console consumer to the downstream topic, offsets were translated and
> replicated correctly back upstream. My guess is the CheckpointConnector can
> account for this surplus of messages, as the actual log offsets of the
> downstream topic are written to the offset-sync topic.
>
> As this kind of active/active replication would be very beneficial to us
> for reasons stated in my previous message, we'd love to help out building
> this kind of offset replication into the Mirror connectors. I understand
> this is not something that should be enabled by default, but having it
> behind a configuration toggle could help out users desiring a similar kind of
> active/active setup who understand the restrictions. Do you think it'd
> be worthwhile proceeding with this?
>
> [1]
> https://github.com/jeroen92/kafka/commit/1a27696ec6777c230f100cf9887368c431ebe0f8
>
> On Thu, Jan 11, 2024 at 1:06 AM Greg Harris 
> wrote:
>
> > Hi Jeroen,
> >
> > I'm glad you're experimenting with MM2, and I hope we can give you
> > some more context to explain what you're seeing.
> >
> > > I wrote a small program to produce these offset syncs for the prefixed
> > > topic, and this successfully triggers the Checkpoint connector to start
> > > replicating the consumer offsets back to the primary cluster.
> >
> > This is interesting, and I wouldn't have expected it t

Re: Mirror Maker bidirectional offset sync

2024-01-12 Thread Greg Harris
Ryanne,

> > b.a.replicate-me-0
> That's actually impossible with MM2.

Thanks, I see the isCycle check in MirrorSourceConnector. That makes
me even more curious how the renameTopicPartition method triggers
without a change such as the one that Jeroen has prototyped, since the
only thing that emits offset syncs is the MirrorSourceTask, and it is
disallowed from sending topics back in a cycle.

Greg

On Fri, Jan 12, 2024 at 6:13 AM Jeroen Schutrup
 wrote:
>
> Hey Greg,
> There are no offset collisions: the offset-syncs, albeit stored on the
> same cluster, are separated; offsets from A->B are stored
> in mm2-offset-syncs.b.internal whereas offsets from B->A are stored
> in mm2-offset-syncs.a.internal.
> What's curious though is the B->A checkpoint connector (which has
> offset-syncs.topic.location: target) actually uses the offsets stored in
> mm2-offset-syncs.b.internal (which contains the downstream offsets) while I
> expected it to only read offsets stored in mm2-offset-syncs.a.internal, as
> cluster A is its target.
>
> I'm positive on driving a KIP for this feature to see whether we can get it
> implemented. I'm hoping to submit a draft in the upcoming weeks, though I'd
> need a bit of time to get a better grasp on the mirror connector codebase.
>
> Thank you both for your valuable insights so far!
>
> Jeroen
>
> On Thu, Jan 11, 2024 at 8:06 PM Greg Harris 
> wrote:
>
> > Hey Jeroen,
> >
> > Thanks for sharing your prototype! It is very interesting!
> >
> > > I couldn't reproduce your hypothesis.
> >
> > I think my hypothesis was for another setup which didn't involve code
> > changes, and instead relied on A->B->A round trip replication to
> > produce the "backwards" offset syncs.
> > I believe this would replicate data from "replicate-me-0" to
> > "b.a.replicate-me-0", and then possibly take the offsets intended for
> > "b.a.replicate-me-0" and apply them to "replicate-me-0" creating the
> > infinite cycle.
> > I would not expect your implementation to suffer from this failure
> > mode, because it's using the offset in "replicate-me-0" as the
> > downstream offset, not the offset of "b.a.replicate-me-0".
> >
> > With your prototype, do you experience "collisions" in the
> > offset-syncs topic? Since you're sharing a single offset-syncs topic
> > between both replication flows, I would expect offsets for topics with
> > the same names on both clusters to conflict, and cause the translation
> > to happen using the opposite topic's offsets.
> > It would also be visible in the state of the OffsetSyncStore here:
> > [1], you can compare the normal A->B behavior before and after
> > starting the B -> A source connector to see if the concurrent flows
> > causes more syncs to be cleared, or the wrong syncs to be present.
> >
> > I think it is normal for every MM2 connector to have the same
> > offset-syncs.topic.location to avoid these sorts of conflicts, so that
> > each syncs topic is only used by one of the MM2 replication flows.
> > I think that turning on bidirectional offset syncs will probably
> > require a second producer in the MirrorSourceTask to contact the
> > opposite cluster, or a second admin client in the
> > MirrorCheckpointTask.
> >
> > > Do you think it'd be worthwhile proceeding with this?
> >
> > This is certainly a capability that MM2 is missing right now, and
> > seems like it would be a natural component of failing consumers back
> > and forth. If you see value in it, and are interested in driving the
> > feature, you can open a KIP [2] to discuss the interface and design
> > with the rest of the community.
> >
> > Thanks!
> > Greg
> >
> > [1]
> > https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L194
> > [2]
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > On Thu, Jan 11, 2024 at 9:27 AM Jeroen Schutrup
> >  wrote:
> > >
> > > I see, makes complete sense to me. I built a custom version [1] based off
> > > of Kafka 3.5.1 with bidirectional offset replication enabled so I could
> > do
> > > some more testing. Offset translation back upstream works well; I think
> > > because of the reason Ryanne pointed out, both topics contain identical
> > > data. Tested this by truncating the upstream topic before starting
> > > replica

Re: Mirror Maker bidirectional offset sync

2024-01-12 Thread Greg Harris
Hey Jeroen,

Thanks for looking into the collision theory!

> There are no offset collisions: the offset-syncs, albeit stored on the
> same cluster, are separated; offsets from A->B are stored
> in mm2-offset-syncs.b.internal whereas offsets from B->A are stored
> in mm2-offset-syncs.a.internal.

The offset syncs topic should be named opposite to its location [1]:
For source-a-to-b with location=source, it should be present on
cluster A and named mm2-offset-syncs.b.internal
For source-b-to-a with location=target, it should be present on
cluster A and named mm2-offset-syncs.b.internal
For source-b-to-a with location=source, it would be present on cluster
B and named mm2-offset-syncs.a.internal

> What's curious though is the B->A checkpoint connector (which has
> offset-syncs.topic.location: target) actually uses the offsets stored in
> mm2-offset-syncs.b.internal (which contains the downstream offsets) while I
> expected it to only read offsets stored in mm2-offset-syncs.a.internal, as
> cluster A is its target.

The same logic as the source connector is used in the checkpoint
connector [2], so it should show similar behavior.
I did try this "one flow with location=source, the opposite flow with
location=target" in an integration test, and I only see one syncs
topic being created, when normally there would be two.
If you have a bunch of "well behaved" replication flows (without gaps,
where upstreamOffset=downstreamOffset) then it's possible that you're
getting collisions but they aren't noticeable because the translation
for the two topics coincides.

Anyway, as Ryanne indicated earlier, the offset-syncs.topic.location
is really meant to be the same for every connector in an active-active
replication flow, and so this bi-directional sync should be made to
work with that typical configuration.

> I'm positive on driving a KIP for this feature to see whether we can get it
> implemented. I'm hoping to submit a draft in the upcoming weeks, though I'd
> need a bit of time to get a better grasp on the mirror connector codebase.

Sounds good! There's no rush of course, you can contribute as much as
you are willing and able. And more people understanding MM2 is always
a good thing :)

Thanks,
Greg

[1] 
https://github.com/apache/kafka/blob/21227bda61e75e3a8f1401ff94b27e9161cd3f1b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L137-L142
[2] 
https://github.com/apache/kafka/blob/21227bda61e75e3a8f1401ff94b27e9161cd3f1b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java#L137-L142

On Fri, Jan 12, 2024 at 1:53 PM Greg Harris  wrote:
>
> Ryanne,
>
> > > b.a.replicate-me-0
> > That's actually impossible with MM2.
>
> Thanks, I see the isCycle check in MirrorSourceConnector. That makes
> me even more curious how the renameTopicPartition method triggers
> without a change such as the one that Jeroen has prototyped, since the
> only thing that emits offset syncs is the MirrorSourceTask, and it is
> disallowed from sending topics back in a cycle.
>
> Greg
>
> On Fri, Jan 12, 2024 at 6:13 AM Jeroen Schutrup
>  wrote:
> >
> > Hey Greg,
> > There are no offset collisions as the offset-syncs albeit being stored on
> > the same cluster, offsets from A->B are stored
> > in mm2-offset-syncs.b.internal whereas offsets from B->A are stored
> > in mm2-offset-syncs.a.internal.
> > What's curious though is the B->A checkpoint connector (which has
> > offset-syncs.topic.location: target) actually uses the offsets stored in
> > mm2-offset-syncs.b.internal (which contains the downstream offsets) while I
> > expected it to only read offsets stored in mm2-offset-syncs.a.internal, as
> > cluster A is its target.
> >
> > I'm positive on driving a KIP for this feature to see whether we can get it
> > implemented. I'm hoping to submit a draft in the upcoming weeks, though I'd
> > need a bit of time to get a better grasp on the mirror connector codebase.
> >
> > Thank you both for your valuable insights so far!
> >
> > Jeroen
> >
> > On Thu, Jan 11, 2024 at 8:06 PM Greg Harris 
> > wrote:
> >
> > > Hey Jeroen,
> > >
> > > Thanks for sharing your prototype! It is very interesting!
> > >
> > > > I couldn't reproduce your hypothesis.
> > >
> > > I think my hypothesis was for another setup which didn't involve code
> > > changes, and instead relied on A->B->A round trip replication to
> > > produce the "backwards" offset syncs.
> > > I believe this would replicate data from "replicate-me-0" to
> > > "b.a.replicate

Re: KRAFT: Correct way to add a controller only node

2024-02-08 Thread Greg Harris
Hi Denny,

I believe the problem you're describing is the motivation for KIP-853
[1] which is currently under discussion for 3.8.
If you're performing a rolling upgrade, I believe the current
recommendation is to use DNS to "re-assign" the controller from one
machine to another.
If you're running every broker as a controller, you can also switch to
running a fixed set of controllers for the first 3 or first 5 nodes,
and then begin adding broker-only nodes after that.
If you're trying to permanently increase the number of controllers, I
am not aware of a workaround that does not include downtime.

Hope this helps.
Greg

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes

On Thu, Feb 8, 2024 at 4:55 AM Denny Fuchs
 wrote:
>
> Hello,
>
> we have 5 broker/controller Kafka (v3.5.1) nodes running and I tried to
> add a controller-only node. I got it working in the end... but I
> think I did it wrong. I had to stop and start all nodes several times to
> remove the "quorum-state" file. I had to do a lot of stops/starts and saw
> crashes, because the nodes were shut down due to timeouts from
> voting/electing.
>
> So, what is the right way to add a broker or a controller?
>
> cu denny
>


Re: Is it possible to include only the connector name in worker logs?

2024-02-14 Thread Greg Harris
Hey Yeikel,

Thanks for your question!

It appears that we only expose the already-concatenated data to the
logs: 
https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java#L186-L204
instead of letting the logger do the concatenation.
It looks like the discussion thread included mention of parsing, and
the current format is intended to be "easy" to parse. I don't see any
mention of providing connector id as a separate field that can be
configured in the log config, so I don't think they considered such a
solution.

I think you'll need to stick to regex parsing the connector.context
field in the logs.
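For example, a regex along these lines should pull the name out (a
sketch, assuming the "[connector|task-N|scope]" shape from KIP-449):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ConnectorNameExtractor {
        // Capture everything between '[' and the first '|' or ']'.
        private static final Pattern CTX = Pattern.compile("\\[([^|\\]]+)");

        public static void main(String[] args) {
            Matcher m = CTX.matcher("[my-connector|task-0|offsets] some log line");
            if (m.find()) {
                System.out.println("connector=" + m.group(1)); // connector=my-connector
            }
        }
    }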

Hope this helps!
Greg

On Sat, Feb 10, 2024 at 9:58 PM Yeikel Santana  wrote:
>
> Hi team,
>
>
>
> In KIP-449[1], we introduced a "connector context" that allows us to get 
> worker context in the logs such as "[my-connector|task-0|offsets]"
>
>
>
> Are you aware of any built-in mechanism to extract only the connector name 
> from these logs?
>
>
>
> In an ideal world, I would like to build a log that contains just the 
> connector name, excluding any task or offset information, i.e.
> connector=my-connector-name. The final goal is to get stats per connector
> name where the task or offset information is not that important. The
> additional metadata gets in the way as it pollutes the log entry.
>
>
>
> I imagine that I can write a regular expression to do this, but I don't wish
> to reinvent the wheel if there are better options.
>
>
>
> Thank you!
>
>
>
> [1] 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs


Re: [Kafka Connect] Dead letter queues for source connectors?

2024-03-05 Thread Greg Harris
Hi Yeikel,

Thanks for your question. It certainly isn't clear from the original
KIP-298, the attached discussion, or the follow-up KIP-610 as to why
the situation is asymmetric.

The reason as I understand it is: Source connectors are responsible
for importing data to Kafka. If an error occurs during this process,
then writing useful information to a dead letter queue about the
failure is at least as difficult as importing the record correctly.

For some examples:
* If an error occurs during poll(), the external data has not yet been
transformed into a SourceRecord that the framework can transform or
serialize.
* If an error occurs during conversion/serialization, the external
data cannot be reasonably serialized to be forwarded to the DLQ.
* If a record cannot be written to Kafka, such as due to being too
large, the same failure is likely to happen with writing to the DLQ as
well.

For the Sink side, we already know that the data was properly
serializable and appeared as a ConsumerRecord<byte[], byte[]>. That can
be forwarded to the DLQ as-is with a reasonable expectation for
success, with the same data formatting as the source topic.
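As a rough illustration (this is not Connect's actual
DeadLetterQueueReporter code, just a sketch of why the sink side is
easy), forwarding a failed sink record can be as simple as:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: the failed record is already raw bytes, so it can be copied
    // verbatim, preserving the source topic's data format.
    public class SinkDlqSketch {
        private final Producer<byte[], byte[]> producer;
        private final String dlqTopic;

        public SinkDlqSketch(Producer<byte[], byte[]> producer, String dlqTopic) {
            this.producer = producer;
            this.dlqTopic = dlqTopic;
        }

        public void report(ConsumerRecord<byte[], byte[]> failed) {
            producer.send(new ProducerRecord<>(dlqTopic, failed.key(), failed.value()));
        }
    }

No equivalent byte-level representation exists for a record that failed
before or during source-side conversion, hence the asymmetry.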

If you have a vision for how this can be improved and are interested,
please consider opening a KIP! The situation can certainly be made
better than it is today.

Thanks!
Greg

On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana  wrote:
>
> Hi all,
>
> Sink connectors support Dead Letter Queues[1], but Source connectors don't
> seem to.
>
> What is the reason that we decided to do that?
>
> In my data pipeline, I'd like to apply some transformations to the messages
> before they reach the sink, but that leaves me vulnerable to failures, as I
> need to either fail the connector or employ logging to track source failures.
>
> It seems that for now, I'll need to apply the transformations as a sink and
> possibly reinsert the results back into Kafka for downstream consumption, but
> that sounds unnecessary.
>
>
> [1]https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065


Re: [Kafka Connect] Dead letter queues for source connectors?

2024-03-05 Thread Greg Harris
Hey Chris,

That's a cool idea! That can certainly be applied for failures other
than poll(), and could be useful when combined with the Offsets
modification API.

Perhaps failures inside of poll() can be handled by an extra
mechanism, similar to the ErrantRecordReporter, which allows reporting
affected source partition/source offsets when a meaningful key or
value cannot be read.

Thanks,
Greg

On Tue, Mar 5, 2024 at 3:03 PM Chris Egerton  wrote:
>
> Hi Greg,
>
> This was my understanding as well--if we can't turn a record into a byte
> array on the source side, it's difficult to know exactly what to write to a
> DLQ topic.
>
> One idea I've toyed with recently is that we could write the source
> partition and offset for the failed record (assuming, hopefully safely,
> that these can at least be serialized). This may not cover all bases, is
> highly dependent on how user-friendly the offsets published by the
> connector are, and does come with the risk of data loss (if the upstream
> system is wiped before skipped records can be recovered), but could be
> useful in some scenarios.
>
> Thoughts?
>
> Chris
>
> On Tue, Mar 5, 2024 at 5:49 PM Greg Harris 
> wrote:
>
> > Hi Yeikel,
> >
> > Thanks for your question. It certainly isn't clear from the original
> > KIP-298, the attached discussion, or the follow-up KIP-610 as to why
> > the situation is asymmetric.
> >
> > The reason as I understand it is: Source connectors are responsible
> > for importing data to Kafka. If an error occurs during this process,
> > then writing useful information to a dead letter queue about the
> > failure is at least as difficult as importing the record correctly.
> >
> > For some examples:
> > * If an error occurs during poll(), the external data has not yet been
> > transformed into a SourceRecord that the framework can transform or
> > serialize.
> > * If an error occurs during conversion/serialization, the external
> > data cannot be reasonably serialized to be forwarded to the DLQ.
> > * If a record cannot be written to Kafka, such as due to being too
> > large, the same failure is likely to happen with writing to the DLQ as
> > well.
> >
> > For the Sink side, we already know that the data was properly
> > serializable and appeared as a ConsumerRecord<byte[], byte[]>. That can
> > be forwarded to the DLQ as-is with a reasonable expectation for
> > success, with the same data formatting as the source topic.
> >
> > If you have a vision for how this can be improved and are interested,
> > please consider opening a KIP! The situation can certainly be made
> > better than it is today.
> >
> > Thanks!
> > Greg
> >
> > On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana  wrote:
> > >
> > > Hi all,
> > >
> > > Sink connectors support Dead Letter Queues[1], but Source connectors
> > don't seem to
> > >
> > > What is the reason that we decided to do that?
> > >
> > > In my data pipeline, I'd like to apply some transformations to the
> > messages before they reach the sink, but that leaves me vulnerable to failures as
> > I need to either fail the connector or employ logging to track source
> > failures
> > >
> > > It seems that for now, I'll need to apply the transformations as a sink
> > and possibly reinsert them back to Kafka for downstream consumption, but
> > that sounds unnecessary
> > >
> > >
> > > [1]
> > https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065
> >


Re: Tiered storage

2024-03-08 Thread Greg Harris
Hi Lukas,

Thanks for your interest in Tiered Storage!

Disclaimer: I work/have worked for both of the vendors mentioned.

There are two different Tiered Storage implementations:
* Confluent's closed-source implementation [1]
* Apache Kafka's open-source implementation [2]

You can read the documentation for each implementation to find out
more, but at a high level the two implementations:
1. Solve the same problem of offloading storage from brokers' disks
2. Expose different features, such as supporting compacted topics
3. Are distributed under different licenses

The Apache Kafka implementation has a pluggable architecture, such
that storage backend plugins are required, but not provided by the
Kafka project.
Aiven is developing and distributing these plugins [3] which, when
combined with the Apache Kafka open-source implementation, provide the
Tiered Storage functionality.
You can also avoid depending on the mentioned vendors by developing or
finding alternative plugins for the Apache Kafka implementation.
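As a concrete illustration of the open-source route, enabling tiering
for a topic looks roughly like this; it's a sketch that assumes the
brokers already have remote.log.storage.system.enable=true and a storage
plugin configured, and the topic name, counts, and retention values are
placeholders:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;

    public class TieredTopicSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                NewTopic topic = new NewTopic("tiered-topic", 3, (short) 3)
                        .configs(Map.of(
                                "remote.storage.enable", "true", // tier this topic
                                "local.retention.ms", "3600000", // ~1h on local disk
                                "retention.ms", "604800000"));   // 7d total
                admin.createTopics(List.of(topic)).all().get();
            }
        }
    }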

You will need to make the comparison yourself as to which
implementation suits your use-case.

To directly answer your questions:
1. The Confluent implementation is mature and ready for production
[1]. The Apache Kafka implementation is still considered early-access
[4].
2. Using one of the vendors is not required, but may save you the
development and maintenance costs associated with implementing a
solution yourself.
3. The Confluent implementation uses an in-memory cache [5]. The
Apache Kafka implementation serves directly from the plugin without
caching [6], and the Aiven plugin has support for in-memory caching
and disk caching [7].

Hope this clears things up!
Greg Harris

[1] https://docs.confluent.io/platform/current/clusters/tiered-storage.html
[2] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
[3] https://github.com/Aiven-Open/tiered-storage-for-apache-kafka
[4] 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes
[5] https://developer.confluent.io/courses/architecture/tiered-storage/
[6] 
https://github.com/apache/kafka/blob/b9a5b4a8053c1fa65e27a9f93440194b0dd5eec4/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1367
[7] 
https://github.com/Aiven-Open/tiered-storage-for-apache-kafka?tab=readme-ov-file#local-cache



On Fri, Mar 8, 2024 at 10:21 AM Lukas Zimmerman  wrote:
>
> Hey!
>
> I came across the Tiered Storage feature in Kafka and found this feature
> quite exciting! It seems like it could help with dealing with the retention
> of large amounts of data in Kafka topics.
>
> I have several questions:
>
> 1/ Is it still in early access? When will it be considered ready for
> production?
> 2/ It appears this might require a module from a vendor, such as Aiven or
> Confluent. Am I mistaken?
> 3/ Assuming I've offloaded data to S3, when I need to read the offloaded
> data back using a Kafka consumer, does the broker read the data directly
> from S3, or is the data first moved back to the broker's disks before being
> read?
>
> Thank you very much for your help :-)


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread Greg Harris
Hi William,

From your description, it sounds like you want the success/failure of
a callback to influence whether that record (and later records) are
present in the topic. Is this correct?
The solution that you posted does actually write a record that has an
erroneous callback, is that desirable, or would you want that record
to also be rejected?

This sounds like a use-case for transactional producers [1] utilizing
Exactly Once delivery. You can start a transaction, emit records, have
them persisted in Kafka, perform some computation afterwards, and then
decide whether to commit or abort the transaction based on the result
of that computation.

There is also a performance penalty to transactional producers, but it
is different from the max.in.flight.requests.per.connection bottleneck
and not directly comparable.
I think you should carefully consider throwing delivery-critical
errors from the callback, as that is not a common workflow. Could
those errors be moved to a different part of the pipeline, such as the
consumer application?

And since you're performance sensitive, please be aware that
performance (availability) nearly always comes at the cost of delivery
guarantees (consistency) [2]. If you're not willing to pay the
performance cost of max.in.flight.requests.per.connection=1, then you
may need to make a compromise on the consistency and find a way to
manage the extra data.

Thanks,
Greg Harris

[1] 
https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
[2] https://en.wikipedia.org/wiki/CAP_theorem

On Mon, Mar 11, 2024 at 7:32 AM William Lee  wrote:
>
> Hi Haruki,
> Thanks for your answer.
> > I still don't get why you need this behavior though
> The reason is I have to ensure message ordering per partition strictly.
> Once there is an exception in the producer callback, it indicates that the
> exception is not a retryable exception (from the Kafka producer's perspective).
> There must be something wrong, so I have to stop sending records and
> resolve the underlying issue.
>
> As for the performance problem, I found a kafka wiki which investigated the
> impact of max.in.flight.requests.per.connection: An analysis of the impact
> of max.in.flight.requests.per.connection and acks on Producer performance -
> Apache Kafka - Apache Software Foundation
> <https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance>
> From the wiki, max.in.flight.requests.per.connection is better set to 2 or
> more.
>
> By setting max.in.flight.requests.per.connection to 1, I'm concerned that
> this could become a performance bottleneck


Re: [ Questions on log4j file & version ]

2024-05-16 Thread Greg Harris
Hi Ashok,

Kafka 2.7.1 was built from the 2.7.1 tag [1] and looking at the
dependencies in that version [2], it should have shipped with log4j 1.2.17.
You can verify this by looking for the log4j jar in your installation.
Because of the security vulnerabilities you mention, Kafka switched to
reload4j in [3] around 3.2.0, and last upgraded reload4j in 3.6.0 [4].

You should consider upgrading to a more recent version of Kafka
(recommended, as 2.7 is well out-of-support) or consider swapping out
the log4j jar with a recent version of reload4j (not recommended).

[1] https://github.com/apache/kafka/tree/2.7.1
[2] 
https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/gradle/dependencies.gradle#L76
[3] https://issues.apache.org/jira/browse/KAFKA-13660
[4] https://github.com/apache/kafka/pull/13673

Thanks,
Greg

On Thu, May 16, 2024 at 5:50 AM Ashok Kumar Ragupathi
 wrote:
>
> Hello Kafka Team,
>
> Request your help...
>
> We are using Apache Kafka kafka_2.13-2.7.1 & installed on a server.
>
> I understand it uses the log4j Java library for logging purposes.
>
> But we don't know what log4j version it is using.
>
> Recently we came to know that log4j 1.2.17 has some security issues. How do
> we upgrade to the log4j v2 version? How do we find what version it uses or
> refers to internally?
>
> Thanks & Regards
> Ashok Kumar
> Denovo Systems


Re: Fwd: Request to be added to kafka contributors list

2024-05-21 Thread Greg Harris
Hi Franck,

Thank you for contributing to Apache Kafka!

1. Git is generally permissive of this, as long as there are no merge
conflicts. If you have merge conflicts with `trunk`, you will need to
resolve them before a committer can merge your changes, so rebasing on
trunk before opening the PR is a good idea :)

2.
Are you on an M1 mac, with a recent (>11) JDK? I've been experiencing
some consistent failures recently [1] and haven't figured it out yet.
You may also be getting a flaky failure: a test which is
nondeterministic and sometimes fails. We are constantly trying to burn
down the list of flaky tests [2], but there are still some around.
As far as how this impacts the PR: You should find and resolve all of
the deterministic failures that you introduce in the PR, and do your
best to check whether you introduced any flakiness. You can look for
tickets mentioning those failures, or ask a committer for more
information.

Hope this helps,
Greg Harris

[1] https://issues.apache.org/jira/browse/KAFKA-16701
[2] 
https://issues.apache.org/jira/browse/KAFKA-16701?jql=project%20%3D%20KAFKA%20AND%20labels%20%3D%20flaky-test

On Tue, May 21, 2024 at 6:43 AM Franck  wrote:
>
> Hello,
>
> It works like a charm.
>
> Few questions:
>
>  1. Now I'm asking myself: I did the job described in JIRA 16707 in a
> fork/branch of Kafka 3.7.0, but reading the "Contributing
> Code Changes" guide, I feel I should have done it on a branch from trunk of
> my fork? (If so, I'll just create a new branch on my fork, rebase, and
> re-run the tests; I just want to know from which point I should start
> the PR correctly.)
>  2. When doing a "gradlew clean test" on a clean fork of the 3.7.0
> branch, I have a failure, so I'm asking myself how it will be
> handled when I open the PR. Do you know?
>
> Best regards
>
> On 21/05/2024 03:35, Matthias J. Sax wrote:
> > Done. You should be all set :)
> >
> >
> > -Matthias
> >
> > On 5/20/24 10:10 AM, bou...@ulukai.net wrote:
> >>
> >> Dear Apache Kafka Team,
> >>
> >>  I hope to post in the right place: my name is Franck LEDAY,
> >> under Apache-Jira ID "handfreezer".
> >>
> >>  I opened an issue as Improvement KAFKA-16707 but I failed to
> >> assigned it to me.
> >>
> >>  May I ask to be added to the contributors list for Apache Kafka?
> >> As I already did the job of improvement, and would like to be
> >> assigned on to end my contribution.
> >>
> >> Thank you for considering my request.
> >> Best regards, Franck.


Re: MirrorMaker2 Offset Replication Issue: MirrorCheckpointConnector Doesn't Replicate Final Offsets In Migrations

2024-05-28 Thread Greg Harris
Hi Mehrtens,

I think you are experiencing this problem:
https://issues.apache.org/jira/browse/KAFKA-15905 which just received a
patch which is due to release in 3.8.0/3.7.1. Read there for more context,
and you can consider building and testing with that patch to see if it
resolves your issue.

Hope this helps,
Greg Harris

On Tue, May 28, 2024 at 12:35 PM Mehrtens, M 
wrote:

> Hi there,
> I’m running into an issue with MirrorMaker2 (MM2), specifically the
> MirrorCheckpointConnector (CPC). In short, CPC doesn’t replicate the final
> batch of offsets to the target Kafka cluster during a migration. I believe
> I can point to the sections of the MM2 code where this is happening, but I
> can’t figure out why it works that way. Can someone help me understand if
> this is a bug or a feature?
>
> Issue Summary:
>
> During a migration scenario with MirrorMaker2, it is desirable to shut
> down a consumer on the source cluster and start it again in the target
> cluster. However, it seems that during this process the final consumer
> group offsets for the source cluster do not get replicated to the target
> cluster.
>
>   *   Consumer is running against source cluster, committing offsets over
> time
>   *   Right before the cutover, consumer commits offset 10
>   *   emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds
> later, CPC replicates offset 10
>   *   Consumer commits offset 20, then gets shut down for migration
>   *   CPC never replicates offset 20, causing a gap in committed offsets
> MM2 Version:
> Kafka 3.7.0
> CPC Config:
>
> {
>
> "name": "mm2-cpc",
>
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>
> "clusters": "msksource,mskdest",
>
> "source.cluster.alias": "msksource",
>
> "target.cluster.alias": "mskdest",
>
> "target.cluster.bootstrap.servers": "b-1...",
>
> "source.cluster.bootstrap.servers": "b-1...",
>
> "tasks.max": "1",
>
> "key.converter": "
> org.apache.kafka.connect.converters.ByteArrayConverter",
>
> "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>
> "replication.policy.class":
> "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
>
> "replication.factor": "3",
>
> "checkpoints.topic.replication.factor": "3",
>
> "refresh.groups.interval.seconds": "60",
>
> "emit.checkpoints.interval.seconds": "20",
>
> "sync.group.offsets.interval.seconds": "20",
>
> "sync.group.offsets.enabled": "true",
>
> }
>
> Possible Explanation
>
> From my read of the code, this appears to be an issue in the
> OffsetSyncStore.
>
> Example during migration when lag is small
> Time 1:
>
> Target cluster consumer group is at offset 381561, source is at offset
> 381561:
>
> Target:
>
> GROUPTOPIC   PARTITION  CURRENT-OFFSET
> LOG-END-OFFSET  LAG CONSUMER-ID HOSTCLIENT-ID
>
> mm2TestConsumer1 ExampleTopicClickStream 12 381561
> 381561  0   -   -   -
>
> Source:
>
> GROUPTOPIC   PARTITION  CURRENT-OFFSET
> LOG-END-OFFSET  LAG CONSUMER-ID HOSTCLIENT-ID
>
> mm2TestConsumer1 ExampleTopicClickStream 12 381561
> 381561  0
>  clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2 /10.0.4.143
>  clickstream-consumer
>
> 60 seconds later:
>
> Target cluster consumer group has advanced to offset 614064, source is at
> offset 651218. The lag is higher in the target than the source because of
> the emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds
> being set to 20 seconds. If these were lower (e.g. 1 second) we'd still see
> some lag (albeit lower).
>
> Target:
>
> GROUPTOPIC   PARTITION  CURRENT-OFFSET
> LOG-END-OFFSET  LAG CONSUMER-ID HOSTCLIENT-ID
>
> mm2TestConsumer1 ExampleTopicClickStream 12 614064
> 646293  32229   -   -   -
>
>
>
> Source:
>
> GROUPTOPIC   PARTITION  CURRENT-OFFSET
> LOG-END-OFFSET  LAG CONSUMER-ID HOSTCLIENT-ID
>
> mm2TestConsumer1 ExampleTopicClickStream 12 651218
> 

Re: Kafka rebalance

2024-06-12 Thread Greg Harris
Hi Sebastian,

Thanks for your question! A consumer joining a group should always notify
the group coordinator and the leader consumer, and the
partition.assignment.strategy [1] will determine the rebalance protocol,
and which partitions are affected by the rebalance.

For example, I expect that the Range, RoundRobin, and Sticky assignors
would be quite disruptive, with additional consumers triggering a full
revocation of all partitions.
The CooperativeStickyAssignor should perform a much less invasive
cooperative rebalance that avoids revoking unnecessary partitions [2]. It
also appears that consumers undergoing a cooperative rebalance are
permitted to continue processing data [3].

So under good conditions with modern versions and configurations, extra
consumers joining the group should trigger a rebalance, but the rebalance
should have minimal impact on processing.
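For example, opting in is a one-line consumer config change (a sketch;
the bootstrap servers and group id are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

    public class CooperativeConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            // Incremental rebalancing: only the moved partitions are revoked.
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    CooperativeStickyAssignor.class.getName());
        }
    }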

Thanks,
Greg

[1]
https://kafka.apache.org/documentation.html#consumerconfigs_partition.assignment.strategy
[2] https://kafka.apache.org/documentation.html#upgrade_240_notable
[3] https://kafka.apache.org/documentation.html#upgrade_250_notable

On Wed, Jun 12, 2024 at 4:53 AM Sejal Patel 
wrote:

> While I haven't reached that point: with repartitioning, aggregation, and
> various other things happening within the stream, each adding
> additional topics, a simple 5 input topics with 5 partitions can end up with
> hundreds of total topic partitions overall.
>
> But I'm 99% confident that it will rebalance, because part of that is
> understanding who is even available, who has which data, the latency of work
> being done, etc. While something might be perfectly balanced one second, it
> might not be the next, in theory.
>
> But if it makes you feel better, it should only rebalance for a split
> second if it doesn't see a reason to shift work around. And I'm more
> confident that you'll not be spinning up more consumers than you have total
> partitions to start with, because there are a lot of partitions involved,
> with the Kafka stores each having some as well.
>
> Sejal [Patel] | CTO
> se...@playerzero.ai  | 470.440.1255
>
>
> On Wed Jun 12, 2024, 10:25 AM GMT, Sébastien Rebecchi <srebec...@kameleoon.com> wrote:
> > Hello,
> >
> > If I have a consumer group with more members than the number of partitions
> > of a topic, will adding a consumer to the group still trigger a
> > rebalancing
> > of partitions to the group?
> >
> > Imagine the partitions are already perfectly balanced, i.e. each consumer
> > has
> > 1 partition. Then rebalancing won't be of any use in theory. So does Kafka
> > still trigger a rebalancing?
> >
> > Thanks,
> >
> > Sébastien


Re: KafkaConsumer taking time to determine offset

2024-06-12 Thread Greg Harris
Hi Akash,

Thanks for your question. When you say:

> the KafkaConsumer misses some of the records written into the Kafka source
> topic.

Do you mean that there are records in the source topic that are never
returned by Consumer#poll(), SourceTask#poll(), or written to the target
topic by KafkaConnect?

That is the behavior I would expect if you are starting up the consumer for
the first time, or after not committing offsets, and
auto.offset.reset=latest triggers [1]. You could turn on logging for the
SubscriptionState, OffsetFetcher, or ConsumerCoordinator to watch what the
Consumer is doing internally.
If you want it to start at the beginning of the topic and read every
record, you need to use auto.offset.reset=earliest. If you want the
consumer to remember its progress and not re-process data, you will need to
have the consumer seek() to particular offsets, or use commitSync() or
commitAsync() [2].
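A minimal sketch of that pattern (all names are placeholders):

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ResumeFromCommittedSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "copy-task");
            // Start from the beginning when no committed offset exists yet.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("source-topic"));
                while (true) {
                    ConsumerRecords<byte[], byte[]> records =
                            consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<byte[], byte[]> r : records) {
                        // ... hand r to the target producer ...
                    }
                    consumer.commitSync(); // remember progress across restarts
                }
            }
        }
    }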

Also what you're doing sounds similar to the existing MirrorMaker2 tool
[3], you can consider using it, or learning from how it is implemented.

Thanks,
Greg

[1]
https://kafka.apache.org/documentation.html#consumerconfigs_auto.offset.reset
[2]
https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
[3] https://kafka.apache.org/documentation.html#georeplication

On Wed, Jun 12, 2024 at 2:08 PM Akash Dhiman 
wrote:

> Hello,
> we have a usecase where we use kafkaConsumer in a SourceTask of a source
> connector to poll messages from an aws msk.
>
> if we try to produce data into the source topic immediately after the
> connector gets into the running state we sometimes notice that
> kafkaconsumer misses some of the records written into the kafka source
> topic. (note that sourceTask#start involves subscribing to the topic and
> sourceTask#poll involves the acutal kafkaConsumer.poll) call.
>
> i hypothesised that this might be due to kafka Consumer taking time to find
> the offset for the topic and given that we have the auto.offset.reset
> config set to latest this is the reason why it's happening, but I am unsure
> on what observability i can use to confirm this (I have set up the log
> level to error). but can it happen that the kafka connector is in running
> state but it's polling method which basically uses kaflaConsumer.poll() is
> still awaiting offset allocation? is there a way to verify this ina. more
> efficient manner?
>


Re: MirrorMaker2 Offset Replication Issue: MirrorCheckpointConnector Doesn't Replicate Final Offsets In Migrations

2024-06-21 Thread Greg Harris
Hi Mehrtens,

That is expected with the current implementation of offset translation. It
makes a tradeoff that translation further from the end of the topic is less
accurate, and it won't "converge" to a perfect translation.
The only "perfect" translation that is offered is when the consumer group
is at the end of the topic, and no new records are being emitted: then you
should expect the offsets to be translated precisely.
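To illustrate why, here is a paraphrase of the translation rule as I
understand it from OffsetSyncStore in recent releases (a sketch, not the
verbatim implementation; the Sync record stands in for MM2's OffsetSync):

    import java.util.OptionalLong;

    public class TranslationSketch {
        // Minimal stand-in for an offset sync: an upstream/downstream pair.
        record Sync(long upstreamOffset, long downstreamOffset) {}

        static OptionalLong translateDownstream(long groupOffset, Sync nearestSync) {
            if (nearestSync == null || nearestSync.upstreamOffset() > groupOffset) {
                return OptionalLong.empty(); // no sync at or before the group's position
            }
            // Past the sync point we advance by at most one, so translation never
            // overshoots, but lags more the further the group is ahead of the sync.
            long bonus = groupOffset > nearestSync.upstreamOffset() ? 1 : 0;
            return OptionalLong.of(nearestSync.downstreamOffset() + bonus);
        }
    }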

We're currently considering ways of improving translation, see
https://issues.apache.org/jira/browse/KAFKA-16364 for one of them. Please
get involved if you have some more insight.
Until then, you will need to design around translated offsets being
slightly behind source offsets, as MM2 does not offer strong consistency
guarantees here.

Thanks,
Greg

On Fri, Jun 21, 2024 at 1:54 PM Mehrtens, M 
wrote:

> Hi - apologies for the delay; it took me a while to find time to learn how
> to build from source and incorporate that into my test environment. I've
> built off of the 3.7.1-rc2 tag which includes the KAFKA-15905 fix.
> Unfortunately, this did not fix the issue.
>
> With 3.7.1-rc2, offset replication still paused once the source cluster
> consumer group was turned off.
>
> There is a gap in offsets between source & target cluster, where the
> source cluster has a last committed offset for the consumer group that is
> greater than the last committed offset replicated in the target cluster.
>
>
>
>
> On 5/28/24, 2:43 PM, "Greg Harris" <greg.har...@aiven.io.INVALID> wrote:
>
>
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
>
>
>
> Hi Mehrtens,
>
>
> I think you are experiencing this problem:
> https://issues.apache.org/jira/browse/KAFKA-15905 which just received a
> patch which is due to release in 3.8.0/3.7.1. Read there for more context,
> and you can consider building and testing with that patch to see if it
> resolves your issue.
>
>
> Hope this helps,
> Greg Harris
>
>
> On Tue, May 28, 2024 at 12:35 PM Mehrtens, M <mmehr...@amazon.com.invalid>
> wrote:
>
>
> > Hi there,
> > I’m running into an issue with MirrorMaker2 (MM2), specifically the
> > MirrorCheckpointConnector (CPC). In short, CPC doesn’t replicate the
> final
> > batch of offsets to the target Kafka cluster during a migration. I
> believe
> > I can point to the sections of the MM2 code where this is happening, but
> I
> > can’t figure out why it works that way. Can someone help me understand if
> > this is a bug or a feature?
> >
> > Issue Summary:
> >
> > During a migration scenario with MirrorMaker2, it is desirable to shut
> > down a consumer on the source cluster and start it again in the target
> > cluster. However, it seems that during this process the final consumer
> > group offsets for the source cluster do not get replicated to the target
> > cluster.
> >
> > * Consumer is running against source cluster, committing offsets over
> > time
> > * Right before the cutover, consumer commits offset 10
> > * emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds
> > later, CPC replicates offset 10
> > * Consumer commits offset 20, then gets shut down for migration
> > * CPC never replicates offset 20, causing a gap in committed offsets
> > MM2 Version:
> > Kafka 3.7.0
> > CPC Config:
> >
> > {
> >
> > "name": "mm2-cpc",
> >
> > "connector.class":
> > "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> >
> > "clusters": "msksource,mskdest",
> >
> > "source.cluster.alias": "msksource",
> >
> > "target.cluster.alias": "mskdest",
> >
> > "target.cluster.bootstrap.servers": "b-1...",
> >
> > "source.cluster.bootstrap.servers": "b-1...",
> >
> > "tasks.max": "1",
> >
> > "key.converter": "
> > org.apache.kafka.connect.converters.ByteArrayConverter",
> >
> > "value.converter":
> > "org.apache.kafka.connect.converters.ByteArrayConverter",
> >
> > "replication.policy.class":
> > "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
> >
> > "replication.factor": "3",
> >
> > "checkpoin

Re: Kafka Connect Limits

2024-07-03 Thread Greg Harris
Hey Burton,

Thanks for your question and bug report.

The exception you included does not indicate that your connectors are
overloaded. The primary way of diagnosing an overloaded connector is the
consumer lag metric, and if you're seeing acceptable lag, that should
indicate that your connectors are capable of handling the load.
I would say that your workload is _unhealthy_ though, because it should be
able to operate without throwing this particular exception. I found one
previous report of this exception [1] but no action was taken. Instead, the
recommendation was to change the task implementation to perform offset
loading only during open().
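
To illustrate that recommendation, here is a sketch of the shape such a task
could take (the external-offset helper is hypothetical, not part of the
Connect API):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.Collection;
import java.util.Map;

public class OpenTimeOffsetTask extends SinkTask {
    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Rewind only here, while the partitions are guaranteed to be assigned.
        for (TopicPartition tp : partitions) {
            context.offset(tp, lookupExternalOffset(tp)); // hypothetical helper
        }
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Drop buffered state for revoked partitions here, so a later poll()
        // never calls context.offset() for a partition this task no longer owns.
    }

    private long lookupExternalOffset(TopicPartition tp) { return 0L; }

    @Override
    public void put(Collection<SinkRecord> records) {}

    @Override
    public void start(Map<String, String> props) {}

    @Override
    public void stop() {}

    @Override
    public String version() { return "sketch"; }
}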

It looks like this depends on the task implementation: If the consumer
rebalances and the task loses its assignment, and the task later calls
SinkTaskContext#offset() with the now-revoked partition, it would cause
this exception.
I'm not familiar with the Snowflake task, but upon a cursory inspection, it
looks like it buffers records [2] across poll() calls, and may call
SinkTaskContext#offset() in a later poll [3]. They appear to have a close()
method that could be used to prevent SinkTaskContext#offset() from being
called, but I'm not sure why it isn't effective.
You should contact the Snowflake Connector maintainers and let them know
that they are exposed to this exception.

I'll re-open this issue on the framework side to see if we can find a
solution to fix this for other connectors.

Thanks,
Greg

[1] https://issues.apache.org/jira/browse/KAFKA-10370
[2]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
[3]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
[4]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442

On Wed, Jul 3, 2024 at 12:25 PM Burton Williams 
wrote:

> Hi,
>
> I have 100+ sink connectors running with 100+ topics each with roughly 3
> partitions per topic. There are running on K8s on 10 pods with 6 cpus and
> 32Gig mem. The connector in question is Snowflake's sink connector v2.2.0.
> This worked in the mini batch mode SNOWPIPE, but once i switched over to
> SNOWPIPE_STREAMING, it no longer works. Tasks are failing with the
> exception:
>
> State:  FAILED
>
> Worker ID:  10.136.83.73:8080
>
> Trace:  java.lang.IllegalStateException: No current assignment for
> partition bigpicture.bulk_change-0
> at
>
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
>
> at
>
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
>
> at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
>
> at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
>
> at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
>
> at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
>
> at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
>
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
>
> at
>
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>
> at
>
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
> at
>
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
> at
>
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>
> at java.base/java.lang.Thread.run(Thread.java:829)
>
>
>
> My questions are:
>
>    1. Are these connectors overloaded? Can Kafka Connect handle this
>    level of load?
>    2. Say it can, which I've seen it do, could it be that this is caused by
>    underlying rebalancing? If so, what would you recommend I do to mitigate?
>
>
> Thanks
>
> -BW
>


Re: [VOTE] 3.8.0 RC1

2024-07-16 Thread Greg Harris
Hi Josep,

I found this blocker regression:
https://issues.apache.org/jira/browse/KAFKA-17150
Summary: Connector configurations that specified converters with aliases
(e.g. JsonConverter instead of org.apache.kafka.connect.json.JsonConverter)
previously worked, and in this RC they throw validation exceptions and are
unable to start/run.
Severity: This would take tasks offline completely until the workers are
downgraded or the connectors are reconfigured with full class names
Impact: Aliases are commonly used in demos, and slightly less commonly used
in production environments. It is by no means a demo-only feature, and I
would expect this to break at least one production workload.
Risk: I've opened a PR to resolve it here:
https://github.com/apache/kafka/pull/16608 . We've already solved this bug
once before, so it seems low-risk to widen the scope of the original fix to
cover this new case.

I found this apparent bug, but as it doesn't appear to be a regression and
has very low severity, I don't think it's a blocker on its own:
https://issues.apache.org/jira/browse/KAFKA-17148
If we're rolling a new RC, hopefully we can include this if it's resolved
quickly.

I also performed the following successful verifications:
1. Verified that protected members don't appear in the generated javadocs
(KAFKA-14839)
2. Verified that Connect Distributed can start against a Kraft cluster
3. Verified that plugin scanning doesn't throw errors with jackson
(KAFKA-17111)
4. Verified that the allowed.paths configuration works for
DirectoryConfigProvider (KIP-993)

Unfortunately due to the blocker regression, I think I'll have to -1
(binding) this RC. Sorry!

Thanks,
Greg

On Tue, Jul 16, 2024 at 1:32 PM Jakub Scholz  wrote:

> +1 (non-binding). I used the staged Scala 2.13 binaries and the staged
> Maven artifacts. All seems to work fine. Thanks!
>
> Jakub
>
> On Mon, Jul 15, 2024 at 5:53 PM Josep Prat 
> wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the second release candidate for Apache Kafka 3.8.0.
> >
> > Some of the major features included in this release are:
> > * KIP-1028: Docker Official Image for Apache Kafka
> > * KIP-974: Docker Image for GraalVM based Native Kafka Broker
> > * KIP-1036: Extend RecordDeserializationException exception
> > * KIP-1019: Expose method to determine Metric Measurability
> > * KIP-1004: Enforce tasks.max property in Kafka Connect
> > * KIP-989: Improved StateStore Iterator metrics for detecting leaks
> > * KIP-993: Allow restricting files accessed by File and Directory
> > ConfigProviders
> > * KIP-924: customizable task assignment for Streams
> > * KIP-813: Shareable State Stores
> > * KIP-719: Deprecate Log4J Appender
> > * KIP-390: Support Compression Level
> > * KIP-1018: Introduce max remote fetch timeout config for
> > DelayedRemoteFetch requests
> > * KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission
> > * KIP-1047 Introduce new org.apache.kafka.tools.api.Decoder to replace
> > kafka.serializer.Decoder
> > * KIP-899: Allow producer and consumer clients to rebootstrap
> >
> > Release notes for the 3.8.0 release:
> > https://home.apache.org/~jlprat/kafka-3.8.0-rc1/RELEASE_NOTES.html
> >
> > Please download, test and vote by Thursday, July 18th, 12pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > https://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~jlprat/kafka-3.8.0-rc1/
> >
> > * Docker release artifact to be voted upon(apache/kafka-native is
> supported
> > from 3.8+ release.):
> > apache/kafka:3.8.0-rc1
> > apache/kafka-native:3.8.0-rc1
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > https://home.apache.org/~jlprat/kafka-3.8.0-rc1/javadoc/
> >
> > * Tag to be voted upon (off 3.8 branch) is the 3.8.0 tag:
> > https://github.com/apache/kafka/releases/tag/3.8.0-rc1
> >
> > Once https://github.com/apache/kafka-site/pull/608 is merged. You will
> be
> > able to find the proper documentation under kafka.apache.org.
> > * Documentation:
> > https://kafka.apache.org/38/documentation.html
> >
> > * Protocol:
> > https://kafka.apache.org/38/protocol.html
> >
> > * Successful Jenkins builds for the 3.8 branch: Unit/integration tests:
> >
> >
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%252Fkafka/detail/3.8/67/
> > (Some known flaky tests and builds from 64 to 68 show tests passing at
> > least once). Additionally, this is the CI run for the changes between RC0
> > and RC1:
> >
> >
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16593/1/pipeline/
> >
> > System tests: (Same as before)
> >
> >
> https://confluent-open-source-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/trunk/2024-07-10--001.f1f05b43-3574-45cb-836e-8968f02d722f--1720631619--apache--3.8--4ecbb75c1f

Re: [VOTE] 3.8.0 RC3

2024-07-23 Thread Greg Harris
Hi Josep,

+1 (binding)

I performed the following validations:
1. I formatted a new log directory and verified that the stray log message
is no longer present (KAFKA-17148)
2. I configured a connect worker with a custom plugin path and
plugin.discovery=service_load and verified that plugin scanning doesn't
throw errors with jackson (KAFKA-17111)
3. I verified that aliases for converters work as-intended (KAFKA-17150)
4. Verified that the allowed.paths configuration works as intended for the
DirectoryConfigProvider (KIP-993)

Thanks for running the release!
Greg

On Tue, Jul 23, 2024 at 12:51 PM Chris Egerton 
wrote:

> Forwarding my response to the other mailing list threads; apologies for
> missing the reply-all the first time!
>
> -- Forwarded message -
> From: Chris Egerton 
> Date: Tue, Jul 23, 2024 at 3:45 PM
> Subject: Re: [VOTE] 3.8.0 RC3
> To: 
>
>
> Hi Josep,
>
> Thanks for running this release! I'm +1 (binding).
>
>
> To verify, I:
> - Built from source using Java 11 with both:
> - - the 3.8.0-rc3 tag on GitHub
> - - the source artifact from
> https://home.apache.org/~jlprat/kafka-3.8.0-rc3/kafka-3.8.0-src.tgz
> - Checked signatures and checksums
> - Ran the quickstart using both:
> - - The build artifact from
> https://home.apache.org/~jlprat/kafka-3.8.0-rc3/kafka_2.13-3.8.0.tgz with
> Java 11 and Scala 2.13 in KRaft mode
> - - Our JVM-based broker Docker image, apache/kafka:3.8.0-rc3
> - Ran all unit tests
> - Ran all integration tests for Connect and MM2
>
>
> A few small, non-blocking notes:
> 1) The release notes categorize KAFKA-16445 [1] as an improvement, but I
> believe it should be listed as a new feature instead.
> 2) The following unit tests failed the first time around, but passed when
> run a second time:
> - (clients) SaslAuthenticatorTest.testMissingUsernameSaslPlain()
> - (core)
> ProducerIdManagerTest.testUnrecoverableErrors(UNKNOWN_SERVER_ERROR)
> - (core) RemoteLogManagerTest.testCopyQuota(false)
> -
> (core)
> SocketServerTest.testClientDisconnectionWithOutstandingReceivesProcessedUntilFailedSend()
> - (core) ZkMigrationIntegrationTest.testMigrateTopicDeletions [7] Type=ZK,
> MetadataVersion=3.7-IV4,Security=PLAINTEXT
>   - This is also not actually a unit test, but an integration test. Looks
> like we haven't classified it correctly?
>
>
> [1] - https://issues.apache.org/jira/browse/KAFKA-16445
>
> Cheers,
>
> Chris
>
> On Tue, Jul 23, 2024 at 8:42 AM Josep Prat 
> wrote:
>
> > Here is the link to the system tests:
> >
> >
> https://confluent-open-source-kafka-system-test-results.s3-us-west-2.amazonaws.com/3.8/2024-07-22--001.ffbb03b2-61f4-4ebb-ae1f-af5c753682fb--1721733000--confluentinc--3.8--9a2b34b68c/report.html
> >
> > The Quota tests are known to fail in this CI system. Regarding the other
> > tests, they ran successfully in the past and are now timing out.
> >
> > Best,
> >
> > On Tue, Jul 23, 2024 at 12:07 PM Josep Prat  wrote:
> >
> > > Hello Kafka users, developers and client-developers,
> > >
> > > This is the fourth candidate for release of Apache Kafka 3.8.0.
> > >
> > > Some of the major features included in this release are:
> > > * KIP-1028: Docker Official Image for Apache Kafka
> > > * KIP-974: Docker Image for GraalVM based Native Kafka Broker
> > > * KIP-1036: Extend RecordDeserializationException exception
> > > * KIP-1019: Expose method to determine Metric Measurability
> > > * KIP-1004: Enforce tasks.max property in Kafka Connect
> > > * KIP-989: Improved StateStore Iterator metrics for detecting leaks
> > > * KIP-993: Allow restricting files accessed by File and Directory
> > > ConfigProviders
> > > * KIP-924: customizable task assignment for Streams
> > > * KIP-813: Shareable State Stores
> > > * KIP-719: Deprecate Log4J Appender
> > > * KIP-390: Support Compression Level
> > > * KIP-1018: Introduce max remote fetch timeout config for
> > > DelayedRemoteFetch requests
> > > * KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission
> > > * KIP-1047 Introduce new org.apache.kafka.tools.api.Decoder to replace
> > > kafka.serializer.Decoder
> > > * KIP-899: Allow producer and consumer clients to rebootstrap
> > >
> > > Release notes for the 3.8.0 release:
> > > https://home.apache.org/~jlprat/kafka-3.8.0-rc3/RELEASE_NOTES.html
> > >
> > > Please download, test and vote by Friday, July 26, 9am PT
> > >
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > https://kafka.apache.org/KEYS
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://home.apache.org/~jlprat/kafka-3.8.0-rc3/
> > >
> > > * Docker release artifact to be voted upon:
> > > apache/kafka:3.8.0-rc3
> > > apache/kafka-native:3.8.0-rc3
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > * Javadoc:
> > > https://home.apache.org/~jlprat/kafka-3.8.0-rc3/javadoc/
> > >
> > > * Tag to be voted upon (off 3.8 branch) is the

Re: Old Ticket (OutOfMemoryError in SaslClientAuthenticator during server restart)

2024-08-02 Thread Greg Harris
Hi Andreas,

Thank you for the reminder, it is unfortunate that this slipped through the
cracks.
This is certainly something that we should fix for everyone, so you don't
have to maintain patches yourself. I'll investigate some more and get back
to you.

Thanks!
Greg

On Fri, Aug 2, 2024 at 12:33 AM Andreas Martens1 
wrote:

> Hello good people of Kafka,
>
>
>
> We have an old ticket open (
> https://issues.apache.org/jira/browse/KAFKA-15247) with no replies.
>
>
>
> The fix has been working well for us for over a year now, here’s the main
> text of the ticket:
> When the Kafka server is installed in an OpenShift environment we are
> seeing cases where the clients receive OutOfMemory errors due to single
> large (1.2Gb) byte buffers being allocated by the client.
>
>
>
> From research this appears to be a known issue when a plaintext client is
> configured to attempt connection to a TLS-secured endpoint; however, in this
> instance we see successful communication via TLS, and then when the Kafka
> server is restarted (or connectivity is broken) both the consumers and
> producers can throw OutOfMemoryError's with the following stacks:
>
> [ … removed stacks, see jira ticket for details … ]
>
> We believe that what is happening is that when the Kafka server goes down,
> in the RHOS environment the route is still available for some small period
> of time and the SASLClientAuthenticator is able to receive rogue packets
> which it interprets as a length to read off stream.
>
> For the consumer code since there is application code on the stack we were
> able to implement a workaround by catching the OOM but on the producer side
> the entire stack is Kafka client code.
>
> I looked at the SaslClientAuthenticator code and I can see that its use
> of the network buffer is unbounded, so I applied 2 patches to this code. The
> first limits the buffer size for authentication to 10Mb, the 2nd catches
> the OOM and instead fails auth.
>
> Using the patched client the customer has gone from being able to recreate
> this on at least 1 appserver for every Kafka server restart to not being
> able to reproduce the issue at all.
>
> I am happy to submit a PR but I wanted to get feedback before I did so.
> For instance, is 10Mb a suitable maximum buffer size for auth? Should the
> maximum perhaps be configurable instead, and if so, what is best practice for
> providing this configuration?
>
> Secondly, catching the OOM doesn't feel like best practice to me; however,
> without doing this the entire application fails due to aggressive
> allocation of byte buffers in the SaslClientAuthenticator. Is there any
> alternative I should be considering?
>
>
>
> Is there anything we can do to get some attention on the ticket so we
> don’t have to patch every level of Kafka?
>
>
>
> Thanks!
>
> Andreas
>
>
>
> --
>
> *Andreas Martens*
>
>
> Senior Engineer
>
> App Connect Enterprise
>
> IBM
>
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


Re: Question about config.action.reload

2024-08-20 Thread Greg Harris
Hi Ran,

The config.action.reload, subscribe, unsubscribe, and ConfigChangeCallback
features are partially implemented. As far as I can tell, if you're using
only the AK connect-runtime and config providers, none of it is functional.

If you have a config provider that implements TTLs, the runtime will use
the value of config.action.reload to determine whether the TTLs expiring
restarts the connector. No AK ConfigProvider implementation implements
TTLs, so this should only have an effect with custom ConfigProvider
implementations.
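
For example, a minimal sketch of a custom provider that reports a TTL (the
backing-store lookup is hypothetical):

import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TtlConfigProvider implements ConfigProvider {
    @Override
    public ConfigData get(String path) {
        return get(path, Collections.emptySet());
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            data.put(key, fetchValue(path, key)); // hypothetical lookup
        }
        // Returning a TTL asks the runtime to re-resolve after 60 seconds;
        // config.action.reload governs whether that re-resolution restarts
        // the connector.
        return new ConfigData(data, 60_000L);
    }

    private String fetchValue(String path, String key) { return "..."; }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
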
If you have a config provider that implements subscribe/unsubscribe, it
will not be functional with the AK connect-runtime, as the runtime does not
call these methods or listen for the changes. I would expect that because
of this, no ConfigProvider implements these methods. If you had a fork of
the connect-runtime, it might make use of these methods.

In the future we should add support for these methods to the runtime:
https://issues.apache.org/jira/browse/KAFKA-14823 and possibly also add
support for these methods to our built-in ConfigProvider implementations to
make them more functional.
If you're interested in helping with this, it would be greatly appreciated!

Thanks,
Greg

On Tue, Aug 20, 2024 at 7:44 AM Ran Qin  wrote:

> Hi Kafka team,
>
> Currently I'm investigating the config.action.reload feature of Kafka
> Connect. I'm wondering how to enable this feature. E.g. If I use the
> FileConfigProvider,
> clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
> is it enough to set the config.action.reload = true in the connector
> configs? Or I have to implement my own subscribe and unsubscribe logic
> before using it?
> https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java#L52-L73
>
> What's more interesting is I noticed Herder should have corresponding
> logic to handle the onChange() function, but I didn't find how the function
> is used in Herder. I'm curious what's the best practice to use the feature
> to update the value from ConfigProvider? Thanks!
>


Re: MirrorMaker 2 : Offset issues since 3.5.x+

2024-08-22 Thread Greg Harris
Hi Tom,

Thanks for your question!

In Kafka 3.5 there was an overhaul of the offset translation logic to avoid
a data loss bug. This [1] is the ticket that I've been using to represent
that overhaul.
In the implementation for that fix, we traded a lot of availability (read:
consumer lag) for correctness (the translated offset is never too far
ahead). We've been working to claw some of that availability back, most
notably through [2] in 3.6.0, [3] in 3.7.0, and [4] in 3.8.0, but it's
still not back to original levels.

I would definitely suggest trying 3.8.0 to see if it is suitable for your
use-case, as that last fix KAFKA-15905 is very impactful for Mirror Maker
instances that undergo restarts or rebalances.

From the description "Some topics are a dozen or so behind, others are
hundreds of messages behind", is it possible that the translation is already
working to the best of its ability, or may benefit from a lower
`offset.lag.max`? Without more information I can't be sure. I do know that
running versions 3.5-3.7 with offset.lag.max=0 is not sufficient to get
good translation, the latest patches are quite important.
There are open issues [5, 6] for future improvements to the algorithm, but
there hasn't been much movement on those recently.

With the current implementation, I would expect the target consumer lag to
be approximately double the source consumer lag. The only time you should
expect "perfect" translation is for a consumer group that has committed at
the very end of a stable topic.
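
For reference, these are the relevant knobs as they would appear in a
dedicated MM2 properties file; the values are placeholders to experiment
with, not recommendations:

# mm2.properties (illustrative)
source->target.enabled = true
source->target.offset.lag.max = 10
source->target.emit.checkpoints.interval.seconds = 60
source->target.sync.group.offsets.enabled = true
source->target.sync.group.offsets.interval.seconds = 60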

Thanks,
Greg

[1] https://issues.apache.org/jira/browse/KAFKA-12468
[2] https://issues.apache.org/jira/browse/KAFKA-15202
[3] https://issues.apache.org/jira/browse/KAFKA-15906
[4] https://issues.apache.org/jira/browse/KAFKA-15905
[5] https://issues.apache.org/jira/browse/KAFKA-16364
[6] https://issues.apache.org/jira/browse/KAFKA-16641

On Thu, Aug 22, 2024 at 1:35 PM Harty, Tom A 
wrote:

> We’ve been using MM2 one way for Active/Passive clusters for several years
> now. We started running into issues with 3.5.1. It hasn’t been keeping up
> with consumer offsets like it used to.
> To test this out we’ve done rollbacks to 3.4.0 and the offset issue
> corrects itself. Looking at the issue log it seems like some things around
> offset management have been corrected in 3.6.1 and 3.7.0. Unfortunately, we
> tried upgrading all the way to 3.7.0 and found the issue still remains.
> It doesn’t seem to matter if it’s the low-volume non-prod clusters or
> high-volume prod clusters. Some topics are a dozen or so behind, others are
> hundreds of messages behind. When I look at the offsets-sync topic it seems
> to be producing meaningful data. And of course the messages themselves are
> fully insync.
>
> Since we’re using Strimzi we put the question to Jakob and he suggested
> changing offset.lag.max to different lower values, but that didn’t really
> move the needle.
>
> What changed between 3.4.0 and later versions? Are there configuration
> changes we should look at?
>


Re: MirrorMaker2 Question

2024-08-23 Thread Greg Harris
Hi Harry,

Thanks for your question!

There were negative lag/data loss bugs fixed some time back [1, 2] so check
what version of the MirrorMaker connectors you're using. I would recommend
using the latest version of MM2 (3.8.0) if possible.

After upgrading you will need to clear the checkpoints topic and target
consumer offsets to allow MM2 to translate the offsets again, as MM2 will
not rewind the target consumer group in situations like this.
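
For example, with the default replication policy the cleanup could look
something like the following (cluster addresses, aliases, and group names are
illustrative; double-check the topic name against your replication policy
before deleting anything):

# Delete the checkpoints topic on the target cluster
bin/kafka-topics.sh --bootstrap-server target:9092 --delete \
  --topic source.checkpoints.internal

# Clear the migrated group's committed offsets on the target
bin/kafka-consumer-groups.sh --bootstrap-server target:9092 \
  --delete --group my-group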

Thanks, and hope this helps!
Greg

[1] https://issues.apache.org/jira/browse/KAFKA-12635
[2] https://issues.apache.org/jira/browse/KAFKA-12468

On Fri, Aug 23, 2024 at 5:24 PM Harry Fallows
 wrote:

> Hi everyone,
>
> Does anyone know if it is possible for the Checkpoint connector to be
> ahead of the Source connector?
>
> For context, I have a target Kafka cluster that is reporting negative lag
> because the log end offset is 0 but the consumer group offset is 100. I
> have a tenuous theory that this could be caused in the following sequence of
> events:
>
> - Offset is translated.
> - Messages in topic are cleaned up before being replicated.
>
> Does this sound possible? Any insight would be greatly appreciated.
>
> Kind regards
> Harry


Re: Will there be a 3.7.2?

2024-09-04 Thread Greg Harris
Hi Douglas,

Thanks for your interest in Apache Kafka!

At this time, we don't have a timeline for 3.7.2 yet, and no committer has
volunteered. Usually that happens once there are a sufficient number of
backported patches accumulated or a security patch is necessary. At this
time, the next planned release is 3.9.0.

I responded on the ticket about that specific issue. Once we know more
about the problem, we can find a solution and then finally consider
releasing it.

Thanks,
Greg

On Wed, Sep 4, 2024 at 9:00 AM Doug Whitfield
 wrote:

> I see there is a release plan for 3.7.1 (which is out):
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.7.1
>
> However, there is not one for 3.7.2:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.7.2
>
> This appears to be an incomplete fix, but I am wondering if it is worth
> pursuing a fix in 3.7.2: https://issues.apache.org/jira/browse/KAFKA-9228
>
> If a 3.7.2 does not seem likely, I will push to test with 3.8.0.
>
> Best Regards,
>
> Douglas Whitfield | Enterprise Architect
>
>
>


Re: [DISCUSS] Mirroring the Repartition Topic: MirrorMaker2 and Streams interaction

2024-09-12 Thread Greg Harris
Hey everyone,

Bumping this topic to see if anyone has experience migrating Streams jobs
between clusters with Mirror Maker, and how you handled repartition topics.

Thanks!
Greg

On Mon, Aug 12, 2024 at 8:55 AM Greg Harris  wrote:

> Hi all,
>
> We were recently working to set up a MirrorMaker2 flow to migrate a
> Streams job between an existing Kafka cluster and a new Kafka cluster. We
> noticed the following behavior:
>
> * Streams sets infinite retention on the repartition topic, and uses the
> AdminClient deleteRecords call to implement "active retention" by deleting
> data that is no longer necessary.
> * MirrorMaker2 mirrors the data to topic with infinite retention, but
> _without_ the active AdminClient deleteRecords calls running, as it is not
> known to be a repartition topic and the destination job is not yet active.
> * The target topic grows without bound and exhausts disks on the
> destination.
>
> Could any Streams folks give their recommendations for this situation?
>
> 1. Should repartition topics have only streams-managed consumers, is
> inspecting the repartition topic intended to be undefined behavior?
> 2. If Streams deletes some records before MM2 mirrors them, could that
> "lossy" replication of the repartition topic cause corruption or data loss?
> 3. If someone truncates the repartition topic or MM2 doesn't replicate it,
> are at-least-once and/or exactly-once semantics preserved for the Streams
> job?
>
> And the more general question I'm trying to answer is: should MM2 have an
> option to explicitly "sync" the deleteRecords calls across the cluster
> boundaries? If "active retention" is a common pattern that just happens to
> show up in the repartition topic, maybe it would be desirable to try and
> sync the retention behaviors, and not just the retention properties.
>
> Thanks,
> Greg
>


Re: kafka MirrorMaker2 - stop offset sync when consumer group idle on source cluster

2024-10-04 Thread Greg Harris
Hi Massimiliano,

A small amount of re-delivery is expected when using MirrorMaker2 3.5+
offset replication. This is because MM2 does not offer exactly-once offset
replication.

I gave a conference talk recently about this which you may find
informative:
https://current.confluent.io/2024-sessions/mirrormaker-2s-offset-translation-isnt-exactly-once-and-thats-okay
You can also read about this here, and in the linked tickets:
https://issues.apache.org/jira/browse/KAFKA-12468

Thanks,
Greg

On Fri, Oct 4, 2024 at 1:31 PM Massimiliano Rotondo 
wrote:

> Hello,
>
> I'm experiencing a side bug with the usage of Kafka MirrorMaker2; all is
> described in the post below:
>
>
> https://stackoverflow.com/questions/79051730/kafka-mirrormaker2-stop-offset-sync-when-consumer-group-idle-on-source-cluster
>
> The same "bug" has been reproduced using updated Kafka Cluster 3.7.1 on
> both source/destination (so it seems not depending on the old version of
> the source cluster).
>
> Could you please support with this?
>
> Best Regards,
>
> Massimiliano  Rotondo
>


Re: doc clarification about message format

2024-10-24 Thread Greg Harris
Hey Xiang,

Thanks for your questions! This is getting to the limit of my knowledge,
but I'll answer as best I can.

The partitionLeaderEpoch is only set once during the batch lifetime (during
Produce), and is not mutated any other time. This includes when data is
fetched by other replicas and by consumers, and when partition leadership
changes.
I believe this field is a record of which partitionLeaderEpoch was active
at the time the batch was produced, and can be different for different
batches within a partition as leadership changes. I wouldn't call this
"outdated", as I think there is an intentional use for this historical
leadership data in the log [1].

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation

Thanks,
Greg

On Wed, Oct 23, 2024 at 8:07 PM Xiang Zhang 
wrote:

> Thank you Greg for all the knowledge, some follow up questions.
>
> Does partitionLeaderEpoch always reflect the latest leader election, or can
> an old epoch be allowed? If it is the first case, then I agree
> partitionLeaderEpoch should not be included in CRC computation. But it
> raises some new questions for me, namely: which roles will check the
> checksum, and under what circumstances? I am asking this because after the
> producing process, any record in the broker log can have an outdated leader
> epoch field once leader election happens, right? Do they get updated?
>
> Sorry for all the questions, I have been using Kafka for several years and
> want to dive deep into it a little bit. I have become more interested and
> ready to find out on my own. But still look forward to your thoughts on
> this if the questions above do make some sense.
>
>
> Thanks,
> XIang
>
> Greg Harris  于2024年10月24日周四 00:25写道:
>
> > Hi Xiang,
> >
> > Thanks for your question! That sentence is a justification for why the
> > partitionLeaderEpoch field is not included in the CRC.
> >
> > If you mutate fields which are included in a CRC, you need to recompute
> the
> > CRC value. See [1] for mutating the maxTimestamp. Compare that with [2]
> for
> > setting the partitionLeaderEpoch.
> > This makes setting the partitionLeaderEpoch faster than setting the max
> > timestamp. And because setting the partitionLeaderEpoch happens on every
> > Produce request, it was optimized in the protocol design.
> > It does have the tradeoff that corruptions in the partitionLeaderEpoch
> are
> > not detected by the CRC, but someone decided this was worth the
> > optimization to the Produce flow.
> >
> > I don't have more information on why this optimization was made for
> > partitionLeaderEpoch and not maxTimestamp.
> >
> > Hope this helps,
> > Greg
> >
> > [1]
> >
> >
> https://github.com/apache/kafka/blob/2d896d9130f121e75ccba2d913bdffa358cf3867/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L371-L382
> > [2]
> >
> >
> https://github.com/apache/kafka/blob/2d896d9130f121e75ccba2d913bdffa358cf3867/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L385-L387
> >
> >
> > On Tue, Oct 22, 2024 at 7:51 PM Xiang Zhang 
> > wrote:
> >
> > > Hi all,
> > >
> > > I am reading official doc here:
> > > https://kafka.apache.org/documentation/#messageformat, and I could not
> > > fully understand it. If someone can clarify it for me, it would be much
> > > appreciated. The sentence is
> > >
> > > The partition leader epoch field is not included in the CRC computation
> > to
> > > avoid the need to recompute the CRC when this field is assigned for
> every
> > > batch that is received by the broker.
> > >
> > > I just don’t really get what the highlighted part is trying to say.
> > >
> > > Regards,
> > > Xiang Zhang
> > >
> >
>


Re: Kafka producer transaction details

2024-10-25 Thread Greg Harris
Hi Artur,

Thanks for the question!

In transactions there's no distinction between partitions within the same
topic and partitions in different topics. You can have transactions across
multiple topics, and multiple partitions in those topics.
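
For example, a minimal sketch (broker address and topic names are
placeholders, and abort/retry handling is omitted for brevity):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MultiTopicTransaction {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Two different topics inside one transaction: both records
            // become visible atomically, or neither does.
            producer.send(new ProducerRecord<>("orders", "key", "order-created"));
            producer.send(new ProducerRecord<>("audit", "key", "order-created"));
            producer.commitTransaction();
        }
    }
}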

Hope this is more clear,
Greg

On Fri, Oct 25, 2024 at 5:11 AM Artur Bondek  wrote:

> Hi,
>
> I would like to ask about the kafka producer transaction. Documentation
> states (https://kafka.apache.org/documentation/#semantics):
> Also beginning with 0.11.0.0, *the producer supports the ability to send
> messages to multiple topic partitions* using transaction-like semantics:
> i.e. either all messages are successfully written or none of them are.
>
> Does it mean that the producer supports transactions in multiple topics
> (and multiple partitions)? Or it's just for multiple partitions but within
> a single topic?
>
> Artur Bondek
>


Re: CVE-2024-31141: Apache Kafka Clients: Privilege escalation to filesystem read-access via automatic ConfigProvider

2024-11-18 Thread Greg Harris
Hi Everyone,

Due to an oversight, the Affected versions are incorrect. Version 3.7.1 of
kafka-clients is not vulnerable. This is the correct data:

Affected versions:

- Apache Kafka Clients 2.3.0 through 3.5.2
- Apache Kafka Clients 3.6.0 through 3.6.2
- Apache Kafka Clients 3.7.0

This issue affects Apache Kafka Clients: from 2.3.0 through 3.5.2, 3.6.2,
3.7.0.

Thanks,
Greg Harris

On Mon, Nov 18, 2024 at 10:42 AM Greg Harris  wrote:

> Severity: moderate
>
> Affected versions:
>
> - Apache Kafka Clients 2.3.0 through 3.5.2
> - Apache Kafka Clients 3.6.0 through 3.6.2
> - Apache Kafka Clients 3.7.0 through 3.7.1
>
> Description:
>
> Files or Directories Accessible to External Parties, Improper Privilege
> Management vulnerability in Apache Kafka Clients.
>
> Apache Kafka Clients accept configuration data for customizing behavior,
> and includes ConfigProvider plugins in order to manipulate these
> configurations. Apache Kafka also provides FileConfigProvider,
> DirectoryConfigProvider, and EnvVarConfigProvider implementations which
> include the ability to read from disk or environment variables.
> In applications where Apache Kafka Clients configurations can be specified
> by an untrusted party, attackers may use these ConfigProviders to read
> arbitrary contents of the disk and environment variables.
>
> In particular, this flaw may be used in Apache Kafka Connect to escalate
> from REST API access to filesystem/environment access, which may be
> undesirable in certain environments, including SaaS products.
> This issue affects Apache Kafka Clients: from 2.3.0 through 3.5.2, 3.6.2,
> 3.7.1.
>
>
> Users with affected applications are recommended to upgrade kafka-clients
> to version >=3.8.0, and set the JVM system property
> "org.apache.kafka.automatic.config.providers=none".
> Users of Kafka Connect with one of the listed ConfigProvider
> implementations specified in their worker config are also recommended to
> add appropriate "allowlist.pattern" and "allowed.paths" to restrict their
> operation to appropriate bounds.
>
>
> For users of Kafka Clients or Kafka Connect in environments that trust
> users with disk and environment variable access, it is not recommended to
> set the system property.
> For users of the Kafka Broker, Kafka MirrorMaker 2.0, Kafka Streams, and
> Kafka command-line tools, it is not recommended to set the system property.
>
> Credit:
>
> Greg Harris (finder)
> Mickael Maison (remediation reviewer)
> Chris Egerton (remediation reviewer)
>
> References:
>
> https://kafka.apache.org/
> https://www.cve.org/CVERecord?id=CVE-2024-31141
>
>


CVE-2024-31141: Apache Kafka Clients: Privilege escalation to filesystem read-access via automatic ConfigProvider

2024-11-18 Thread Greg Harris
Severity: moderate

Affected versions:

- Apache Kafka Clients 2.3.0 through 3.5.2
- Apache Kafka Clients 3.6.0 through 3.6.2
- Apache Kafka Clients 3.7.0 through 3.7.1

Description:

Files or Directories Accessible to External Parties, Improper Privilege 
Management vulnerability in Apache Kafka Clients.

Apache Kafka Clients accept configuration data for customizing behavior, and 
includes ConfigProvider plugins in order to manipulate these configurations. 
Apache Kafka also provides FileConfigProvider, DirectoryConfigProvider, and 
EnvVarConfigProvider implementations which include the ability to read from 
disk or environment variables.
In applications where Apache Kafka Clients configurations can be specified by 
an untrusted party, attackers may use these ConfigProviders to read arbitrary 
contents of the disk and environment variables.

In particular, this flaw may be used in Apache Kafka Connect to escalate from 
REST API access to filesystem/environment access, which may be undesirable in 
certain environments, including SaaS products.
This issue affects Apache Kafka Clients: from 2.3.0 through 3.5.2, 3.6.2, 3.7.1.


Users with affected applications are recommended to upgrade kafka-clients to 
version >=3.8.0, and set the JVM system property 
"org.apache.kafka.automatic.config.providers=none".
Users of Kafka Connect with one of the listed ConfigProvider implementations 
specified in their worker config are also recommended to add appropriate 
"allowlist.pattern" and "allowed.paths" to restrict their operation to 
appropriate bounds.
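
As an illustration of those recommendations (the paths and provider names
below are examples only):

# JVM flag for the client application or Connect worker:
-Dorg.apache.kafka.automatic.config.providers=none

# Connect worker properties, restricting FileConfigProvider (3.8.0+):
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
config.providers.file.param.allowed.paths=/etc/kafka-connect/secrets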


For users of Kafka Clients or Kafka Connect in environments that trust users 
with disk and environment variable access, it is not recommended to set the 
system property.
For users of the Kafka Broker, Kafka MirrorMaker 2.0, Kafka Streams, and Kafka 
command-line tools, it is not recommended to set the system property.

Credit:

Greg Harris (finder)
Mickael Maison (remediation reviewer)
Chris Egerton (remediation reviewer)

References:

https://kafka.apache.org/
https://www.cve.org/CVERecord?id=CVE-2024-31141



Re: [DISCUSS] Java 23 Support for 3.9.x

2024-11-20 Thread Greg Harris
> Has the SecurityManager been fully removed in JDK 23?
> What is the effect of running Kafka 3.9.0 with JDK 23?

The SecurityManager has been degraded, so by default our users experience
an UnsupportedOperationException. They can work-around this by setting a
system property.
In JRE 24, JEP-486 [1] has removed this workaround, so an unpatched 3.9.x
will experience an UnsupportedOperationException unconditionally.

> I see https://issues.apache.org/jira/browse/KAFKA-17638
> which explicitly adds JDK 23 to our CI with a fix version of 4.0.0. Lack
of
> support for JDK 23 in 3.9.x is not a bug, it is what we planned (as far as
> I can tell).

Originally we were planning to get this change into 3.9.0, but we missed
the merge deadline. I opened that ticket afterwards to be fixed in 4.0.0
because that's the next release.
The patch was always intended to be backportable, and I intended to
backport it [2].

I understand that if we consider Java 23 support to be a feature (which is
the standing decision), this is a pretty obvious case of missing feature
freeze, and the current course of action (releasing in 4.0.0) is how we
would handle it.
I'm asking for this to be reconsidered as a bug fix, because it allows us
to backport the change, which is what our users are asking for [3].

Thanks,
Greg

[1] https://openjdk.org/jeps/486
[2] https://github.com/apache/kafka/pull/16522#issuecomment-2377340024
[3] https://lists.apache.org/thread/312lm617q05k87kxsrwlqhk8rfg29t7g

On Wed, Nov 20, 2024 at 11:50 AM David Arthur  wrote:

> Greg,
>
> I have not been following this closely, so apologies for some basic
> questions.
>
> Has the SecurityManager been fully removed in JDK 23?
>
> What is the effect of running Kafka 3.9.0 with JDK 23?
>
> By "4.0 breaking changes" do you mean changes to our JDK/Scala supported
> versions, removal or ZK, Kafka API changes, or something else?
>
> In general, I do not think we should change our supported JDK versions in a
> hotfix release. I see https://issues.apache.org/jira/browse/KAFKA-17638
> which explicitly adds JDK 23 to our CI with a fix version of 4.0.0. Lack of
> support for JDK 23 in 3.9.x is not a bug, it is what we planned (as far as
> I can tell).
>
> Also, I feel that we should not add too much to 3.9.x aside from actual
> bugs. If we backport things into 3.9.x, it will slow adoption of 4.x and
> increase our maintenance burden over time.
>
> Just my $0.02
>
> Thanks!
> David A
>
> On Wed, Nov 20, 2024 at 12:22 PM Greg Harris  >
> wrote:
>
> > Hi all,
> >
> > Now that 3.9.0 is released and 4.0.x is progressing, I'd like to
> understand
> > everyone's expectations about the 3.9.x branch, and ask for a specific
> > consensus on Java 23 support.
> >
> > Some context that I think is relevant to the discussion:
> > * KIP-1006 [1] proposes a backwards-compatible strategy for handling the
> > ongoing removal of the SecurityManager, which is merged and due to
> release
> > in 4.0.0 [2].
> > * KIP-1012 [3] rejected ongoing parallel feature development on a 3.x
> > branch while having trunk on 4.x.
> > * During the 3.9.0 release, the patch [2] was rejected [4] due to being a
> > new feature which did not meet the feature freeze deadline.
> > * Other than the SecurityManager removal, there are additional PRs which
> > would also need to be backported for full Java 23 support [5] including a
> > Scala patch upgrade.
> > * Downstream users are asking for a backport [6] because adding support
> for
> > Java 23 would obligate them to also include the 4.0 breaking changes.
> >
> > So while adding Java version support in the past has been a KIP-less
> > feature and normally only appears in the next version, it happens to
> align
> > with a major version bump this time. This will cause additional pain for
> > users if we do not elect to backport this.
> >
> > Thanks,
> > Greg
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support
> > [2] https://github.com/apache/kafka/pull/16522
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8+and+3.9+release
> > [4] https://lists.apache.org/thread/xy5rwd1w274qgpwf3qxxnzlqpoly5d4p
> > [5] https://issues.apache.org/jira/browse/KAFKA-17638
> > [6] https://github.com/apache/kafka/pull/16522#issuecomment-2488340682
> >
>
>
> --
> David Arthur
>


[DISCUSS] Java 23 Support for 3.9.x

2024-11-20 Thread Greg Harris
Hi all,

Now that 3.9.0 is released and 4.0.x is progressing, I'd like to understand
everyone's expectations about the 3.9.x branch, and ask for a specific
consensus on Java 23 support.

Some context that I think is relevant to the discussion:
* KIP-1006 [1] proposes a backwards-compatible strategy for handling the
ongoing removal of the SecurityManager, which is merged and due to release
in 4.0.0 [2].
* KIP-1012 [3] rejected ongoing parallel feature development on a 3.x
branch while having trunk on 4.x.
* During the 3.9.0 release, the patch [2] was rejected [4] due to being a
new feature which did not meet the feature freeze deadline.
* Other than the SecurityManager removal, there are additional PRs which
would also need to be backported for full Java 23 support [5] including a
Scala patch upgrade.
* Downstream users are asking for a backport [6] because adding support for
Java 23 would obligate them to also include the 4.0 breaking changes.

So while adding Java version support in the past has been a KIP-less
feature and normally only appears in the next version, it happens to align
with a major version bump this time. This will cause additional pain for
users if we do not elect to backport this.

Thanks,
Greg

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support
[2] https://github.com/apache/kafka/pull/16522
[3]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8+and+3.9+release
[4] https://lists.apache.org/thread/xy5rwd1w274qgpwf3qxxnzlqpoly5d4p
[5] https://issues.apache.org/jira/browse/KAFKA-17638
[6] https://github.com/apache/kafka/pull/16522#issuecomment-2488340682


Re: Proper way to horizontally scale kafka 3 kraft cluster

2024-12-04 Thread Greg Harris
Hi Gregory,

The ability to add or remove controllers was added by KIP-853 in v3.9.0.
You will not be able to easily change the set of controllers in v3.6.1. We
have relevant documentation here [1].

As far as I understand, there's also a caveat that existing Kraft clusters
that use a static quorum cannot use KIP-853 even after upgrading [2].
I think to have a dynamic quorum, you will need to either create a new 3.9+
cluster, or perform the ZK->Kraft migration while running 3.9+.
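
For reference, the quorum state can be inspected with the metadata quorum
tool, and on 3.9+ the same tool gains add-controller/remove-controller
subcommands (see [1] for their exact invocations):

bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status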

I hope this helps,
Greg Harris

[1] https://kafka.apache.org/documentation/#kraft_reconfig
[2] https://issues.apache.org/jira/browse/KAFKA-16538

On Wed, Dec 4, 2024 at 10:04 AM Gregory Rybalka 
wrote:

> I am working with a Kafka 3.6.1 cluster (KRaft mode enabled) and would
> like some guidance on scaling Kafka brokers and controllers. Below are the
> details of my setup and the steps I followed, along with some challenges
> encountered. So before going on production, I tested the scaling process in
> a 2-node test environment. ( broker and KRaft controllers on the same nodes
> )
>
> Test Cluster Setup:
>
> Initial Configuration:
>
> Nodes: 2
> Controller quorum configuration on nodes: controller.quorum.voters=
> 0@172.26.1.103:9093,1@172.26.1.189:9093
>
> Scaling Process:
>
> Added a new node (172.26.1.81).
> Configured controller.quorum.voters on the new node as:
> controller.quorum.voters=0@172.26.1.103:9093,1@172.26.1.189:9093,
> 2@172.26.1.81:9093
> Started Kafka on the new node, which connected successfully as an
> observer in the KRaft quorum.
>
> Issues Encountered:
>
> The new node was listed as an observer instead of a voter after
> starting:
>
> ClusterId:  mXMb-Ah9Q8uNFoMtqGrBag
> LeaderId:   0
> LeaderEpoch:7
> HighWatermark:  33068
> MaxFollowerLag: 0
> MaxFollowerLagTimeMs:   0
> CurrentVoters:  [0,1]
> CurrentObservers:   [2]
>
> Updating controller.quorum.voters on the old nodes caused an error:
>
> [2024-12-02 12:04:11,314] ERROR [SharedServer id=0] Got exception while
> starting SharedServer (kafka.server.SharedServer)
> java.lang.IllegalStateException: Configured voter set: [0, 1, 2] is
> different from the voter set read from the state file: [0, 1]. Check if the
> quorum configuration is up to date, or wipe out the local state file if
> necessary
> at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:132)
> at
> org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:375)
> at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:248)
> at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:174)
> at kafka.server.SharedServer.start(SharedServer.scala:260)
> at kafka.server.SharedServer.startForController(SharedServer.scala:132)
> at kafka.server.ControllerServer.startup(ControllerServer.scala:192)
> at
> kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
> at
> kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
> at scala.Option.foreach(Option.scala:437)
> at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
> at kafka.Kafka$.main(Kafka.scala:113)
> at kafka.Kafka.main(Kafka.scala)
> [2024-12-02 12:04:11,325] INFO [ControllerServer id=0] Waiting for
> controller quorum voters future (kafka.server.ControllerServer)
> [2024-12-02 12:04:11,328] INFO [ControllerServer id=0] Finished waiting
> for controller quorum voters future (kafka.server.ControllerServer)
> [2024-12-02 12:04:11,331] ERROR Encountered fatal fault: caught exception
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> java.lang.NullPointerException: Cannot invoke
> "kafka.raft.KafkaRaftManager.apiVersions(" because the return value of
> "kafka.server.SharedServer.raftManager()" is null
> at kafka.server.ControllerServer.startup(ControllerServer.scala:205)
> at
> kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
> at
> kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
> at scala.Option.foreach(Option.scala:437)
> at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
> at kafka.Kafka$.main(Kafka.scala:113)
> at kafka.Kafka.main(Kafka.scala)
>
> So according to logs I need to “wipe out the local state file.” Okay so
> the file which contains the word “state” is located in the data.dir folder
>
> /var/lib/kafka/data/__cluster-metadata-0
>
> So I deleted that file from old broker 103 and restarted Kafka, which
> completed successfully. Then I asked the 103 node about KRaft quorum status
> and got:
>
> ClusterId:  mXMb-Ah9Q8uNFoMtqGrBag
> LeaderId:   2
> L

Re: Extracting key-value pair from Produce Request API.

2024-12-27 Thread Greg Harris
Hi,

Thanks for your question.

It appears you're using the legacy consumer API, which was removed in 2.0.0
and is no longer supported.
I would strongly suggest building on top of the modern Java Consumer API at
this time.

The modern API exposes the deserialized headers via the
ConsumerRecord#headers method:
https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
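
For illustration, a minimal sketch (broker address, group id, and topic are
placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class HeaderReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            for (ConsumerRecord<String, String> record :
                    consumer.poll(Duration.ofSeconds(1))) {
                System.out.printf("key=%s value=%s%n", record.key(), record.value());
                for (Header header : record.headers()) {
                    // Header keys and values arrive already deserialized.
                    System.out.printf("  header %s=%s%n",
                            header.key(), new String(header.value()));
                }
            }
        }
    }
}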

Hope this helps,
Greg

On Fri, Dec 27, 2024, 10:19 AM Chain Head  wrote:

> Hi,
> I am struggling to get the key-value pair from the Produce Request API. I
> want to write it to a Buffer for further processing. I can't seem to get
> the `k` and `v` values whereas the `.keySize` and `.valueSize` are reported
> correctly. Please advise how to extract the key value pairs from the
> Produce request API payload.
>
> For better format, see https://pastebin.com/ZKad1ET6
>
> MemoryRecords partitionRecords = (MemoryRecords) partitionData.records();
> for (RecordBatch batch : partitionRecords.batches()) {
>   // Iterate through records of a batch
>   Buffer batchBuffer = Buffer.buffer();
>   Iterator<org.apache.kafka.common.record.Record> it = batch.iterator();
>   while (it.hasNext()) {
> org.apache.kafka.common.record.Record record = it.next();
>
> String k = "";
> String v = "";
>
> for (Header header : record.headers()) {
>   v = new String(header.value());
>   // Some logic with k and v to write to a Buffer
> }
>
> if (record.hasKey()) {
>   ByteBuffer keyBuffer = record.key();
>   ByteBuffer valueBuffer = record.value();
>
>   if (record.hasValue()) {
> k = new String(keyBuffer.array(), keyBuffer.position(),
> record.keySize());
> v = new String(valueBuffer.array(), valueBuffer.position(),
> record.valueSize());
> // Some logic with k and v to write to a Buffer
>   } else {
> k = new String(keyBuffer.array(), keyBuffer.position(),
> record.keySize());
> // Some logic with k and v to write to a Buffer
>   }
> } else {
>   // Empty buffer
> }
>   }
>   }
>


Re: Extracting key-value pair from Produce Request API.

2024-12-27 Thread Greg Harris
Hi,

I apologize for misunderstanding your initial email. Unfortunately I still
don't understand your question. Could you clarify what result you expect
from your code, and what the actual behavior is?

Maybe also try and simplify the reproduction case. I see confusing use of a
String constructor that could be causing your problem or masking it.

Thanks,
Greg


On Fri, Dec 27, 2024, 5:47 PM Chain Head  wrote:

> Hi,
> I am looking at parsing Produce request API on broker side. This is for
> simulating a broker. No consumer is involved. Also, I am using 3.8.0.
>
> On Sat, 28 Dec, 2024, 04:47 Greg Harris, 
> wrote:
>
> > Hi,
> >
> > Thanks for your question.
> >
> > It appears you're using the legacy consumer API, which was removed in
> 2.0.0
> > and is no longer supported.
> > I would strongly suggest building on top of the modern Java Consumer API
> at
> > this time.
> >
> > The modern API exposes the deserialized headers via the
> > ConsumerRecord#headers method:
> >
> >
> https://kafka.apache.org/39/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
> >
> > Hope this helps,
> > Greg
> >
> > On Fri, Dec 27, 2024, 10:19 AM Chain Head  wrote:
> >
> > > Hi,
> > > I am struggling to get the key-value pair from the Produce Request
> API. I
> > > want to write it to a Buffer for further processing. I can't seem to
> get
> > > the `k` and `v` values whereas the `.keySize` and `.valueSize` are
> > reported
> > > correctly. Please advise how to extract the key value pairs from the
> > > Produce request API payload.
> > >
> > > For better format, see https://pastebin.com/ZKad1ET6
> > >
> > > MemoryRecords partitionRecords = (MemoryRecords)
> > > partitionData.records();
> > > for (RecordBatch batch : partitionRecords.batches()) {
> > >   // Iterate through records of a batch
> > >   Buffer batchBuffer = Buffer.buffer();
> > >   Iterator<org.apache.kafka.common.record.Record> it =
> > > batch.iterator();
> > >   while (it.hasNext()) {
> > > org.apache.kafka.common.record.Record record = it.next();
> > >
> > > String k = "";
> > > String v = "";
> > >
> > > for (Header header : record.headers()) {
> > >   v = new String(header.value());
> > >   // Some logic with k and v to write to a Buffer
> > > }
> > >
> > > if (record.hasKey()) {
> > >   ByteBuffer keyBuffer = record.key();
> > >   ByteBuffer valueBuffer = record.value();
> > >
> > >   if (record.hasValue()) {
> > > k = new String(keyBuffer.array(), keyBuffer.position(),
> > > record.keySize());
> > > v = new String(valueBuffer.array(),
> > valueBuffer.position(),
> > > record.valueSize());
> > > // Some logic with k and v to write to a Buffer
> > >   } else {
> > > k = new String(keyBuffer.array(), keyBuffer.position(),
> > > record.keySize());
> > > // Some logic with k and v to write to a Buffer
> > >   }
> > > } else {
> > >   // Empty buffer
> > > }
> > >   }
> > >   }
> > >
> >
>


Re: JoinGroup API response timing.

2025-01-21 Thread Greg Harris
Hi,

Yes you are correct. The "classic" Kafka Group Protocol is a synchronizing
barrier for all members.
All JoinGroup member responses are returned after all JoinGroup member
requests are received.

Thanks,
Greg

On Tue, Jan 21, 2025 at 7:10 AM Chain Head  wrote:

> Assume that three consumers of a certain group want to connect to a broker
> for a topic with 3 partitions. After the FindCoordinator API is done, the
> consumers send JoinGroup. Since the broker cannot know in advance how many
> consumers are expected to join, it waits group.initial.rebalance.delay.ms
> before starting a rebalance.
>
> Therefore, does this mean the JoinGroup API response of each request is
> "held" until the waiting period is over?
>
> Best regards.
>


Re: Random access to kafka messages

2025-01-24 Thread Greg Harris
Hello Jan,

I also have not heard of a use case like this for Kafka. One statistic that
I think you might need to manage is batch size, and its effect on
compression and read amplification.

Larger batches on the producer side can make your producers more performant
and compression more effective. But large batches will also increase the
amount of data delivered to consumers that is then discarded to read a
single message. This additional data transfer wastes disk bandwidth on the
brokers and network bandwidth on the broker and consuming application.

So while a lot of existing tuning advice and optimizations in Kafka work
with larger batches, you will need to spend some time profiling and making
batch size tradeoffs.
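
The main knobs involved on the producer side are batch.size, linger.ms, and
compression.type. As a purely illustrative sketch of the tradeoff (the
values are examples, not recommendations):

    # Larger batches: better compression and producer throughput, but more
    # read amplification when a consumer fetches a single message
    batch.size=262144
    linger.ms=10
    compression.type=lz4

    # Smaller batches: less wasted transfer per random read, weaker compression
    batch.size=16384
    linger.ms=0
    compression.type=none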

Hope this helps,
Greg

On Fri, Jan 24, 2025, 3:05 AM Ömer Şiar Baysal 
wrote:

> Hi,
>
> The data you gathered shows promising results. One thing to consider is
> testing how the page cache that Kafka utilizes affects the response times:
> it greatly improves response times for fetch requests whose data is
> already in the cache, since it is stored in memory, and may give the
> impression that all fetch requests perform the same, when performance
> would in fact be different for non-cached data.
>
> Good luck and let me know if you need more information about page cache.
> Omer Siar Baysal
>
>
> On Fri, Jan 24, 2025, 11:48 Jan Wypych 
> wrote:
>
> > Hello,
> >
> > We are currently designing a system that will ingest some XML messages
> and
> > then it will store them into some kind of long-term storage (years). The
> > access pattern to data shows that new messages (1-2 weeks old) will be
> > frequent, older data will be accessed rarely.
> > We currently chose Kafka as an ingest part, some kind of S3 for cold
> > long-term, but we are still thinking how we should approach hot storage
> > (1-2 weeks). We established that our S3 for hot data is too slow.
> > We have a few options for this hot part of the storage, but one of them is
> > Kafka (it will greatly simplify the whole system, and Kafka reliability is
> > extremely high).
> > Each Kafka message can be accessed using the offset/partition pair (we
> > need some metadata from messages anyway, so getting this pair is free for
> > us). Kafka stores its data in segments, each of which has its own index,
> > so we do not do a full scan of a topic. Consumer configs can be tweaked,
> > so we do not prefetch more than one message, do not commit offsets for a
> > consumer group, etc. Our initial tests show very promising results with high
> > throughput and low latency (3 brokers, 300GB in 50 partitions, 10k
> > messages/s, average latency under 3ms). Everything we have seen so far
> > tells us that it should work.
> > However, this goes against the common understanding of Kafka usage as a
> > streaming solution. We searched the internet and could not find such a
> > use case deployed.
> > On the other hand, every time we found someone discouraging such a use
> > case, there was no technical explanation behind it. Just a vague "Kafka
> > was not created for this, better to use X".
> > So, my question to you is:
> > Does anybody see any technical reason why our approach (fetch messages by
> > offset/partition in random order) should not work? Is there some
> limitation
> > we do not see, that could bite us in production (10-20 TB of data in
> > topics, more than 3 brokers obviously)?
> >
> >  Best regards,
> > Jan Wypych
> >
>


Re: Random access to kafka messages

2025-01-28 Thread Greg Harris
Hello Jan,

Thanks for your question.

It is my understanding that the producer batching strongly informs the
on-disk layout, because appending to the log is done on a per-batch level,
not a per-record level. This is done for any number of reasons, one of which
is being able to append data without uncompressing and recompressing it on
the broker side.
There are situations when Kafka must fall back to recompressing batches
before append, but I don't think they're relevant to this discussion.

The consumer batching is also somewhat informed by the on-disk layout. Data
fetched by the consumers should always be one or more batches, never a
fraction of a batch. This is done to avoid uncompressing and recompressing
the data similar to the Produce side, but also to enable sendfile and block
cache optimizations.
So while consumer batching may be configurable per-client, this is done by
including multiple batches together. Consumer fetches should have a lower
bound, which is the producer batch size set at append time.
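
As a concrete sketch of the random-access pattern you're describing (the
topic name, partition, and offset below are made-up placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("max.poll.records", "1");       // hand back at most one record per poll()
    props.put("enable.auto.commit", "false"); // no consumer-group offsets needed

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        TopicPartition tp = new TopicPartition("xml-messages", 7);
        consumer.assign(Collections.singletonList(tp)); // manual assignment, no group membership
        consumer.seek(tp, 123456L);                     // jump straight to the stored offset
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<byte[], byte[]> record : records) {
            // process the single requested message
        }
    }

Note that max.poll.records only limits what poll() hands back to the
application; the broker still transfers at least one full producer batch per
fetch, which is the lower bound discussed above.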

I would encourage you to test this and verify the behavior for yourself. I
am somewhat new in this area of the codebase so I could also be missing
something.

Thanks,
Greg

On Tue, Jan 28, 2025 at 12:15 AM Jan Wypych
 wrote:

> Hello Greg,
>
> Thanks for mentioning batch sizes.
> Could you please elaborate on the impact of producer batch size?
> My understanding is that the producer just writes messages (in batches of
> configurable size) to Kafka, but these batches do not impact the data stored
> on disk. The consumer reading messages should be totally decoupled from how
> the producer is writing them, but maybe I am missing something.
> For the batch reading we are aware of that and will set fetch.min.bytes
> and fetch.max.bytes to force Kafka not to prefetch more than one message.
> Our understanding (maybe wrong) is that Kafka has many IO threads (I am
> not sure if there are/can be separate reading and writing pools) and each
> one can process IO assignments with different parameters (you can have one
> thread returning data after 10 bytes read and a second waiting for 10MB).
> By this understanding (and, as said, we may be wrong here) batch sizes can
> be an issue, but they are configurable on the individual producer and
> consumer level.
>
> Best regards,
> Jan
>
> -Original Message-
> From: Greg Harris 
> Sent: Friday, January 24, 2025 9:02 PM
> To: Users 
> Subject: Re: Random access to kafka messages
>
> Hello Jan,
>
> I also have not heard of a use case like this for Kafka. One statistic
> that I think you might need to manage is batch size, and its effect on
> compression and read amplification.
>
> Larger batches on the producer side can make your producers more
> performant and compression more effective. But large batches will also
> increase the amount of data delivered to consumers that is then discarded
> to read a single message. This additional data transfer wastes disk
> bandwidth on the brokers and network bandwidth on the broker and consuming
> application.
>
> So while a lot of existing tuning advice and optimizations in Kafka work
> with larger batches, you will need to spend some time profiling and making
> batch size tradeoffs.
>
> Hope this helps,
> Greg
>
> On Fri, Jan 24, 2025, 3:05 AM Ömer Şiar Baysal 
> wrote:
>
> > Hi,
> >
> > The data you gathered shows promising results. One thing to consider
> > is testing how the page cache that Kafka utilizes affects the response
> > times: it greatly improves response times for fetch requests whose data
> > is already in the cache, since it is stored in memory, and may give the
> > impression that all fetch requests perform the same, when performance
> > would in fact be different for non-cached data.
> >
> > Good luck and let me know if you need more information about page cache.
> > Omer Siar Baysal
> >
> >
> > On Fri, Jan 24, 2025, 11:48 Jan Wypych
> > 
> > wrote:
> >
> > > Hello,
> > >
> > > We are currently designing a system that will ingest some XML
> > > messages
> > and
> > > then it will store them into some kind of long-term storage (years).
> > > The access pattern to data shows that new messages (1-2 weeks old)
> > > will be frequent, older data will be accessed rarely.
> > > We currently chose Kafka as an ingest part, some kind of S3 for cold
> > > long-term, but we are still thinking how we should approach hot
> > > storage
> > > (1-2 weeks). We established that our S3 for hot data is too slow.
> > > We have a few options for this hot part of a storage, but one of
> > > them is Kafka (it will greatly simplify the whole system an

Re: JoinGroup API response timing.

2025-01-21 Thread Greg Harris
Hi,

Thanks for the follow up.

By "classic" I meant the protocol implemented by the SyncGroup/JoinGroup
API [1]. It's a general group protocol that is still fully supported in
KRaft, and at this time there is no intention of deprecating it.
It's called "classic" to distinguish it from the newer KIP-848 [2]
ConsumerGroupHeartbeat API and the share group protocol used in KIP-932
[3]. It is my understanding that these other protocols are _not_ a
synchronizing barrier in the same way the "classic" protocol is.

Hope this helps,
Greg

[1]
https://github.com/apache/kafka/blob/adb033211497e539725366960e1013a4638de59f/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
[2]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
[3]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

On Tue, Jan 21, 2025 at 4:41 PM Chain Head  wrote:

> Thanks.
>
> By "classic" you mean pre-KRaft consensus? What is the "current" Kafka
> Group protocol?
>
> Best regards.
>
> On Tue, Jan 21, 2025 at 9:55 PM Greg Harris 
> wrote:
>
> > Hi,
> >
> > Yes you are correct. The "classic" Kafka Group Protocol is a
> synchronizing
> > barrier for all members.
> > All JoinGroup member responses are returned after all JoinGroup member
> > requests are received.
> >
> > Thanks,
> > Greg
> >
> > On Tue, Jan 21, 2025 at 7:10 AM Chain Head 
> wrote:
> >
> > > Assume that three consumers of a certain group want to connect to a
> > broker
> > > for a topic with 3 partitions. After the FindCoordinator API is done,
> the
> > > consumers send JoinGroup. Since the broker cannot know in advance how
> > many
> > > consumers are expected to join, it waits
> > group.initial.rebalance.delay.ms
> > > before starting a rebalance.
> > >
> > > Therefore, does this mean the JoinGroup API response of each request is
> > > "held" until the waiting period is over?
> > >
> > > Best regards.
> > >
> >
>


Re: Kraft setup

2025-03-15 Thread Greg Harris
Hi Sisindri,

Thanks for your question.

This is not supported with the current KRaft implementation, and I don't
believe there are any plans to support it in the future.
You will need to migrate from a single ZK cluster to multiple KRaft
quorums, as the final bridge release 3.9 does not support a
chroot-equivalent.
See another thread about this here:
https://lists.apache.org/thread/hsnolbnl05vohj9tqcz4glb3hgkhh338

I hope this helps,
Greg

On Wed, Mar 5, 2025 at 11:32 PM Manabolu Sisindri 
wrote:

> Hi Team,
>
> Can a Kafka controller cluster support multiple Kafka setups like ZooKeeper
> does (with chroot paths)?
>
> Existing setup: We are using a single ZooKeeper setup for multiple (8)
> Kafka setups: one ZooKeeper cluster handling multiple Kafka clusters using
> different paths (/kafka-cluster1, /kafka-cluster2, etc.).
>
> Expected setup: As we are planning to migrate Kafka from ZooKeeper to
> KRaft, I would like to know whether KRaft has a similar capability, i.e.
> whether a single Kafka controller setup can support multiple Kafka setups.
>
> Please help me with your insights on this.
>
> --
> Regards,
> Sisindri,
>


Re: OAuth Token Refresh Fails After System Sleep - Issue in ExpiringCredentialRefreshingLogin

2025-03-13 Thread Greg Harris
Hi Adrien,

Thanks for the report!

I had some questions about your observations:

> Upon
> waking, the thread simply continues its sleep operation without any
> awareness that a significant amount of time may have passed.

Were you able to take a stack trace, attach a debugger to see the state of
the thread/variables, or check logging to confirm this?

time.sleep() ultimately calls Thread.sleep(), which delegates to the JVM.
If there is a single Thread.sleep() invocation which never returns after
the system hibernating, I don't think there is much that Kafka can do to
mitigate that, and it would affect many more systems than just the
credential refresh. It's worth ruling this out with some more investigation.

What seems more likely to me is one of the following:
* The thread is exiting, possibly due to a thread interruption
* The thread is trying to refresh but unable to complete it successfully
for some reason
* The thread erroneously computes the refresh time to still be in the
future and continues to sleep with additional Thread.sleep calls that
behave as expected

With regards to the last point, maybe we're running into this:
https://issues.apache.org/jira/browse/KAFKA-7945 which applies a fallback
10 minute delay.
There are some pretty detailed logs on that code path that could explain
more about the state of the refresh thread.
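
If the investigation does point at a missed deadline, one possible shape for
a suspension-tolerant sleep loop is sketched below (purely illustrative, not
the current Kafka code; refreshDelayMs is a placeholder, and
InterruptedException handling is omitted):

    long deadlineMs = System.currentTimeMillis() + refreshDelayMs;
    while (true) {
        long remainingMs = deadlineMs - System.currentTimeMillis();
        if (remainingMs <= 0) {
            break; // deadline reached, even if wall-clock time jumped during a suspend
        }
        // Sleep in bounded chunks so a system suspend delays the refresh by
        // at most one chunk rather than the full remaining interval
        Thread.sleep(Math.min(remainingMs, 60_000L));
    }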

> Do you consider this behavior a bug that should be addressed? And would
you
> recommend creating a KIP for this issue?

If this is a bug in Kafka and not in the JVM, and the fix is reasonable and
backwards-compatible, we can proceed without a KIP, just as a normal bug
fix.
Please create a JIRA ticket with the results of your investigation, and if
you're interested, assign it to yourself and try to work out a solution.

Thanks,
Greg

On Thu, Mar 13, 2025 at 10:16 AM Adrien Wattez 
wrote:

> Hello Kafka community,
>
> I've encountered an issue with OAuth authentication in Kafka when running
> on a system that goes to sleep/hibernates. I believe I've identified a flaw
> in the token refresh mechanism that affects reliability in certain
> environments.
>
> When using OAuth authentication between brokers and controllers, the token
> refresh mechanism fails after system sleep/hibernation, causing all
> authentication to fail until the service is restarted.
>
> I observed this on my Confluent Platform setup running on a MacBook:
>
> OAuth token was set to refresh at 18:31:29
> System went to sleep at 18:19
> System woke up at 18:53, after the tokens had expired at 18:42
> No refresh login attempt occurred after wakeup
> All authentication failed with expired tokens
> After reviewing the ExpiringCredentialRefreshingLogin class code, I can
> see the issue stems from how the refresh thread sleeps until the next
> scheduled refresh time:
>
> log.info("[Principal={}]: Expiring credential re-login sleeping until: {}",
>     principalLogText(), new Date(nextRefreshMs));
> time.sleep(nextRefreshMs - nowMs);
>
> When the system goes to sleep, this thread's execution is suspended. Upon
> waking, the thread simply continues its sleep operation without any
> awareness that a significant amount of time may have passed. There's no
> mechanism to detect that the planned refresh window has been missed due to
> system suspension.
>
> I understand that hibernating a Kafka cluster isn't a common production
> scenario, and this issue might not affect many users in production
> environments. However, I believe this vulnerability in the token refresh
> mechanism could be problematic in certain scenarios like development
> environments, containerized setups, or any situation where process
> suspension might occur.
>
> Do you consider this behavior a bug that should be addressed? And would
> you recommend creating a KIP for this issue?
>
> I'm asking because while this might be a niche case for production, the
> authentication failure is particularly frustrating in development
> environments as it requires a full cluster restart to resolve.
>
> Thanks for your help,
>
> Adrien


Re: Infinite Loop Running Kafka Connect/MM2 on Source Kafka Cluster

2025-02-13 Thread Greg Harris
Hi Mehrtens,

The MirrorSourceConnector/Task relies on the Connect framework to
instantiate the producer used for mirroring.
The "target.cluster.bootstrap.servers" is indeed ineffective for changing
the framework client configuration, that should only affect clients
instantiated within the task (admin clients, offset-syncs producers, etc)

I think the configuration that you need to provide is
"producer.override.bootstrap.servers", in addition to the configuration you
already have.
Some of your producer tuning may also need to be specified via
"producer.override.*" in order to affect the mirrored records.

I don't understand your test cases exactly, are you observing some version
dependence here? I don't recall any recent changes which would invalidate
the above information.

Hope this helps,
Greg

On Thu, Feb 13, 2025 at 9:35 AM Mehrtens, Mazrim
 wrote:

> I’ve found that Kafka Connect never respects the
> “target.cluster.bootstrap.servers” configuration in the MirrorMaker2 task
> config. It always uses the Kafka Connect broker information instead.
> Running Kafka Connect on the source cluster causes an infinite loop of
> messages read from the source cluster, then written back to the same topic
> on the source cluster when using an IdentityReplicationPolicy. Running
> Kafka Connect on a third cluster causes the messages to get written to the
> Kafka Connect cluster, not the configured target cluster. Below are the
> scenarios I tested, and an example of the Kafka Connect task settings used.
> The only scenario that produced the correct result is running Kafka Connect
> on the target server.
>
> Is this a hard requirement? Am I misunderstanding how the MM2 configs get
> used in Kafka Connect? We generally recommend that for MirrorMaker2
> applications, users run Kafka Connect against the “target” Kafka cluster to
> help minimize network latency for the producers. However, in some scenarios
> it makes sense to run Kafka Connect against the “source” Kafka cluster, or
> even a third, unrelated Kafka cluster. This is because we don’t always have
> control over topic creation in the source/target clusters, and want
> MirrorMaker2 to only replicate data/offsets to / from existing topics.
>
>
> connect-distributed.properties:
>
> bootstrap.servers=source.broker.address:9092
> group.id=demo-loop
> key.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> offset.storage.topic=connect-offsets-demo-loop
> offset.storage.replication.factor=3
> config.storage.topic=connect-configs-demo-loop
> config.storage.replication.factor=3
> status.storage.topic=connect-status-demo-loop
> status.storage.replication.factor=3
> offset.flush.interval.ms=1
> connector.client.config.override.policy=All
>
> Kafka Connect MM2 task config:
>
> {
>   "name": "mm2-msc",
>   "connector.class":
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>
> "replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
>   "clusters": "msksource,mskdest",
>   "source.cluster.alias": "msksource",
>   "target.cluster.alias": "mskdest",
>   "target.cluster.bootstrap.servers": "target.broker.address:9092",
>   "source.cluster.bootstrap.servers": "source.broker.address:9092",
>   "topics": "example-topic",
>   "tasks.max": "1",
>   "key.converter": "
> org.apache.kafka.connect.converters.ByteArrayConverter",
>   "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "replication.factor": "3",
>   "offset-syncs.topic.replication.factor": "3",
>   "sync.topic.acls.interval.seconds": "600",
>   "sync.topic.configs.interval.seconds": "600",
>   "refresh.topics.interval.seconds": "300",
>   "refresh.groups.interval.seconds": "20",
>   "producer.enable.idempotence":"true",
>   "consumer.group.id": "mm2-msc",
>   "source.cluster.max.poll.records" : "5",
>   "source.cluster.receive.buffer.bytes" : "33554432",
>   "source.cluster.send.buffer.bytes" : "33554432",
>   "source.cluster.max.partition.fetch.bytes" : "33554432",
>   "source.cluster.message.max.bytes" : "37755000",
>   "source.cluster.compression.type" : "gzip",
>   "source.cluster.max.request.size" : "26214400",
>   "source.cluster.buffer.memory" : "524288000",
>   "source.cluster.batch.size" : "524288",
>   "target.cluster.max.poll.records" : "2",
>   "target.cluster.receive.buffer.bytes" : "33554432",
>   "target.cluster.send.buffer.bytes" : "33554432",
>   "target.cluster.max.partition.fetch.bytes" : "33554432",
>   "target.cluster.message.max.bytes" : "37755000",
>   "target.cluster.compression.type" : "gzip",
>   "target.cluster.max.request.size" : "26214400",
>   "target.cluster.buffer.memory" : "524288000",
>   "target.cluster.batch.size" : "52428"
> }
>
>
>
>
> [Truncated table of test scenarios; columns: Test, Kafka Connect Server,
> Kafka Connect/MM2 Version, Offset Sync Location, Source Cluster Version,
> Target Cluster Version]

Re: Question on Kafka Connect worker offset reset config for internal topics

2025-08-14 Thread Greg Harris
Hi Imcom Jin,

Thanks for your question!

It is expected behavior that Connect's internal topics are read completely
from the beginning each time the worker starts, regardless of the
auto.offset.reset configuration [1].
This is because they are compacted topics, and the first message in the
topic may be necessary for correctness reasons. For example, if a worker
only reads from the latest offset of the status topic, it may not know the
status of long-running stable tasks.

If you want to reduce the startup time, I suggest reducing the segment
rolling configurations [2,3] for the internal topics. This will permit
Kafka to compact away the duplicate status messages sooner, preventing them
from being read on a future startup. This was previously reported [4] but
we have not yet changed the default.
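
For example, using the topic name from your message (the values here are
illustrative; choose them based on your status/offset message volume):

    bin/kafka-configs.sh --bootstrap-server localhost:9092 \
      --entity-type topics --entity-name my-connect-offsets \
      --alter --add-config segment.ms=3600000,segment.bytes=10485760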

I hope this helps,
Greg

[1]
https://github.com/apache/kafka/blob/c4fb1008c4856c8cf9594269c86323753e6860ce/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L274-L278
[2] https://kafka.apache.org/documentation/#topicconfigs_segment.bytes
[3] https://kafka.apache.org/documentation/#topicconfigs_segment.ms
[4] https://issues.apache.org/jira/browse/KAFKA-15086

On Thu, Aug 14, 2025 at 9:44 AM Imcom JIN  wrote:

> Hi dear Kafka team,
>
> I see that no matter what properties I give to the connector, the offset
> reset config for internal topics, especially the offset storage topic (say,
> my-connect-offsets), always uses "earliest", which leads to very long
> bootstrap times during restart, or to stuck workers.
>
> Log sample and config sample printed in the log:
>
> 2025-08-12 10:10:45,531 INFO [Consumer
> clientId=cbdhk04-data-cluster-offsets, groupId=cbdhk04-data-cluster]
> Seeking to earliest offset of partition
>
> root@cbd:/usr/local/nxg/docker/kafka-connect# docker logs
> connect-replication-8085 | grep "auto.offset.reset = earliest" -C2
> auto.commit.interval.ms = 5000
> auto.include.jmx.reporter = true
> auto.offset.reset = earliest
>
> My connect-distributed.properties contains the following config:
>
> producer.override.auto.offset.reset=latest
> consumer.override.auto.offset.reset=latest
> producer.auto.offset.reset=latest
> consumer.auto.offset.reset=latest
> auto.offset.reset=latest
> connector.client.config.override.policy=All
>
> None of the above can change the behaviour of the consumer initialized by
> connect to consume internal topics.
>
> What's the expected behaviour? How can I improve the bootstrap time for a
> heavy Connect cluster?
> What properties should I use to change the consumer config, if that is
> possible at all?
>
> Thanks in advance
>
> --
> *Imcom Jin*
> Software Engineer Manager, SEG
>


Re: Increased latency when upgrading brokers to V4.0.0?

2025-08-21 Thread Greg Harris
Hi Daniel,

I don't have any changes in mind that might be causing the latency
increase from a broker upgrade. Since you're using replication factor 1, it
shouldn't be related to the replication protocol or inter-broker network
latency.

Are you able to take a profile [1] of the leader broker and make some flame
graphs to compare 3.9.1 and 4.0.0? I hope that a 20x difference would be
visible on the Wall profile.
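
For example, something along these lines on each broker version (the
launcher name and flags vary slightly between async-profiler releases):

    # Wall-clock profile of the broker JVM for 60s, rendered as a flame graph
    ./asprof -e wall -d 60 -f broker-4.0.0.html <broker-pid>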

Thanks,
Greg

[1] https://github.com/async-profiler/async-profiler

On Thu, Aug 21, 2025 at 8:53 AM Daniel Germon
 wrote:

> Hi Greg,
>
> I have changed the linger.ms configuration on the producer and yes it
> explains the difference in client version latencies, but I am still seeing
> the broker latency differences with this configuration.
>
> Kafka broker 3.9.1, client 4.0.0, linger.ms = 0
>
> 50th percentile: 310us
> 90th percentile: 414us
> 99th percentile: 825us
> 99.9th percentile: 9813us
>
> Kafka broker 4.0.0, client 4.0.0, linger.ms = 0
>
> 50th percentile: 8193us
> 90th percentile: 10373us
> 99th percentile: 11109us
> 99.9th percentile: 13812us
>
>
> Regards,
> Dan
>


Re: Increased latency when upgrading brokers to V4.0.0?

2025-08-21 Thread Greg Harris
Hi all,

There were several configuration defaults that were changed with the
release of 4.0.0 [1], I wonder if one or more of them could be affecting
your observed end-to-end latency.
Notably, the linger.ms on the producer side has increased from 0ms to 5ms.
You may consider running 4.0.0 clients but with linger.ms set to 0 to see
if that resolves this latency regression.
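
That is, explicitly in the producer configuration:

    linger.ms=0   # restore the pre-4.0 behavior of sending without artificial delay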

Thanks,
Greg

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations

On Thu, Aug 21, 2025 at 3:40 AM Daniel Germon
 wrote:

> Hi Paul,
>
> Thank you for taking the time to reply! I have just run the benchmarks
> using the v4.0.0 clients, and if anything the latencies are actually worse
> than on client v3.9.1:
>
>
> Kafka broker 3.9.1 client 3.9.1
>
> 50th percentile: 297 us
> 90th percentile: 366 us
> 99th percentile: 528 us
> 99.9th percentile: 1,460 us
>
> Kafka broker 3.9.1 client 4.0.0
>
> 50th percentile: 3,295 us
> 90th percentile: 5,418 us
> 99th percentile: 5,712 us
> 99.9th percentile: 6,602 us
>
> Kafka broker 4.0.0 client 3.9.1
>
> 50th percentile:8,178 us
> 90th percentile: 10,355 us
> 99th percentile: 11,042 us
> 99.9th percentile: 13,638 us
>
> Kafka broker 4.0.0 client 4.0.0
>
> 50th percentile: 10,889 us
> 90th percentile: 13,914 us
> 99th percentile: 15,750 us
> 99.9th percentile: 18,167 us
>
> I did run the internal kafka-e2e-latency tool; however, I did not see any
> noticeable difference. I believe this is because the tool waits for a
> record to be received before sending another, so overall throughput would
> be lower than in the setup I mention.
>
> Looking forward to see if you notice anything on your internal benchmarks.
>
> Regards,
> Dan
>


Re: MirrorMaker2: Offset semantics and message replication when switching consumers

2025-09-12 Thread Greg Harris
Hi Kota,

Thanks for your questions!

> - Are there any circumstances where the target (MSK) offsets could be
>  overwritten by smaller source offsets while consumers are already running
>  on MSK?
>   - I assume that this kind of behavior is not intended by design.

No, as soon as you start the first consumer on the target cluster, MM2
offset syncing will stop for that group and offsets can only be changed by
the target consumers. This is a property enforced by the consumer group
coordinator, not a MM2 feature specifically.

However, it is extremely likely that the target consumer will get some
messages that already had offsets committed by the source consumers, even
if you stop the source consumers first. Your consumer application should be
able to tolerate this re-delivery if you want to do a live/staged migration
(consumers before producers) like you described.

> - Will new messages written to onpre-kafka continue to replicate to MSK
>   via MM2, and remain readable by consumers from MSK?

Yes, the MirrorSourceConnector will continue mirroring the topic. Consumers
on the target cluster will be able to read the mirrored messages and commit
offsets to the target cluster.

Hope this helps,
Greg

On Thu, Sep 11, 2025 at 12:50 PM Kota Yagi  wrote:

> Summary
> I'm migrating from an on-prem Apache Kafka cluster (“onpre-kafka”) to an
> AWS MSK cluster using MirrorMaker 2 (MM2).
> After enabling one-way replication (onpre-kafka → MSK), I plan to cut
> consumers over to MSK while producers still write to onpre-kafka.
> I’d like to confirm the exact offset semantics.
>
> Topology
>
>- Source: onpre-kafka
>- Target: MSK
>- Replication: MM2 (MirrorSourceConnector + MirrorHeartbeatConnector +
>MirrorCheckpointConnector)
>- Direction: source → target (one-way)
>- Producers: continue writing to onpre-kafka
>- Consumers: switch from onpre-kafka to MSK after offset translation
>
> Scenario
>
>1. MM2 replicates topics from onpre-kafka to MSK.
>2. I start consuming from MSK (consumers move), while producers still
>write to onpre-kafka.
>3. MSK consumer offsets advance (e.g., 100 → 102). onpre-kafka consumer
>group offsets remain at 100.
>
>
> Questions
>
>- Are there any circumstances where the target (MSK) offsets could be
>overwritten by smaller source offsets while consumers are already
> running
>on MSK?
>   - I assume that this kind of behavior is not intended by design.
>- Will new messages written to onpre-kafka continue to replicate to MSK
>via MM2, and remain readable by consumers from MSK?
>   - I assume that consumers can keep consuming new messages from MSK
>   that are replicated via MM2.
>
>
> Thanks in advance!
>