Re: Jira Access

2022-08-07 Thread Mickael Maison
Hi Madhu,

Done! Thanks for your interest in Apache Kafka.

Mickael

On Sat, Aug 6, 2022 at 7:26 PM Madhusudhan Reddy Vennapusa
 wrote:
>
> Hi Team,
>
> Please add me to the contributor list, my Jira account username: *sudhan499*
>
> Regards,
> Madhu


Re: [DISCUSS] KIP-860: Add client-provided option to guard against unintentional replication factor change during partition reassignments

2022-08-07 Thread Stanislav Kozlovski
Thank you for the reviews.

Vikas,
> > In the case of an already-reassigning partition being reassigned again,
the validation compares the targetReplicaSet size of the reassignment to
the targetReplicaSet size of the new reassignment and throws if those
differ.
> Can you add more detail to this, or clarify what is targetReplicaSet (for
e.g. why not sourceReplicaSet?) and how the target replica set will be
calculated?
If a reassignment is ongoing, such that [1,2,3] => [4,5,6] (the replica set
in Kafka will be [1,2,3,4,5,6] during the reassignment), and you try to
issue a new reassignment (e.g. [7,8,9]), Kafka should NOT think that the RF
of the partition is 6 just because a reassignment is ongoing. Hence, we
compare [4,5,6]'s length to [7,8,9]'s.
The targetReplicaSet is a term we use in KIP-455.
It means the desired replica set that a given reassignment is trying to
achieve. Here we compare said set of the ongoing reassignment to the new
reassignment.

Note this can happen during an RF change too, e.g. [1,2,3] => [4,5,6,7] (an
RF change, with intermediate set [1,2,3,4,5,6,7]); if we then try to
reassign to [9,10,11], the logic will compare [4,5,6,7] to [9,10,11].
In such a situation, where one wants to cancel the RF increase and reassign
again, one first needs to cancel the existing reassignment via the API (no
special action required despite the RF change).
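
To make the check concrete, here's a rough sketch of the validation
(illustrative only, not the actual controller code; the exception type is
also just picked for illustration):

    import java.util.List;
    import org.apache.kafka.common.errors.InvalidReplicationFactorException;

    // Compare the size of the ongoing reassignment's target replica set with
    // the size of the newly requested target set, ignoring the larger
    // intermediate set that exists while the reassignment is in flight.
    static void validateReplicationFactorUnchanged(List<Integer> ongoingTarget,
                                                   List<Integer> newTarget) {
        if (ongoingTarget.size() != newTarget.size()) {
            throw new InvalidReplicationFactorException(
                "Reassignment would change the replication factor from "
                    + ongoingTarget.size() + " to " + newTarget.size());
        }
    }

e.g. validateReplicationFactorUnchanged(List.of(4, 5, 6), List.of(7, 8, 9))
passes, whereas comparing against the intermediate set [1,2,3,4,5,6] would not.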


> And what about the reassign partitions CLI? Do we want to expose the
option there too?
Yes, this is already present in the KIP if I'm not mistaken. We describe it
in "Accordingly, the kafka-reassign-partitions.sh tool will be updated to
allow supplying the new option:"
I have edited the KIP to contain two clear paragraphs called Admin API and
CLI now.

Colin,

>  it would be nice for the first paragraph to be a bit more explicit about
this goal.
Sounds good, I updated it with that suggestion.

> client-side forward compatibility
I was under the assumption that it is not recommended to upgrade clients
before brokers, but a quick search cleared it up for me: we are pretty
intentional about allowing that.
Do you happen to know if we have any policy on client-side forward
compatibility with regard to such things -- extending "write" APIs (that
mutate state) with fields that conditionally limit that modification?
It seems like a rare use case to me, hence renaming it to something like
tryDisableReplicationFactorChange may unnecessarily impair the API.

Would Admin API documentation that says "this is supported only from
brokers on version X and above. If not supported, the default behavior (no
replication factor guards) is applied" be sufficient?
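
For illustration, here's a hedged sketch of the fallback Colin described
(not the KIP's final API -- the option setter below is a hypothetical name
and is commented out):

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
    import org.apache.kafka.clients.admin.NewPartitionReassignment;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.UnsupportedVersionException;

    static void reassignWithGuard(Admin admin,
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments)
            throws Exception {
        AlterPartitionReassignmentsOptions options =
            new AlterPartitionReassignmentsOptions();
        // options.allowReplicationFactorChange(false); // hypothetical new option
        try {
            admin.alterPartitionReassignments(reassignments, options).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnsupportedVersionException) {
                // Older broker: the guard cannot be enforced. Retry without it,
                // or give up, depending on how strict the caller wants to be.
                admin.alterPartitionReassignments(reassignments).all().get();
            } else {
                throw e;
            }
        }
    }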

Best,
Stanislav


On Fri, Aug 5, 2022 at 8:32 PM Colin McCabe  wrote:

> Hi Stanislav,
>
> Thanks for the KIP. I think this is a nice solution to the problem of not
> wanting to change the replication factor during reassignments.
>
> Just from a writing point of view, it would be nice for the first
> paragraph to be a bit more explicit about this goal. Maybe lead with "Many
> times, we don't want to change the replication factor of a partition during
> reassignment..." As it is, we're talking about metadata races before we've
> even explained what the goal is that the metadata races are thwarting. :)
>
> I like the RPC and command-line format changes; they are well-done and
> well-written. One thing we do need to spell out, though, is what the
> behavior is when the server does not support this new option. The simplest
> thing to do would be for the client to throw UnsupportedVersionException
> with an exception message indicating what the problem is. Then the caller
> could catch this and re-try the call without the flag (or give up, as
> appropriate).
>
> The other option is to continue on but not actually protect replication
> factor. If we do this, at minimum we'd need to rename the flag something
> like "try to protect replication factor" to make it clear that it's
> best-effort.
>
> It's sort of debatable which way is better. In principle the UVE sounds
> nicer, but in practice maybe the other behavior is best? I suspect most
> systems would turn around and retry without the flag in the event of a
> UVE...
>
> best,
> Colin
>
>
> On Thu, Aug 4, 2022, at 13:37, Vikas Singh wrote:
> > Thanks Stanislav for the KIP. Seems like a reasonable proposal,
> > preventing users from accidentally altering the replica set under certain
> > conditions. I have a couple of comments:
> >
> >
> >> In the case of an already-reassigning partition being reassigned again,
> > the validation compares the targetReplicaSet size of the reassignment to
> > the targetReplicaSet size of the new reassignment and throws if those
> > differ.
> > Can you add more detail to this, or clarify what is targetReplicaSet (for
> > e.g. why not sourceReplicaSet?) a

Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams

2022-08-07 Thread Sagar
Hey Nick,

Since we are adding new methods to the public interface, we should probably
establish the necessity of doing so, especially since you say they are an
alternative to something that already exists. My suggestion would be to
still structure the KIP around the new API: highlight how it is an
alternative to existing functionality and why we should add it anyway. You
have already explained streaming recursion, so that is one benefit we get
as part of the new API; try to expand a little around those points.
Graph traversal should be fine as an example, though you could make it
slightly clearer.

Let me know if it makes sense.

Thank you for your work on this!

Thanks!
Sagar.


On Fri, Aug 5, 2022 at 8:43 PM Nick Telford  wrote:

> Hi Sagar,
>
> Thanks for reading through my proposal.
>
> While the 2 new methods were originally intended for the recursive
> use-case, they could also be used as an alternative means of wiring two
> different KStreams together. The main reason I didn't document this in the
> KIP is that using the API for this doesn't bring anything new to the table:
> it's just an alternative form of something that already exists. If you
> believe it would be helpful, I can document this in more detail. I can
> re-orient the KIP around the new methods themselves, but I felt there was
> more value in the KIP emphasizing the new functionality and algorithms that
> they enable.
>
> What additional context would you like to see in the KIP? Some more
> examples of recursive algorithms that would benefit? A more concrete
> example than generic graph traversal? Something else?
>
> Regards,
>
> Nick Telford
>
> On Fri, 5 Aug 2022 at 11:02, Sagar  wrote:
>
> > Hey Nick,
> >
> > Thanks for the KIP. This seems like a great addition. However, just
> > wondering if the 2 new methods that you plan to add are meant only for
> > streaming recursion? I would imagine they could be repurposed for other
> > use cases as well? If yes, then probably the KIP should revolve around
> > the addition of these methods, which would btw also support streaming
> > recursion. IMHO adding 2 new methods just for streaming recursion seems
> > slightly odd to me.
> >
> > Also, pardon my ignorance here, but I don't have much insight into
> > streaming recursion. Could you add some more context on it?
> >
> > Thanks!
> > Sagar.
> >
> >
> >
> >
> > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > URL:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> > >
> > > Here's a KIP for extending the Streams DSL API to support "streaming
> > > recursion". See the Motivation section for details on what I mean by
> > > this, along with an example of recursively counting nodes in a graph.
> > >
> > > I haven't included changes for the PAPI, mostly because I don't use it,
> > > so I'm not as familiar with the idioms there. If you can think of a good
> > > analogue for a new PAPI method, I'm happy to include it in the KIP.
> > >
> > > Regards,
> > >
> > > Nick Telford
> > >
> >
>


Re: [DISCUSS] KIP-844: Transactional State Stores

2022-08-07 Thread Guozhang Wang
Hello Alex,

I made a pass on your WIP PR (thanks for putting it up! It helps me
understand many of the details). Here are some more thoughts:

1. I looked over all the places where `.transactional()` is called, and I
still think that they are not needed, and hence we could consider removing
this function in the Materialized API. We could, instead, have a global
config to specify if the built-in stores should be transactional or not.
For compatibility this config's default value would be false, but in the
future we would flip it to true.

2. We could simplify our implementation of `commit` so that there is no need
to write a txn marker file (more detailed thoughts in the PR). The main
reason is that the returned offset from `recover` is not guaranteed to
represent the image of the current local store anyway (we may also consider
renaming it to something else, since "recover" implies certain guarantees and
hence may be confusing). I.e. in `commit`, our built-in txn store just copies
the tmp store to the actual store and then returns; in `recover`, our
built-in txn store could just be a no-op and return the passed-in offset,
while a non-txn store would just return 0 (or null, as in your current PR),
which would cause the caller ProcessorStateManager to do the wiping.
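
In rough code form (a sketch only; the signatures follow my reading of the
KIP draft and prototype, so treat them as assumptions rather than the final
API):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.utils.Bytes;

    class TxnStoreSketch {
        final Map<Bytes, byte[]> tmpStore = new HashMap<>();   // stand-in for the temporary store
        final Map<Bytes, byte[]> mainStore = new HashMap<>();  // stand-in for the actual store

        // commit: copy the tmp store into the actual store; no marker file needed.
        void commit(final Long changelogOffset) {
            mainStore.putAll(tmpStore);
            tmpStore.clear();
        }

        // recover: the built-in txn store is a no-op that echoes the passed-in
        // offset; a non-txn store would return 0/null, which makes the caller
        // ProcessorStateManager wipe and restore from the changelog.
        Long recover(final Long checkpointedOffset) {
            return checkpointedOffset;
        }
    }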

Some more detailed comments can be found in the PR. Let's chat about these
two meta thoughts here and leave the detailed implementations in the PR.


Guozhang


On Thu, Aug 4, 2022 at 7:39 AM Alexander Sorokoumov
 wrote:

> Hey Bruno,
>
> Thank you for the suggestions and the clarifying questions. I believe that
> they cover the core of this proposal, so it is crucial for us to be on the
> same page.
>
> 1. Don't you want to deprecate StateStore#flush()?
>
>
> Good call! I updated both the proposal and the prototype.
>
>  2. I would shorten Materialized#withTransactionalityEnabled() to
> > Materialized#withTransactionsEnabled().
>
>
> Turns out, these methods are no longer necessary. I removed them from the
> proposal and the prototype.
>
>
> > 3. Could you also describe a bit more in detail where the offsets passed
> > into commit() and recover() come from?
>
>
> The offset passed into StateStore#commit is the last offset committed to
> the changelog topic. The offset passed into StateStore#recover is the last
> checkpointed offset for the given StateStore. Let's look at steps 3 and 4
> in the commit workflow. After the TaskExecutor/TaskManager commits, it
> calls
> StreamTask#postCommit[1] that in turn:
> a. updates the changelog offsets via
> ProcessorStateManager#updateChangelogOffsets[2]. The offsets here come from
> the RecordCollector[3], which tracks the latest offsets the producer sent
> without exception[4, 5].
> b. flushes/commits the state store in AbstractTask#maybeCheckpoint[6]. This
> method essentially calls ProcessorStateManager methods - flush/commit[7]
> and checkpoint[8]. ProcessorStateManager#commit goes over all state stores
> that belong to that task and commits them with the offset obtained in step
> `a`. ProcessorStateManager#checkpoint writes down those offsets for all
> state stores, except for non-transactional ones in the case of EOS.
>
> During initialization, StreamTask calls
> StateManagerUtil#registerStateStores[8] that in turn calls
> ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9]. At the
> moment, this method assigns checkpointed offsets to the corresponding state
> stores[10]. The prototype also calls StateStore#recover with the
> checkpointed offset and assigns the offset returned by recover()[11].
>
> 4. I do not quite understand how a state store can roll forward. You
> > mention in the thread the following:
>
>
> The 2-state-stores commit looks like this [12]:
>
>1. Flush the temporary state store.
>2. Create a commit marker with a changelog offset corresponding to the
>state we are committing.
>3. Go over all keys in the temporary store and write them down to the
>main one.
>4. Wipe the temporary store.
>5. Delete the commit marker.
>
>
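> In code form, the workflow above is roughly the following (a sketch of the
> prototype's intent rather than its exact code; the store/marker stand-ins
> are made up purely for illustration):
>
>     import java.util.HashMap;
>     import java.util.Map;
>     import org.apache.kafka.common.utils.Bytes;
>
>     class TwoStoreCommitSketch {
>         final Map<Bytes, byte[]> tmpStore = new HashMap<>();
>         final Map<Bytes, byte[]> mainStore = new HashMap<>();
>         Long commitMarker = null;   // stand-in for the on-disk marker file
>
>         void commit(final Long changelogOffset) {
>             // 1. flush the temporary store (a no-op for this in-memory stand-in)
>             // 2. create a commit marker naming the offset being committed
>             commitMarker = changelogOffset;
>             // 3. copy all keys from the temporary store to the main one
>             mainStore.putAll(tmpStore);
>             // 4. wipe the temporary store
>             tmpStore.clear();
>             // 5. delete the commit marker
>             commitMarker = null;
>         }
>     }
>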
> Let's consider crash failure scenarios:
>
>    - Crash failure happens between steps 1 and 2. The main state store is
>    in a consistent state that corresponds to the previously checkpointed
>    offset. StateStore#recover throws away the temporary store and proceeds
>    from the last checkpointed offset.
>    - Crash failure happens between steps 2 and 3. We do not know what keys
>    from the temporary store were already written to the main store, so we
>    can't roll back. There are two options - either wipe the main store or
>    roll forward. Since the point of this proposal is to avoid situations
>    where we throw away the state, and we do not care to what consistent
>    state the store rolls to, we roll forward by continuing from step 3.
>    - Crash failure happens between steps 3 and 4. We can't distinguish
>    between this and the previous scenario, so we write all the keys from
>    the temporary store. This is okay be