[jira] [Created] (KAFKA-16201) Kafka exception - org.apache.kafka.common.errors.NotLeaderOrFollowerException

2024-01-29 Thread Yogesh (Jira)
Yogesh created KAFKA-16201:
--

 Summary: Kafka exception - 
org.apache.kafka.common.errors.NotLeaderOrFollowerException
 Key: KAFKA-16201
 URL: https://issues.apache.org/jira/browse/KAFKA-16201
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.6.1
 Environment: AWS EKS
Reporter: Yogesh


I am deploying Kafka inside a Kubernetes cluster in HA mode (multiple brokers). 
The deployment consists of:

Kubernetes
Kafka 3.6.1

Refer to the following files used in the deployment.

Dockerfile

 
{code:java}
FROM eclipse-temurin:17.0.9_9-jdk-jammy

ENV KAFKA_VERSION=3.6.1
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin

LABEL name="kafka" version=${KAFKA_VERSION}

RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
 && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME} \
 && rm -rf /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz

COPY ./entrypoint.sh /
RUN ["chmod", "+x", "/entrypoint.sh"]
ENTRYPOINT ["/entrypoint.sh"] {code}
 

 

entrypoint.sh

 
{code:java}
#!/bin/bash

NODE_ID=${HOSTNAME:6}
LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"
ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"

CONTROLLER_QUORUM_VOTERS=""
for i in $( seq 0 $REPLICAS); do
  if [[ $i != $REPLICAS ]]; then
    CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
  else
    CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
  fi
done

mkdir -p $SHARE_DIR/$NODE_ID

if [[ ! -f "$SHARE_DIR/cluster_id" && "$NODE_ID" = "0" ]]; then
  CLUSTER_ID=$(kafka-storage.sh random-uuid)
  echo $CLUSTER_ID > $SHARE_DIR/cluster_id
else
  CLUSTER_ID=$(cat $SHARE_DIR/cluster_id)
fi

sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
    -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
    -e "s+^listeners=.*+listeners=$LISTENERS+" \
    -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
    -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
    /opt/kafka/config/kraft/server.properties > server.properties.updated \
  && mv server.properties.updated /opt/kafka/config/kraft/server.properties

JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";"

echo -e "\nlistener.name.sasl.plain.sasl.jaas.config=${JAAS}" >> /opt/kafka/config/kraft/server.properties
echo -e "\nsasl.enabled.mechanisms=PLAIN" >> /opt/kafka/config/kraft/server.properties
echo -e "\nlistener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /opt/kafka/config/kraft/server.properties
echo -e "\ninter.broker.listener.name=INTERNAL" >> /opt/kafka/config/kraft/server.properties

kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties

exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties {code}
 

 

Kafka.yaml
{code:java}
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-kraft
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-volume
  labels:
    type: local
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: '/mnt/data'
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pv-claim
  namespace: kafka-kraft
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  serviceName: kafka-svc
  replicas: 5
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      volumes:
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pv-claim
      containers:
        - name: kafka-container
          image: myimage/kafka-kraft:1.0
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '5'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: kafka-kraft
            - name: SHARE_DIR
              value: /mnt/kafka

[jira] [Created] (KAFKA-16202) Extra dot in error message in producer

2024-01-29 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16202:
--

 Summary: Extra dot in error message in producer
 Key: KAFKA-16202
 URL: https://issues.apache.org/jira/browse/KAFKA-16202
 Project: Kafka
  Issue Type: Improvement
Reporter: Mickael Maison


If the broker hits a StorageException while handling a record from the 
producer, the producer prints the following warning:

[2024-01-29 15:33:30,722] WARN [Producer clientId=console-producer] Received 
invalid metadata error in produce request on partition topic1-0 due to 
org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to 
access log file on the disk.. Going to request metadata update now 
(org.apache.kafka.clients.producer.internals.Sender)

There's an extra dot between disk and Going.
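For illustration only (the actual template lives in the Sender class and may differ slightly): a minimal sketch of how the doubled dot arises and one way to avoid it, since the exception message already ends with a period and the warning template appends another sentence starting with ". Going ...".

{code:java}
// Illustrative sketch only; the real log template in
// org.apache.kafka.clients.producer.internals.Sender may differ slightly.
public class ExtraDotSketch {
    public static void main(String[] args) {
        String cause = "org.apache.kafka.common.errors.KafkaStorageException: "
                + "Disk error when trying to access log file on the disk.";
        // The cause already ends with '.', and the template appends ". Going ...",
        // which produces the double dot seen in the warning.
        String doubled = String.format(
                "Received invalid metadata error in produce request on partition %s due to %s. "
                        + "Going to request metadata update now", "topic1-0", cause);
        // One possible fix: trim a trailing period from the cause before formatting.
        String trimmed = cause.endsWith(".") ? cause.substring(0, cause.length() - 1) : cause;
        String fixed = String.format(
                "Received invalid metadata error in produce request on partition %s due to %s. "
                        + "Going to request metadata update now", "topic1-0", trimmed);
        System.out.println(doubled);
        System.out.println(fixed);
    }
}
{code}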



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16203) AutoCommit of empty offsets blocks following requests due to inflight flag

2024-01-29 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16203:
--

 Summary: AutoCommit of empty offsets blocks following requests due 
to inflight flag
 Key: KAFKA-16203
 URL: https://issues.apache.org/jira/browse/KAFKA-16203
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


The logic for auto-committing offsets completes without generating a request 
when there are no offsets to commit, but mistakenly leaves the inflight request 
flag on. As a result, subsequent auto-commits won't generate requests, even if 
offsets have been consumed.
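A minimal, hypothetical sketch of the guard the fix needs (class and field names below are illustrative, not the real consumer internals): the inflight flag should only be set once a commit request is actually generated.

{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified sketch; names do not match the real consumer internals.
class AutoCommitSketch {
    private boolean commitInflight = false;

    CompletableFuture<Void> maybeAutoCommit(Map<String, Long> consumedOffsets) {
        if (commitInflight) {
            // A real commit is still pending; skip this interval.
            return CompletableFuture.completedFuture(null);
        }
        if (consumedOffsets.isEmpty()) {
            // Nothing to commit: return without touching the inflight flag,
            // so the next auto-commit is not blocked.
            return CompletableFuture.completedFuture(null);
        }
        commitInflight = true; // set only once an OffsetCommit request is actually generated
        return sendOffsetCommit(consumedOffsets)
                .whenComplete((result, error) -> commitInflight = false);
    }

    private CompletableFuture<Void> sendOffsetCommit(Map<String, Long> offsets) {
        return CompletableFuture.completedFuture(null); // placeholder for the real request path
    }
}
{code}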



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type

2024-01-29 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16095.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Update list group state type filter to include the states for the new 
> consumer group type
> -
>
> Key: KAFKA-16095
> URL: https://issues.apache.org/jira/browse/KAFKA-16095
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Lan Ding
>Priority: Minor
> Fix For: 3.8.0
>
>
> # While using *--list --state* the current accepted values correspond to the 
> classic group type states. We need to include support for the new group type 
> states.
>  ## Consumer Group: Should list the state of the group. Accepted Values: 
>  ### _UNKNOWN(“unknown”)_
>  ### {_}EMPTY{_}("empty"),
>  ### *{_}ASSIGNING{_}("assigning"),*
>  ### *{_}RECONCILING{_}("reconciling"),*
>  ### {_}STABLE{_}("stable"),
>  ### {_}DEAD{_}("dead");
>  # 
>  ## Classic Group : Should list the state of the group. Accepted Values: 
>  ### {_}UNKNOWN{_}("Unknown"),
>  ### {_}EMPTY{_}("Empty");
>  ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),*
>  ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),*
>  ### {_}STABLE{_}("Stable"),
>  ### {_}DEAD{_}("Dead")
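For reference, a hedged example of the Admin API call behind --list --state; which states are accepted for the new consumer group type depends on the broker and client versions, so the filter below sticks to states that exist in both group types.

{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.common.ConsumerGroupState;

// Example of listing consumer groups filtered by state via the Admin client.
// Whether the new states (ASSIGNING, RECONCILING) are accepted depends on the
// broker/client version, so this sketch only filters on STABLE and EMPTY.
public class ListGroupsByState {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
                    .inStates(Set.of(ConsumerGroupState.STABLE, ConsumerGroupState.EMPTY));
            admin.listConsumerGroups(options).all().get().forEach(listing ->
                    System.out.println(listing.groupId() + " " +
                            listing.state().orElse(ConsumerGroupState.UNKNOWN)));
        }
    }
}
{code}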



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16204) Stray file core/00000000000000000001.snapshot created when running core tests

2024-01-29 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16204:
--

 Summary: Stray file core/00000000000000000001.snapshot created when running 
core tests
 Key: KAFKA-16204
 URL: https://issues.apache.org/jira/browse/KAFKA-16204
 Project: Kafka
  Issue Type: Improvement
  Components: core, unit tests
Reporter: Mickael Maison


When running the core tests I often get a file called 
core/00000000000000000001.snapshot created in my kafka folder. It looks like 
one of the tests does not clean up its resources properly.
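One common pattern for fixing this kind of leak, sketched under the assumption that the offending test writes the snapshot relative to the working directory: route it through a JUnit-managed temp directory instead.

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

// Hypothetical sketch: give the code under test an absolute path inside a JUnit
// temp directory so nothing is left behind under core/ after the run.
class SnapshotCleanupSketch {
    @TempDir
    Path tempDir;

    @Test
    void snapshotIsWrittenToTempDir() throws Exception {
        Path snapshot = tempDir.resolve("00000000000000000001.snapshot");
        Files.createFile(snapshot); // stand-in for the code that produces the snapshot
        // The whole directory, including this file, is deleted by JUnit after the test.
    }
}
{code}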



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] 3.7.0 RC2

2024-01-29 Thread Omnia Ibrahim
Hi Stan and Gaurav, 
Just to clarify some points mentioned here before:
KAFKA-14616: I raised this a year ago, so it's not related to the JBOD work. It is
rather a blocker bug for KRaft in general. The PR from Colin should fix this.
I am not sure if it is a blocker for 3.7 per se, as it has been a major bug since 3.3
and got missed from all the other releases.

Regarding the JBOD work:
KAFKA-16082: This is not a blocker for 3.7; rather, it's a nice fix. The PR
https://github.com/apache/kafka/pull/15136 is quite small and was approved by
Proven and me, but it is waiting for a committer's approval.
KAFKA-16162: This is a blocker for 3.7. Similarly, it's a small PR
https://github.com/apache/kafka/pull/15270, approved by Proven and me, and it is
waiting for a committer's approval.
KAFKA-16157: This is a blocker for 3.7. There is one small suggestion on the PR
https://github.com/apache/kafka/pull/15263, but I don't think any of the current
feedback is blocking the PR from getting approved, assuming we get a committer's
approval on it.
KAFKA-16195: Similarly, it's a blocker, but it has approval from Proven and me, and
we are waiting for a committer's approval on the PR
https://github.com/apache/kafka/pull/15262.

If we can't get a committer's approval for KAFKA-16162, KAFKA-16157 and
KAFKA-16195 in time for 3.7, then we can mark JBOD as early release, assuming we
merge at least KAFKA-16195.

Regards, 
Omnia

> On 26 Jan 2024, at 15:39, ka...@gnarula.com wrote:
> 
> Apologies, I mentioned KAFKA-16157 twice in my previous message. I intended 
> to mention KAFKA-16195
> with the PR at https://github.com/apache/kafka/pull/15262 as the second JIRA.
> 
> Thanks,
> Gaurav
> 
>> On 26 Jan 2024, at 15:34, ka...@gnarula.com wrote:
>> 
>> Hi Stan,
>> 
>> I wanted to share some updates about the bugs you shared earlier.
>> 
>> - KAFKA-14616: I've reviewed and tested the PR from Colin and have observed
>> the fix works as intended.
>> - KAFKA-16162: I reviewed Proven's PR and found some gaps in the proposed 
>> fix. I've
>> therefore raised https://github.com/apache/kafka/pull/15270 following a 
>> discussion with Luke in JIRA.
>> - KAFKA-16082: I don't think this is marked as a blocker anymore. I'm 
>> awaiting
>> feedback/reviews at https://github.com/apache/kafka/pull/15136
>> 
>> In addition to the above, there are 2 JIRAs I'd like to bring everyone's 
>> attention to:
>> 
>> - KAFKA-16157: This is similar to KAFKA-14616 and is marked as a blocker. 
>> I've raised
>> https://github.com/apache/kafka/pull/15263 and am awaiting reviews on it.
>> - KAFKA-16157: I raised this yesterday and have addressed feedback from 
>> Luke. This should
>> hopefully get merged soon.
>> 
>> Regards,
>> Gaurav
>> 
>> 
>>> On 24 Jan 2024, at 11:51, ka...@gnarula.com wrote:
>>> 
>>> Hi Stanislav,
>>> 
>>> Thanks for bringing these JIRAs/PRs up.
>>> 
>>> I'll be testing the open PRs for KAFKA-14616 and KAFKA-16162 this week and 
>>> I hope to have some feedback
>>> by Friday. I gather the latter JIRA is marked as a WIP by Proven and he's 
>>> away. I'll try to build on his work in the meantime.
>>> 
>>> As for KAFKA-16082, we haven't been able to deduce a data loss scenario. 
>>> There's a PR open
>>> by me for promoting an abandoned future replica with approvals from Omnia 
>>> and Proven,
>>> so I'd appreciate a committer reviewing it.
>>> 
>>> Regards,
>>> Gaurav
>>> 
>>> On 23 Jan 2024, at 20:17, Stanislav Kozlovski 
>>>  wrote:
 
 Hey all, I figured I'd give an update about what known blockers we have
 right now:
 
 - KAFKA-16101: KRaft migration rollback documentation is incorrect -
 https://github.com/apache/kafka/pull/15193; This need not block RC
 creation, but we need the docs updated so that people can test properly
 - KAFKA-14616: Topic recreation with offline broker causes permanent URPs -
 https://github.com/apache/kafka/pull/15230 ; I am of the understanding that
 this is blocking JBOD for 3.7
 - KAFKA-16162: New created topics are unavailable after upgrading to 3.7 -
 a strict blocker with an open PR https://github.com/apache/kafka/pull/15232
 - although I understand Proven is out of office
 - KAFKA-16082: JBOD: Possible dataloss when moving leader partition - I am
 hearing mixed opinions on whether this is a blocker (
 https://github.com/apache/kafka/pull/15136)
 
 Given that there are 3 JBOD blocker bugs, and I am not confident they will
 all be merged this week - I am on the edge of voting to revert JBOD from
 this release, or mark it early access.
 
 By all accounts, it seems that if we keep JBOD in, the release will have
 to spill into February, which is a month beyond the time-based release
 plan we had targeting the start of January.
 
 Can I ask others for an opinion?
 
 Best,
 Stan
 
 On Thu, Jan 18, 2024 at 1:21 PM Luke Chen  wrote:
 
> Hi all,
> 
> I think I've found another blocker issue: KAFKA-16

Re: [VOTE] KIP-974: Docker Image for GraalVM based Native Kafka Broker

2024-01-29 Thread Krishna Agarwal
Hi,
A gentle nudge to cast your vote on the KIP.
Your input is highly valued.

Regards,
Krishna


On Mon, Nov 20, 2023 at 11:53 AM Krishna Agarwal <
krishna0608agar...@gmail.com> wrote:

> Hi,
> I'd like to call a vote on KIP-974 which aims to publish a docker image
> for GraalVM based Native Kafka Broker.
>
> KIP -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker
>
> Discussion thread -
> https://lists.apache.org/thread/98wnx4w92fqj5wymkqlqyjsvzxz277hk
>
> Regards,
> Krishna
>


[jira] [Created] (KAFKA-16205) Reduce number of metadata requests during hybrid mode

2024-01-29 Thread David Arthur (Jira)
David Arthur created KAFKA-16205:


 Summary: Reduce number of metadata requests during hybrid mode
 Key: KAFKA-16205
 URL: https://issues.apache.org/jira/browse/KAFKA-16205
 Project: Kafka
  Issue Type: Improvement
  Components: controller, kraft
Affects Versions: 3.6.0, 3.5.0, 3.4.0, 3.7.0
Reporter: David Arthur


When migrating a cluster with a high number of brokers and partitions, it is 
possible for the controller channel manager queue to get backed up. This can 
happen when many small RPCs are generated in response to several small 
MetadataDeltas being handled by MigrationPropagator.

 

In the ZK controller, various optimizations have been made over the years to 
reduce the number of UpdateMetadata (UMR) and LeaderAndIsr (LISR) requests sent 
during controlled shutdown or other large metadata events. For the ZK to KRaft 
migration, we use the MetadataLoader infrastructure to learn about and propagate 
metadata to ZK brokers.

 

We need to improve the batching in MigrationPropagator to avoid performance 
issues during the migration of large clusters.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16206) ZkConfigMigrationClient tries to delete topic configs twice

2024-01-29 Thread David Arthur (Jira)
David Arthur created KAFKA-16206:


 Summary: ZkConfigMigrationClient tries to delete topic configs 
twice
 Key: KAFKA-16206
 URL: https://issues.apache.org/jira/browse/KAFKA-16206
 Project: Kafka
  Issue Type: Bug
  Components: migration, kraft
Reporter: David Arthur


When deleting a topic, we see spurious ERROR logs from 
kafka.zk.migration.ZkConfigMigrationClient:
 
{code:java}
Did not delete ConfigResource(type=TOPIC, name='xxx') since the node did not 
exist. {code}

This seems to happen because ZkTopicMigrationClient#deleteTopic is deleting the 
topic, partitions, and config ZNodes in one shot. Subsequent calls from 
KRaftMigrationZkWriter to delete the config encounter a NO_NODE since the ZNode 
is already gone.
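A minimal sketch of one way to make the second delete benign (the interface and method names below are illustrative, not the actual ZkConfigMigrationClient/KRaftMigrationZkWriter API): treat NO_NODE on the config path as already deleted rather than an error.

{code:java}
import org.apache.zookeeper.KeeperException;

// Illustrative only; the interface and method names below are not the real migration client API.
class TopicConfigCleanupSketch {
    interface ZkOps {
        void deletePath(String path) throws KeeperException.NoNodeException;
    }

    private final ZkOps zk;

    TopicConfigCleanupSketch(ZkOps zk) {
        this.zk = zk;
    }

    void deleteTopicConfigs(String topic) {
        try {
            zk.deletePath("/config/topics/" + topic);
        } catch (KeeperException.NoNodeException e) {
            // The config ZNode was already removed together with the topic ZNodes;
            // nothing to do, and no need for an ERROR log.
        }
    }
}
{code}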




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1014: Managing Unstable Metadata Versions in Apache Kafka

2024-01-29 Thread Colin McCabe
On Tue, Jan 23, 2024, at 11:21, Jun Rao wrote:
> Hi, Proven,
>
> Thanks for the KIP.
>
> I am not sure about the reordering approach proposed in the KIP. Let's say
> in a release we have features X and Y, depending on MV IV1 and IV2,
> respectively. At the release time, feature Y is ready, but X is not. I
> guess the proposal is to move IV1 to a new MV IV3?

Hi Jun,

In your example, if X is not ready, it should be moved into an unstable MV. 
Then if Y is ready, it can be moved into a stable MV and we can advance the 
last stable MV.

> The issue is that IV2
> could have made changes on top of IV1. For example, IV2 could have evolved
> the schema of the same inter broker request as IV1. In that case, what does
> IV3 represent? We can't simply take the changes associated with IV1 since
> it could have conflicts with IV2.

If Y changed the same file as X, we can just edit the file so that Y's changes come first.

Nobody using a stable MV should be affected, since they will be generating 
records on an older (stable) version.

> Even when there are no conflicts, I am not sure if the approach supports
> the trunk deployment model. Let's say we move two unstable MV IV3 and IV4
> to IV7 and IV8. If someone deployed code corresponding to IV4 and later
> upgraded the code to IV7, he/she (1) wouldn't be able to set IV4 in the
> upgrade code since it no longer exists and (2) would be surprised that the
> request protocol that worked under IV4 doesn't work now.
>

Upgrades are not supported if you are running an unstable MV. It's intended 
only for testing.

Colin

>
> Thanks,
>
> Jun
>
> On Fri, Jan 19, 2024 at 2:13 PM Artem Livshits
>  wrote:
>
>> Hi Colin,
>>
>> >  I think feature flags are somewhat orthogonal to the stable / unstable
>> discussion
>>
>> I think feature flags can be used as an alternative to achieve similar
>> results as stable / unstable functionality.  As well as long-lived feature
>> branches.  In my experience, I've seen feature flags to be more successful
>> than feature branches for changes of existing functionality.  I also think
>> that stable / unstable MV would work better than feature branches. I just
>> wanted to mention it for completeness, not sure if we should start a full
>> fledged discussion on these topics.
>>
>> > I'm struggling a bit with your phrasing. Are you suggesting that unstable
>> MVs would not be able to be in trunk?
>>
>> Unstable MV should be in trunk.  The wording is related to when we can
>> promote "unstable" to "stable".
>>
>> -Artem
>>
>>
>> On Mon, Jan 15, 2024 at 10:03 PM Colin McCabe  wrote:
>>
>> > On Fri, Jan 12, 2024, at 11:32, Artem Livshits wrote:
>> > > I think using feature flags (whether we support a framework and tooling
>> > for
>> > > feature flags or just an ad-hoc XyzEnabled flag) can be an alternative
>> to
>> > > this KIP.  I think the value of this KIP is that it's trying to
>> propose a
>> > > systemic approach for gating functionality that may take multiple
>> > releases
>> > > to develop.  A problem with ad-hoc feature flags is that it's useful
>> > during
>> > > feature development, so that people who are working on this feature (or
>> > are
>> > > interested in beta-testing the feature) can get early access (without
>> any
>> > > guarantees on compatibility or even correctness); but then the feature
>> > > flags often stick forever after the feature development is done (and as
>> > > time moves on, the new code is written in such a way that it's not
>> > > possible to turn the feature off any more cleanly).
>> > >
>> >
>> > Hi Artem,
>> >
>> > I think feature flags are somewhat orthogonal to the stable / unstable
>> > discussion. Even if every new feature was a feature flag, you probably
>> > still wouldn't want to stabilize the features immediately, to avoid
>> > maintaining a lot of alpha stuff forever.
>> >
>> > (I also think that feature flags should be used sparingly, if at all,
>> > because of the way that they exponentially increase the test matrix. But
>> > that's a tangent, I think, given the first point...)
>> >
>> > >
>> > > I'd also clarify how I think about "stable".  Ismael made a comment "
>> > > something is stable in the "this is battle-tested" sense.".  I don't
>> > think
>> > > it has to be "battle-tested", it just has to meet the bar of being in
>> the
>> > > trunk.  Again, thinking of a small single-commit feature -- to commit
>> to
>> > > trunk, the feature doesn't have to be "battle-tested", but it should be
>> > > complete (and not just a bunch of TODOs), with tests written and some
>> > level
>> > > of dev-testing done, so that once the release is cut, we can find and
>> fix
>> > > bugs and make it release-quality (as opposed to reverting the whole
>> > > thing).  We can apply the same "stability" bar for features to be in
>> the
>> > > stable MV -- fully complete, tests written and some level of dev
>> testing
>> > > done.
>> > >
>> >
>> > I'm struggling a bit with your phrasing. Are you suggesting that unstabl

Re: [DISCUSS] KIP-1014: Managing Unstable Metadata Versions in Apache Kafka

2024-01-29 Thread Jun Rao
Hi, Colin,

Thanks for the reply.

"If Y changed the same file as X, we can just edit the file so that Y's
changes come first."

It seems the effort is more than just editing the file. One has to change
all the logic around the old IV. Also, what happens to the client? A client
may have implemented a version for a request corresponding to an unstable
feature. When the unstable feature is re-ordered, do all clients (including
those non-Java ones) need to change the RPC implementation for the existing
underlying requests?

Jun

On Mon, Jan 29, 2024 at 9:57 AM Colin McCabe  wrote:

> On Tue, Jan 23, 2024, at 11:21, Jun Rao wrote:
> > Hi, Proven,
> >
> > Thanks for the KIP.
> >
> > I am not sure about the reordering approach proposed in the KIP. Let's
> say
> > in a release we have features X and Y, depending on MV IV1 and IV2,
> > respectively. At the release time, feature Y is ready, but X is not. I
> > guess the proposal is to move IV1 to a new MV IV3?
>
> Hi Jun,
>
> In your example, if X is not ready, it should be moved into an unstable
> MV. Then if Y is ready, it can be moved into a stable MV and we can advance
> the last stable MV.
>
> > The issue is that IV2
> > could have made changes on top of IV1. For example, IV2 could have
> evolved
> > the schema of the same inter broker request as IV1. In that case, what
> does
> > IV3 represent? We can't simply take the changes associated with IV1 since
> > it could have conflicts with IV2.
>
> If Y changed the same file as X, we can just edit the file so that Y's
> changes come first.
>
> Nobody using a stable MV should be affected, since they will be generating
> records on an older (stable) version.
>
> > Even when there are no conflicts, I am not sure if the approach supports
> > the trunk deployment model. Let's say we move two unstable MV IV3 and IV4
> > to IV7 and IV8. If someone deployed code corresponding to IV4 and later
> > upgraded the code to IV7, he/she (1) wouldn't be able to set IV4 in the
> > upgrade code since it no longer exists and (2) would be surprised that
> the
> > request protocol that worked under IV4 doesn't work now.
> >
>
> Upgrades are not supported if you are running an unstable MV. It's
> intended only for testing.
>
> Colin
>
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Jan 19, 2024 at 2:13 PM Artem Livshits
> >  wrote:
> >
> >> Hi Colin,
> >>
> >> >  I think feature flags are somewhat orthogonal to the stable /
> unstable
> >> discussion
> >>
> >> I think feature flags can be used as an alternative to achieve similar
> >> results as stable / unstable functionality.  As well as long-lived
> feature
> >> branches.  In my experience, I've seen feature flags to be more
> successful
> >> than feature branches for changes of existing functionality.  I also
> think
> >> that stable / unstable MV would work better than feature branches. I
> just
> >> wanted to mention it for completeness, not sure if we should start a
> full
> >> fledged discussion on these topics.
> >>
> >> > I'm struggling a bit with your phrasing. Are you suggesting that
> unstable
> >> MVs would not be able to be in trunk?
> >>
> >> Unstable MV should be in trunk.  The wording is related to when we can
> >> promote "unstable" to "stable".
> >>
> >> -Artem
> >>
> >>
> >> On Mon, Jan 15, 2024 at 10:03 PM Colin McCabe 
> wrote:
> >>
> >> > On Fri, Jan 12, 2024, at 11:32, Artem Livshits wrote:
> >> > > I think using feature flags (whether we support a framework and
> tooling
> >> > for
> >> > > feature flags or just an ad-hoc XyzEnabled flag) can be an
> alternative
> >> to
> >> > > this KIP.  I think the value of this KIP is that it's trying to
> >> propose a
> >> > > systemic approach for gating functionality that may take multiple
> >> > releases
> >> > > to develop.  A problem with ad-hoc feature flags is that it's useful
> >> > during
> >> > > feature development, so that people who are working on this feature
> (or
> >> > are
> >> > > interested in beta-testing the feature) can get early access
> (without
> >> any
> >> > > guarantees on compatibility or even correctness); but then the
> feature
> >> > > flags often stick forever after the feature development is done
> (and as
> >> > > time moves one, the new code is written in such a way that it's not
> >> > > possible to turn the feature off any more cleanly).
> >> > >
> >> >
> >> > Hi Artem,
> >> >
> >> > I think feature flags are somewhat orthogonal to the stable / unstable
> >> > discussion. Even if every new feature was a feature flag, you probably
> >> > still wouldn't want to stabilize the features immediately, to avoid
> >> > maintaining a lot of alpha stuff forever.
> >> >
> >> > (I also think that feature flags should be used sparingly, if at all,
> >> > because of the way that they exponentially increase the test matrix.
> But
> >> > that's a tangent, I think, given the first point...)
> >> >
> >> > >
> >> > > I'd also clarify how I think about "stable".  Ismael made a comment
> "
> >> > > something is s

[jira] [Created] (KAFKA-16207) Implement KRaft internal listener for KRaftVersionRecord and VotersRecord

2024-01-29 Thread Jira
José Armando García Sancio created KAFKA-16207:
--

 Summary: Implement KRaft internal listener for KRaftVersionRecord 
and VotersRecord
 Key: KAFKA-16207
 URL: https://issues.apache.org/jira/browse/KAFKA-16207
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-853: KRaft Voters Change

2024-01-29 Thread José Armando García Sancio
Hi all,

This DISCUSS thread was continue in a new thread at:

[DISCUSS] KIP-853: KRaft Controller Membership Changes:
https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46

Thanks!
-- 
-José


[jira] [Created] (KAFKA-16208) Design new Consumer timeout policy

2024-01-29 Thread Kirk True (Jira)
Kirk True created KAFKA-16208:
-

 Summary: Design new Consumer timeout policy
 Key: KAFKA-16208
 URL: https://issues.apache.org/jira/browse/KAFKA-16208
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer, documentation
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


This task is to design and document the timeout policy for the new Consumer 
implementation.

The documentation lives here: 
https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-29 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

20. So for the dual-write recipe, we should always call
InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
change the following in the example to use InitProducerId(true) instead?
1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
Also, could Flink just follow the dual-write recipe? It's simpler if there
is one way to solve the 2pc issue.

21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
Integer.MAX_VALUE?

24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
request to use the ongoing pid. With 2pc, the coordinator now expects the
endTxn request to use the next pid. So, the flow is different, right?

25. "We send out markers using the original ongoing transaction ProducerId
and ProducerEpoch"
We should use ProducerEpoch + 1 in the marker, right?

Jun
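For readers following the thread, a hedged sketch of the client-side setup under discussion; the transaction.two.phase.commit.enable property and the keepPreparedTxn flag are KIP-939 proposals, not released API, so the names may still change.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Sketch only: "transaction.two.phase.commit.enable" and keepPreparedTxn are
// proposed in KIP-939 and are not part of any released Kafka client API.
public class TwoPcProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dual-write-app-1");
        // Proposed by KIP-939: opt this producer into 2PC. Per the discussion in this
        // thread, the dual-write recipe would then issue InitProducerId with
        // keepPreparedTxn=true, because the client cannot know upfront whether a
        // prepared transaction exists and must not accidentally abort a committed one.
        props.put("transaction.two.phase.commit.enable", "true");
        System.out.println(props);
    }
}
{code}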

On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
 wrote:

> Hi Jun,
>
> > 20.  I am a bit confused by how we set keepPreparedTxn.  ...
>
> keepPreparedTxn=true informs the transaction coordinator that it should
> keep the ongoing transaction, if any.  If the keepPreparedTxn=false, then
> any ongoing transaction is aborted (this is exactly the current behavior).
> enable2Pc is a separate argument that is controlled by the
> *transaction.two.phase.commit.enable *setting on the client.
>
> To start 2PC, the client just needs to set
> *transaction.two.phase.commit.enable*=true in the config.  Then if the
> client knows the status of the transaction upfront (in the case of Flink,
> Flink keeps the knowledge if the transaction is prepared in its own store,
> so it always knows upfront), it can set keepPreparedTxn accordingly, then
> if the transaction was prepared, it'll be ready for the client to complete
> the appropriate action; if the client doesn't have a knowledge that the
> transaction is prepared, keepPreparedTxn is going to be false, in which
> case we'll get to a clean state (the same way we do today).
>
> For the dual-write recipe, the client doesn't know upfront if the
> transaction is prepared; this information is implicitly encoded in the
> PreparedTxnState value that can be used to resolve the transaction state.
> In that case, keepPreparedTxn should always be true, because we don't know
> upfront and we don't want to accidentally abort a committed transaction.
>
> The forceTerminateTransaction call can just use keepPreparedTxn=false, it
> actually doesn't matter if it sets Enable2Pc flag.
>
> > 21. TransactionLogValue: Do we need some field to identify whether this
> is written for 2PC so that ongoing txn is never auto aborted?
>
> The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
> enabled.  I've added a note to the KIP about this.
>
> > 22
>
> You're right it's a typo.  I fixed it as well as step 9 (REQUEST:
> ProducerId=73, ProducerEpoch=MAX).
>
> > 23. It's a bit weird that Enable2Pc is driven by a config while
> KeepPreparedTxn is from an API param ...
>
> The intent to use 2PC doesn't change from transaction to transaction, but
> the intent to keep prepared txn may change from transaction to
> transaction.  In dual-write recipes the distinction is not clear, but for
> use cases where keepPreparedTxn value is known upfront (e.g. Flink) it's
> more prominent.  E.g. a Flink's Kafka sink operator could be deployed with
> *transaction.two.phase.commit.enable*=true hardcoded in the image, but
> keepPreparedTxn cannot be hardcoded in the image, because it depends on the
> job manager's state.
>
> > 24
>
> The flow is actually going to be the same way as it is now -- the "main"
> producer id + epoch needs to be used in all operations to prevent fencing
> (it's sort of a common "header" in all RPC calls that follow the same
> rules).  The ongoing txn info is just additional info for making a commit /
> abort decision based on the PreparedTxnState from the DB.
>
> --Artem
>
> On Thu, Jan 25, 2024 at 11:05 AM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply. A few more comments.
> >
> > 20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I
> got
> > the following (1) to start 2pc, we call
> > InitProducerId(keepPreparedTxn=false); (2) when the producer fails and
> > needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); (3)
> > Admin.forceTerminateTransaction() calls
> > InitProducerId(keepPreparedTxn=false).
> > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc
> enabled,
> > and there is an ongoing txn, should the server return an error to the
> > InitProducerId request? If so, what would be the error code?
> > 20.2 How do we distinguish between (1) and (3)? It's the same API call
> but
> > (1) doesn't abort ongoing txn and (2) does.
> > 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the ongoing
> > txn. So, set

[jira] [Created] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8

2024-01-29 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16209:
-

 Summary: fetchSnapshot might return null if topic is created 
before v2.8
 Key: KAFKA-16209
 URL: https://issues.apache.org/jira/browse/KAFKA-16209
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.1
Reporter: Luke Chen


The remote log manager fetches the producer snapshot via ProducerStateManager 
[here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608],
 but the snapshot map might contain nothing if the topic has no snapshot created, 
e.g. topics created before v2.8. We need to fix this to avoid an NPE.

old PR: https://github.com/apache/kafka/pull/14615/
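A minimal sketch of the kind of guard needed (names and types are illustrative; the real lookup lives in ProducerStateManager, linked above): return an empty Optional instead of dereferencing a missing snapshot entry.

{code:java}
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative sketch only; the real code is in ProducerStateManager (linked above).
class SnapshotLookupSketch {
    // Offset -> snapshot file path; empty for topics created before v2.8.
    private final ConcurrentSkipListMap<Long, String> snapshots = new ConcurrentSkipListMap<>();

    Optional<String> fetchSnapshot(long offset) {
        // Optional.ofNullable avoids an NPE when the topic has no snapshot at this offset.
        return Optional.ofNullable(snapshots.get(offset));
    }
}
{code}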



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.7 #77

2024-01-29 Thread Apache Jenkins Server
See 




[DISCUSS] KIP-1018: Introduce max remote fetch timeout config

2024-01-29 Thread Kamal Chandraprakash
Hi all,

I have opened KIP-1018 to introduce a dynamic max-remote-fetch-timeout broker
config to give more control to the operator.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests

Let me know if you have any feedback or suggestions.

--
Kamal


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.7 #78

2024-01-29 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 337851 lines...]
[2024-01-30T06:58:52.223Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testZooKeeperSessionStateMetric() STARTED
[2024-01-30T06:58:52.223Z] 
[2024-01-30T06:58:52.223Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testZooKeeperSessionStateMetric() PASSED
[2024-01-30T06:58:52.223Z] 
[2024-01-30T06:58:52.223Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testExceptionInBeforeInitializingSession() STARTED
[2024-01-30T06:58:53.322Z] 
[2024-01-30T06:58:53.322Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testExceptionInBeforeInitializingSession() PASSED
[2024-01-30T06:58:53.322Z] 
[2024-01-30T06:58:53.322Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testGetChildrenExistingZNode() STARTED
[2024-01-30T06:58:53.322Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testGetChildrenExistingZNode() PASSED
[2024-01-30T06:58:53.323Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testConnection() STARTED
[2024-01-30T06:58:53.323Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testConnection() PASSED
[2024-01-30T06:58:53.323Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testZNodeChangeHandlerForCreation() STARTED
[2024-01-30T06:58:53.323Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testZNodeChangeHandlerForCreation() PASSED
[2024-01-30T06:58:53.323Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testGetAclExistingZNode() STARTED
[2024-01-30T06:58:53.323Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testGetAclExistingZNode() PASSED
[2024-01-30T06:58:53.323Z] 
[2024-01-30T06:58:53.323Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testSessionExpiryDuringClose() STARTED
[2024-01-30T06:58:54.422Z] 
[2024-01-30T06:58:54.422Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testSessionExpiryDuringClose() PASSED
[2024-01-30T06:58:54.422Z] 
[2024-01-30T06:58:54.422Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testReinitializeAfterAuthFailure() STARTED
[2024-01-30T06:58:56.503Z] 
[2024-01-30T06:58:56.503Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testReinitializeAfterAuthFailure() PASSED
[2024-01-30T06:58:56.503Z] 
[2024-01-30T06:58:56.503Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testSetAclNonExistentZNode() STARTED
[2024-01-30T06:58:56.503Z] 
[2024-01-30T06:58:56.503Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testSetAclNonExistentZNode() PASSED
[2024-01-30T06:58:56.503Z] 
[2024-01-30T06:58:56.503Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testConnectionLossRequestTermination() STARTED
[2024-01-30T06:59:06.242Z] 
[2024-01-30T06:59:06.242Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testConnectionLossRequestTermination() PASSED
[2024-01-30T06:59:06.242Z] 
[2024-01-30T06:59:06.242Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testExistsNonExistentZNode() STARTED
[2024-01-30T06:59:06.242Z] 
[2024-01-30T06:59:06.242Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testExistsNonExistentZNode() PASSED
[2024-01-30T06:59:06.242Z] 
[2024-01-30T06:59:06.242Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testGetDataNonExistentZNode() STARTED
[2024-01-30T06:59:07.342Z] 
[2024-01-30T06:59:07.342Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testGetDataNonExistentZNode() PASSED
[2024-01-30T06:59:07.342Z] 
[2024-01-30T06:59:07.342Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testConnectionTimeout() STARTED
[2024-01-30T06:59:09.423Z] 
[2024-01-30T06:59:09.423Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testConnectionTimeout() PASSED
[2024-01-30T06:59:09.423Z] 
[2024-01-30T06:59:09.423Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testBlockOnRequestCompletionFromStateChangeHandler() 
STARTED
[2024-01-30T06:59:09.423Z] 
[2024-01-30T06:59:09.423Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZooKeeperClientTest > testBlockOnRequestCompletionFromStateChangeHandler() 
PASSED
[2024-0