This is a very nice summary of the consistency / correctness issues
possible with a commit log.
> (assuming it’s publishing asynchronously and in an open loop)
It's perhaps already clear to folks here, but -- if you *don't* do that,
and instead only send one batch of messages at a time and check
I've been trying to understand what is being proposed in this KIP, and I've
put down some notes, incorporating feedback from Ben, that I wanted to share.
I'm not really following the flow of the thread, since I've read a
few sources to get to this, and I apologize for that.
Here is how I
Couple of other things.
A. In the discussion, we talked about the usage of getting the latest high
watermark from the broker. Currently, the high watermark in a partition can
go back a bit for a short period of time during leader change. So, the high
watermark returned in the getOffset api is not
Jun, I see. So this only applies to uncompressed messages. Maybe that is
fine given most users will probably turn on compression?
I think the first approach is more general, but from the application's
point of view it might be harder to implement. I am wondering whether it is
easier for the application to simply
Jun,
You are right that Jay's example omits the consuming part, and the local
state should be maintained only by consumed messages so that all replicas
are eventually consistent. Assuming that, the beauty of conditional push is
that it is safe for clients to do this independently. It guarantees th
Jay,
The code that you put there seems to be designed for the second case where
there is a master. In the first case when there is no master, updates can
happen from multiple replicas. Therefore, to maintain the local view, each
replica can't just update the local view using only the updates sent
Hey Jun,
Yeah I think Ben is right, both these cases are covered. The algorithm is
something like
while (true) {
  v = get_local_val(key)
  v' = modify(v)
  try {
    log_to_kafka(v')
    put_local_val(key, v')
    break
  } catch (CasFailureException e) {
    warn("optimistic lock failure")
  }
}
W
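The loop above can be made concrete with a small runnable sketch, using an in-memory list as a stand-in for a Kafka partition. All the names here (CasFailureException, log_to_kafka, and so on) just mirror the pseudocode; they are not part of any real Kafka client API.

```python
class CasFailureException(Exception):
    pass

log = []     # in-memory stand-in for a Kafka partition
local = {}   # local view: key -> (value, next_expected_offset)

def log_to_kafka(key, value, expected_offset):
    # Conditional publish: append only if expected_offset is the current
    # end of the log; otherwise reject, as the KIP proposes.
    if expected_offset != len(log):
        raise CasFailureException(
            "expected %d, log end is %d" % (expected_offset, len(log)))
    log.append((key, value))

def refresh_local_view(key):
    # Re-read the log to pick up writes made by other replicas.
    value = None
    for k, v in log:
        if k == key:
            value = v
    local[key] = (value, len(log))

def update(key, modify):
    while True:
        value, expected = local.get(key, (None, len(log)))
        new_value = modify(value)
        try:
            log_to_kafka(key, new_value, expected)
            local[key] = (new_value, len(log))
            return new_value
        except CasFailureException:
            refresh_local_view(key)  # optimistic lock failure: retry

update("counter", lambda v: (v or 0) + 1)   # writes 1 at offset 0
update("counter", lambda v: (v or 0) + 1)   # writes 2 at offset 1
```

Note that the expected offset is partition-wide, not per-key, so a write to any other key also forces a refresh-and-retry; that is the batching complication discussed later in the thread.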
Jiangjie -- it seems to me that, while there are cases where you want
conservative producer settings like you suggest, there are others
enabled by this KIP where pipelining and retries are not an issue.
As a toy example, I've adapted the producer performance test to behave
as an idempotent produce
This is a great summary of the commit log options. Some additional comments:
1. One way to handle a transient failure is to treat it the same way
as a conditional publish failure: recompute the post-value before
retrying. (I believe this is enough to make this approach work
correctly.)
2. You can
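As a rough sketch of point 1, a hypothetical send loop that takes the same recovery path for a transient failure as for a conditional publish rejection; every name here is illustrative, not a real client API:

```python
class TransientError(Exception):
    pass

class ConditionalPublishRejected(Exception):
    pass

def publish_with_recompute(read_latest, compute_post_value, send):
    # Recompute the post-value from the freshest known state before every
    # attempt, whether the previous attempt was rejected outright or
    # merely timed out with an unknown outcome.
    while True:
        current, offset = read_latest()
        post = compute_post_value(current)
        try:
            send(post, expected_offset=offset)
            return post
        except (TransientError, ConditionalPublishRejected):
            continue  # same recovery path for both failure modes
```

The key point is that a timeout never triggers a blind resend of the old request; the value is always rebuilt from the latest observed state first.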
A couple of thoughts on the commit log use case. Suppose that we want to
maintain multiple replicas of a K/V store backed by a shared Kafka
topic/partition as a commit log. There are two possible ways to use Kafka
as a commit log.
1. The first approach allows multiple producers to publish to Kafka
Jiangjie: I think giving users the possibility of defining a custom policy
for handling rejections is a good idea. For instance, this will allow Kafka
to act as an event store in an Event Sourcing application. If the event(s)
are rejected by the store, the original command may need to be re-validat
@Ewen, good point about batching. Yes, it would be tricky if we want to do
a per-key conditional produce. My understanding is that the prerequisite of
this KIP is:
1. Single producer for each partition.
2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER
The major problem i
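For reference, those prerequisites correspond to producer settings along these lines (the keys are the standard Kafka producer configuration names; how they are supplied depends on the client library, and the values are illustrative only):

```python
# Illustrative settings only; a real deployment should size retries and
# timeouts for its own environment.
conservative_producer_config = {
    "acks": "all",                               # i.e. acks=-1: wait for all in-sync replicas
    "max.in.flight.requests.per.connection": 1,  # no pipelining, so retries cannot reorder
    "retries": 2147483647,                       # retry (effectively) forever
}
```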
So I had another look at the 'Idempotent Producer' proposal this
afternoon, and made a few notes on how I think they compare; if I've
made any mistakes, I'd be delighted if someone with more context on
the idempotent producer design would correct me.
As a first intuition, you can think of the 'con
@Becket - for compressed batches, I think this just works out given the KIP
as described. Without the change you're referring to, it still only makes
sense to batch messages with this KIP if all the expected offsets are
sequential (else some messages are guaranteed to fail). I think that
probably j
Tangent: I think we should complete the move of Produce / Fetch RPC to
the client libraries before we add more revisions to this protocol.
On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
wrote:
> I missed yesterday's KIP hangout. I'm currently working on another KIP for
> enriched metadata of mess
I missed yesterday's KIP hangout. I'm currently working on another KIP for
enriched metadata of messages. Guozhang has already created a wiki page
before (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
We plan to fill the relative offset to the offset field in
That's a fair point. I've added some imagined job logic to the KIP, so
we can make sure the proposal stays in sync with the usages we're
discussing. (The logic is just a quick sketch for now -- I expect I'll
need to elaborate it as we get into more detail, or to address other
concerns...)
On Tue,
In KV store usage, all instances are writers, aren't they? There is no
leader or master, thus there is no fail over. The offset based CAS ensures
an update is based on the latest value and doesn't care who is writing the
new value.
I think the idea of the offset based CAS is great. I think it work
For 1, yes, when there is a transient leader change, it's guaranteed that a
prefix of the messages in a request will be committed. However, it seems
that the client needs to know what subset of messages are committed in
order to resume the sending. Then the question is how.
As Flavio indicated, fo
Yes, sorry, I think this is right -- it's pretty application-specific.
One thing to note: in a large subset of cases (ie. bulk load,
copycat-type, mirrormaker) the correct response is not to resend the
message at all; if there's already a message at that offset, it's
because another instance of the
I'm with you on the races that could happen in the scenarios you describe,
but I'm still not convinced that conditionally updating is the best call.
Instead of conditionally updating, the broker could fence off the old owner
to avoid spurious writes, and that's valid for all attempts. The advantage
It would be worth fleshing out the use cases a bit more and thinking
through the overlap with the other proposals for transactions and
idempotence (since likely we will end up with both).
The advantage of this proposal is that it is really simple.
If we go through use cases:
1. Stream processing:
Up to Ben to clarify, but I'd think that in this case, it is up to the
logic of B to decide what to do. B knows that the offset isn't what it
expects, so it can react accordingly. If it chooses to try again, then it
should not violate any application invariant.
-Flavio
On Fri, Jul 17, 2015 at 9:4
Hi Jun,
Thanks for the close reading! Responses inline.
> Thanks for the write-up. The single producer use case you mentioned makes
> sense. It would be useful to include that in the KIP wiki.
Great -- I'll make sure that the wiki is clear about this.
> 1. What happens when the leader of the pa
The per-key generalization is useful. As Jiangjie mentioned in KAFKA-2260,
one thing that we need to sort out is what happens if a produce request has
messages with different keys and some of the messages have expected offsets
while some others don't. Currently, the produce response has an error co
Hi, Ben,
Thanks for the write-up. The single producer use case you mentioned makes
sense. It would be useful to include that in the KIP wiki. A couple
questions on the design details.
1. What happens when the leader of the partition changes in the middle of a
produce request? In this case, the pr
Just wanted to flag a little discussion that happened on the ticket:
https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
In particular, Yasuhiro Matsuda proposed an interesting variant on
Hi all,
So, perhaps it's worth adding a couple specific examples of where this
feature is useful, to make this a bit more concrete:
- Suppose I'm using Kafka as a commit log for a partitioned KV store,
like Samza or Pistachio (?) do. We bootstrap the process state by
reading from that partition,
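The bootstrap step described here might look roughly like this (a hypothetical helper; messages are taken to be (offset, key, value) tuples read from the partition):

```python
def bootstrap(messages):
    # Replay the commit-log partition into a local table, remembering the
    # offset at which to resume consuming (and, under this KIP, the
    # expected offset for the next conditional publish).
    state = {}
    next_offset = 0
    for offset, key, value in messages:
        state[key] = value      # later writes to a key overwrite earlier ones
        next_offset = offset + 1
    return state, next_offset

state, next_offset = bootstrap([(0, "a", 1), (1, "b", 2), (2, "a", 3)])
```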
Good concept. I have a question though.
Say there are two producers A and B. Both producers are producing to same
partition.
- A sends a message with expected offset, x1
- Broker accepts it and sends an ack
- B sends a message with expected offset, x1
- Broker rejects it, sends nack
- B sends mess
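The flow above can be simulated with an in-memory stand-in for the partition (hypothetical code; x1 is simply the current end-of-log offset that both producers observed):

```python
partition = ["m0"]   # one committed message, so the next offset is 1
x1 = len(partition)  # both A and B expect to publish at offset x1

def conditional_publish(msg, expected_offset):
    # Broker-side check proposed by the KIP: accept only if the expected
    # offset still matches the end of the log.
    if expected_offset != len(partition):
        return "nack"       # stale expected offset: broker rejects
    partition.append(msg)
    return "ack"

ack_a = conditional_publish("from-A", x1)  # A wins the race: "ack"
ack_b = conditional_publish("from-B", x1)  # B's offset is now stale: "nack"
```

On the nack, B would re-read the partition (seeing A's message), recompute, and retry with the new end-of-log offset.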
If we have two producers producing to a partition, they can be out of order;
in that case how does one producer know what offset to expect, given that it
does not interact with the other producer?
Can you give an example flow that explains how it works with single
producer and with multiple producers?
Thanks,
Mayures
I like this feature, it reminds me of conditional updates in zookeeper. I'm not
sure if it'd be best to have some mechanism for fencing rather than a
conditional write like you're proposing. The reason I'm saying this is that the
conditional write applies to requests individually, while it sound
Hi there,
I just added a KIP for a 'conditional publish' operation: a simple
CAS-like mechanism for the Kafka producer. The wiki page is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
And there's some previous discussion on the ticket and the users list:
ht