Hello Greg,

Sorry for the long delay in responding, but I'm very glad to see you have gone this far in resolving the problem. All of your solutions make sense to me.
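For reference, the LogAppendTime-aware extractor you describe below could look roughly like the following sketch (my own illustration, not your code; the class name is made up, and it assumes the two-argument TimestampExtractor interface from 0.10.2):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: respect the broker-assigned timestamp on LogAppendTime topics
// (e.g. the changelogs) and keep the sender-side CreateTime everywhere else.
public class LogAppendTimeAwareExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final long ts = record.timestamp();
        if (record.timestampType() == TimestampType.LOG_APPEND_TIME) {
            // Broker append time: safe to use for segment rolling / retention.
            return ts;
        }
        // CreateTime (or missing) timestamp: keep the application's event time,
        // falling back to the previous record's time if none is present.
        return ts >= 0 ? ts : previousTimestamp;
    }
}

Plugged in via the timestamp.extractor setting in StreamsConfig, this keeps event-time semantics on the input topics while letting the changelog topics carry broker time.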
I think the first problem you raised is a general one, not specific to Streams sending changelog records: for any client that sends data to Kafka, a wrong timestamp embedded in a record will cause log rolling / retention to malfunction. The second problem you raised is a good question for Streams specifically; put more generally, the question is whether we should use the processing timestamp or the event timestamp when generating changelog records, and as you have already observed, using the event timestamp makes reprocessing a headache.

We have been working on improving our time semantics to cope with both, rather than forcing users to pick one of the two. As a first step we have extended the punctuate function to allow both event-time and processing-time based punctuation (https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics). Will keep you posted whenever we make further progress on this.

Guozhang

On Fri, Jul 7, 2017 at 11:21 AM, Greg Fodor <gfo...@gmail.com> wrote:

> After more digging, a good solution seems to be to set the topic timestamp policy to LogAppendTime and also update our timestamp extractor to respect this (by just returning record.timestamp() if the timestamp type is set to LogAppendTime). This gets us the semantics we want for the changelog topic while allowing CreateTime timestamps to land on the other topics.
>
> On Thu, Jul 6, 2017 at 9:32 PM, Greg Fodor <gfo...@gmail.com> wrote:
>
> > I managed to answer some of my own questions :)
> >
> > For future googlers:
> >
> > To deal with retention.ms, see https://issues.apache.org/jira/browse/KAFKA-4340
> > To deal with early rejection of bad timestamps, the message.timestamp.difference.max.ms config is relevant; discussion here: https://issues.apache.org/jira/browse/KAFKA-5344
> >
> > In our case, we can live with setting retention.ms during backfills. Still would like to know if there are any better practices for dealing with mis-stamped records during backfills w/ state store topics.
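For reference, here is roughly what the two topic-level settings mentioned above look like when applied; the changelog topic name and the values are hypothetical, and on the 0.10.x brokers discussed in this thread topic configs are altered with kafka-configs.sh:

  bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name my-app-my-store-changelog \
    --add-config retention.ms=604800000,message.timestamp.difference.max.ms=86400000

This keeps a week of retention on the changelog and asks the broker to reject CreateTime records whose timestamp is more than a day off from the broker's clock, which is one way to keep far-future timestamps out of the log in the first place.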
> > On Thu, Jul 6, 2017 at 12:32 PM, Greg Fodor <gfo...@gmail.com> wrote:
> >
> >> Hey all, we are currently working on migrating our system to Kafka 10.2 from 10.0, and one thing we have hit that I wanted some advice on is dealing with the new log retention/rolling semantics that are based on timestamps.
> >>
> >> We send telemetry data from installed clients into Kafka via the Kafka REST proxy, and the timestamps we land the messages with are "create time" based, assigned on the sender side. We try to adjust for clock skew, but this is not perfect, and in practice we end up having some small subset of data landing in this topic with very erroneous timestamps (for example, some arrive with timestamps many years in the future).
> >>
> >> The first problem we are hitting is that these corrupt timestamps now influence log segment rolling. For example, when reprocessing the entire log we end up seeing a bunch of segment files generated for the Kafka Streams state store changelogs that store these events, since under the new heuristics a single corrupted timestamp far in the future can trigger a segment roll as it comes in. The result is we end up with hundreds of small segment files (which, in our current configuration, actually ends up causing Kafka to run out of memory, but that's another story :)).
> >>
> >> The second problem we are hitting is when reprocessing the full log: since these timestamps are in the past as we run from the beginning, if we have a time-based retention policy set on the state store changelog topic (say, a week), Kafka ends up deleting segments immediately, because the timestamps are far in the past and the segments are considered expired. Previously this worked fine during reprocessing, since the retention policy was based on the segment file timestamp, so the state store changelogs were just going to get deleted a week after the reprocess job ran.
> >>
> >> Both of these problems could potentially be compensated for by writing a clever timestamp extractor that tries to a) normalize timestamps that appear very skewed and b) for changelog entries, extract a "logged at" instead of "created at" timestamp when landing into the state store changelog. The second problem could also be addressed by temporarily changing the topic configuration during a reprocess to prevent "old" log segments from being deleted. Neither of these seems ideal.
> >>
> >> I wanted to know if there are any recommendations on how to deal with this -- it seems there is a conflict between basing segment file policies on message timestamps while also basing message timestamps on application creation time, since the origin create time can often be subject to noise/skew/errors. One potential path forward would be topic-specific settings for log rolling (including the ability to use the legacy retention behavior that relies on filesystem timestamps), but I am sure there are problems with this proposal.
> >>
> >> In general, I don't really feel like I have a good sense of what a correct solution would be, other than messages always having two timestamps and being able to control which timestamp is authoritative for log segment management policies, but that obviously seems like something that was considered and rejected for KIP-32 already.

-- 
-- Guozhang
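For anyone revisiting this thread: a minimal sketch of the "clever timestamp extractor" idea Greg floats above, i.e. normalizing timestamps that appear badly skewed, might look like the following (the class name and the one-hour bound are made up for illustration; it assumes the 0.10.2 TimestampExtractor interface):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: clamp timestamps that are implausibly far in the future back to
// the current wall-clock time, so a handful of mis-stamped records cannot
// trigger premature segment rolls on downstream topics.
public class SkewClampingTimestampExtractor implements TimestampExtractor {

    // Maximum tolerated forward skew; one hour is an arbitrary illustrative value.
    private static final long MAX_FUTURE_SKEW_MS = 60 * 60 * 1000L;

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final long now = System.currentTimeMillis();
        final long ts = record.timestamp();
        if (ts < 0) {
            // No usable timestamp at all: fall back to wall-clock time.
            return now;
        }
        if (ts > now + MAX_FUTURE_SKEW_MS) {
            // Far-future timestamp: treat it as clock skew and use "now" instead.
            return now;
        }
        return ts;
    }
}

This only addresses the rolling side; it does not give the changelog a "logged at" time for retention, which is why the LogAppendTime approach described earlier in the thread ends up being the cleaner fix.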