Hey Beckett,

I was proposing splitting up the KIP just for simplicity of discussion. You
can still implement them in one patch. I think otherwise it will be hard to
discuss/vote on them since if you like the offset proposal but not the time
proposal what do you do?

Introducing a second notion of time into Kafka is a pretty massive
philosophical change so it kind of warrants it's own KIP I think it isn't
just "Change message format".

WRT time I think one thing to clarify in the proposal is how MM will have
access to set the timestamp? Presumably this will be a new field in
ProducerRecord, right? If so then any user can set the timestamp, right?
I'm not sure you answered the questions around how this will work for MM
since when MM retains timestamps from multiple partitions they will then be
out of order and in the past (so the max(lastAppendedTimestamp,
currentTimeMillis) override you proposed will not work, right?). If we
don't do this then when you set up mirroring the data will all be new and
you have the same retention problem you described. Maybe I missed
something...?

My main motivation is that given that both Samza and Kafka streams are
doing work that implies a mandatory client-defined notion of time, I really
think introducing a different mandatory notion of time in Kafka is going to
be quite odd. We should think hard about how client-defined time could
work. I'm not sure if it can, but I'm also not sure that it can't. Having
both will be odd. Did you chat about this with Yi/Kartik on the Samza side?

When you are saying it won't work you are assuming some particular
implementation? Maybe that the index is a monotonically increasing set of
pointers to the least record with a timestamp larger than the index time?
In other words a search for time X gives the largest offset at which all
records are <= X?

For retention, I agree with the problem you point out, but I think what you
are saying in that case is that you want a size limit too. If you use
system time you actually hit the same problem: say you do a full dump of a
DB table with a setting of 7 days retention, your retention will actually
not get enforced for the first 7 days because the data is "new to Kafka".

-Jay


On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Jay,
>
> Thanks for the comments. Yes, there are actually three proposals as you
> pointed out.
>
> We will have a separate proposal for (1) - version control mechanism. We
> actually thought about whether we want to separate 2 and 3 internally
> before creating the KIP. The reason we put 2 and 3 together is it will
> saves us another cross board wire protocol change. Like you said, we have
> to migrate all the clients in all languages. To some extent, the effort to
> spend on upgrading the clients can be even bigger than implementing the new
> feature itself. So there are some attractions if we can do 2 and 3 together
> instead of separately. Maybe after (1) is done it will be easier to do
> protocol migration. But if we are able to come to an agreement on the
> timestamp solution, I would prefer to have it together with relative offset
> in the interest of avoiding another wire protocol change (the process to
> migrate to relative offset is exactly the same as migrate to message with
> timestamp).
>
> In terms of timestamp. I completely agree that having client timestamp is
> more useful if we can make sure the timestamp is good. But in reality that
> can be a really big *IF*. I think the problem is exactly as Ewen mentioned,
> if we let the client to set the timestamp, it would be very hard for the
> broker to utilize it. If broker apply retention policy based on the client
> timestamp. One misbehave producer can potentially completely mess up the
> retention policy on the broker. Although people don't care about server
> side timestamp. People do care a lot when timestamp breaks. Searching by
> timestamp is a really important use case even though it is not used as
> often as searching by offset. It has significant direct impact on RTO when
> there is a cross cluster failover as Todd mentioned.
>
> The trick using max(lastAppendedTimestamp, currentTimeMillis) is to
> guarantee monotonic increase of the timestamp. Many commercial system
> actually do something similar to this to solve the time skew. About
> changing the time, I am not sure if people use NTP like using a watch to
> just set it forward/backward by an hour or so. The time adjustment I used
> to do is typically to adjust something like a minute  / week. So for each
> second, there might be a few mircoseconds slower/faster but should not
> break the clock completely to make sure all the time-based transactions are
> not affected. The one minute change will be done within a week but not
> instantly.
>
> Personally, I think having client side timestamp will be useful if we don't
> need to put the broker and data integrity under risk. If we have to choose
> from one of them but not both. I would prefer server side timestamp because
> for client side timestamp there is always a plan B which is putting the
> timestamp into payload.
>
> Another reason I am reluctant to use the client side timestamp is that it
> is always dangerous to mix the control plane with data plane. IP did this
> and it has caused so many different breaches so people are migrating to
> something like MPLS. An example in Kafka is that any client can construct a
> LeaderAndIsrRequest/UpdateMetadataRequest/ContorlledShutdownRequest (you
> name it) and send it to the broker to mess up the entire cluster, also as
> we already noticed a busy cluster can respond quite slow to controller
> messages. So it would really be nice if we can avoid giving the power to
> clients to control the log retention.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <tpal...@gmail.com> wrote:
>
> > So, with regards to why you want to search by timestamp, the biggest
> > problem I've seen is with consumers who want to reset their timestamps
> to a
> > specific point, whether it is to replay a certain amount of messages, or
> to
> > rewind to before some problem state existed. This happens more often than
> > anyone would like.
> >
> > To handle this now we need to constantly export the broker's offset for
> > every partition to a time-series database and then use external processes
> > to query this. I know we're not the only ones doing this. The way the
> > broker handles requests for offsets by timestamp is a little obtuse
> > (explain it to anyone without intimate knowledge of the internal workings
> > of the broker - every time I do I see this). In addition, as Becket
> pointed
> > out, it causes problems specifically with retention of messages by time
> > when you move partitions around.
> >
> > I'm deliberately avoiding the discussion of what timestamp to use. I can
> > see the argument either way, though I tend to lean towards the idea that
> > the broker timestamp is the only viable source of truth in this
> situation.
> >
> > -Todd
> >
> >
> > On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava <e...@confluent.io
> >
> > wrote:
> >
> > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > >
> > > > 2. Nobody cares what time it is on the server.
> > > >
> > >
> > > This is a good way of summarizing the issue I was trying to get at,
> from
> > an
> > > app's perspective. Of the 3 stated goals of the KIP, #2 (lot retention)
> > is
> > > reasonably handled by a server-side timestamp. I really just care that
> a
> > > message is there long enough that I have a chance to process it. #3
> > > (searching by timestamp) only seems useful if we can guarantee the
> > > server-side timestamp is close enough to the original client-side
> > > timestamp, and any mirror maker step seems to break that (even ignoring
> > any
> > > issues with broker availability).
> > >
> > > I'm also wondering whether optimizing for search-by-timestamp on the
> > broker
> > > is really something we want to do given that messages aren't really
> > > guaranteed to be ordered by application-level timestamps on the broker.
> > Is
> > > part of the need for this just due to the current consumer APIs being
> > > difficult to work with? For example, could you implement this pretty
> > easily
> > > client side just the way you would broker-side? I'd imagine a couple of
> > > random seeks + reads during very rare occasions (i.e. when the app
> starts
> > > up) wouldn't be a problem performance-wise. Or is it also that you need
> > the
> > > broker to enforce things like monotonically increasing timestamps since
> > you
> > > can't do the query properly and efficiently without that guarantee, and
> > > therefore what applications are actually looking for *is* broker-side
> > > timestamps?
> > >
> > > -Ewen
> > >
> > >
> > >
> > > > Consider cases where data is being copied from a database or from log
> > > > files. In steady-state the server time is very close to the client
> time
> > > if
> > > > their clocks are sync'd (see 1) but there will be times of large
> > > divergence
> > > > when the copying process is stopped or falls behind. When this occurs
> > it
> > > is
> > > > clear that the time the data arrived on the server is irrelevant, it
> is
> > > the
> > > > source timestamp that matters. This is the problem you are trying to
> > fix
> > > by
> > > > retaining the mm timestamp but really the client should always set
> the
> > > time
> > > > with the use of server-side time as a fallback. It would be worth
> > talking
> > > > to the Samza folks and reading through this blog post (
> > > >
> > >
> >
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> > > > )
> > > > on this subject since we went through similar learnings on the stream
> > > > processing side.
> > > >
> > > > I think the implication of these two is that we need a proposal that
> > > > handles potentially very out-of-order timestamps in some kind of
> sanish
> > > way
> > > > (buggy clients will set something totally wrong as the time).
> > > >
> > > > -Jay
> > > >
> > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > The magic byte is used to version message format so we'll need to
> > make
> > > > > sure that check is in place--I actually don't see it in the current
> > > > > consumer code which I think is a bug we should fix for the next
> > release
> > > > > (filed KAFKA-2523). The purpose of that field is so there is a
> clear
> > > > check
> > > > > on the format rather than the scrambled scenarios Becket describes.
> > > > >
> > > > > Also, Becket, I don't think just fixing the java client is
> sufficient
> > > as
> > > > > that would break other clients--i.e. if anyone writes a v1
> messages,
> > > even
> > > > > by accident, any non-v1-capable consumer will break. I think we
> > > probably
> > > > > need a way to have the server ensure a particular message format
> > either
> > > > at
> > > > > read or write time.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin
> > <j...@linkedin.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > >> Hi Guozhang,
> > > > >>
> > > > >> I checked the code again. Actually CRC check probably won't fail.
> > The
> > > > >> newly
> > > > >> added timestamp field might be treated as keyLength instead, so we
> > are
> > > > >> likely to receive an IllegalArgumentException when try to read the
> > > key.
> > > > >> I'll update the KIP.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jiangjie (Becket) Qin
> > > > >>
> > > > >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com>
> > > > wrote:
> > > > >>
> > > > >> > Hi, Guozhang,
> > > > >> >
> > > > >> > Thanks for reading the KIP. By "old consumer", I meant the
> > > > >> > ZookeeperConsumerConnector in trunk now, i.e. without this bug
> > > fixed.
> > > > >> If we
> > > > >> > fix the ZookeeperConsumerConnector then it will throw exception
> > > > >> complaining
> > > > >> > about the unsupported version when it sees message format V1.
> > What I
> > > > was
> > > > >> > trying to say is that if we have some ZookeeperConsumerConnector
> > > > running
> > > > >> > without the fix, the consumer will complain about CRC mismatch
> > > instead
> > > > >> of
> > > > >> > unsupported version.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Jiangjie (Becket) Qin
> > > > >> >
> > > > >> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > > >> wrote:
> > > > >> >
> > > > >> >> Thanks for the write-up Jiangjie.
> > > > >> >>
> > > > >> >> One comment about migration plan: "For old consumers, if they
> see
> > > the
> > > > >> new
> > > > >> >> protocol the CRC check will fail"..
> > > > >> >>
> > > > >> >> Do you mean this bug in the old consumer cannot be fixed in a
> > > > >> >> backward-compatible way?
> > > > >> >>
> > > > >> >> Guozhang
> > > > >> >>
> > > > >> >>
> > > > >> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
> > > > <j...@linkedin.com.invalid
> > > > >> >
> > > > >> >> wrote:
> > > > >> >>
> > > > >> >> > Hi,
> > > > >> >> >
> > > > >> >> > We just created KIP-31 to propose a message format change in
> > > Kafka.
> > > > >> >> >
> > > > >> >> >
> > > > >> >> >
> > > > >> >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> > > > >> >> >
> > > > >> >> > As a summary, the motivations are:
> > > > >> >> > 1. Avoid server side message re-compression
> > > > >> >> > 2. Honor time-based log roll and retention
> > > > >> >> > 3. Enable offset search by timestamp at a finer granularity.
> > > > >> >> >
> > > > >> >> > Feedback and comments are welcome!
> > > > >> >> >
> > > > >> >> > Thanks,
> > > > >> >> >
> > > > >> >> > Jiangjie (Becket) Qin
> > > > >> >> >
> > > > >> >>
> > > > >> >>
> > > > >> >>
> > > > >> >> --
> > > > >> >> -- Guozhang
> > > > >> >>
> > > > >> >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>

Reply via email to