Thanks Guozhang. I figured I could use a custom timestamp extractor and set the timestamp to 'now' when reading a source topic, as the original timestamp is pretty much irrelevant. I assumed Streams would then also use that updated timestamp as 'stream time', but I don't see that happening, so that assumption was wrong.
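For reference, the kind of extractor I mean looks roughly like this (a minimal sketch; the class name is just illustrative, and the exact extract() signature depends on the Kafka Streams version, since older releases use a single-argument extract(record). Streams also ships a built-in WallclockTimestampExtractor that does essentially the same thing):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class IngestionTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        // Ignore the (possibly very old) embedded record timestamp and use
        // wall-clock time, so that "stream time" advances with processing time.
        return System.currentTimeMillis();
    }
}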
If I could configure a timestamp extractor that would also be used by the producer I think I'd be in business, but right now I don't see an elegant way forward, so any ideas for workarounds are welcome.

regards, Frank

On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Frank, thanks for sharing your findings.
>
> I think this is a general issue to consider in Streams, and the community has been thinking about it: we write intermediate topics with the stream time that is inherited from the source topic's timestamps; however, that timestamp is used for log rolling / retention etc. as well, and these two purposes (using timestamps in processing for out-of-order and late arrivals, and operations on the Kafka topics) could rely on different timestamp semantics. We need to revisit how timestamps can be maintained across the topology in Streams.
>
> Guozhang
>
> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu <flya...@gmail.com> wrote:
> > Hi Matthias,
> >
> > Ok, that clarifies quite a bit. I never really went into the timestamp aspects, as time does not really play a role in my application (aside from the repartition topics, I have no KStream or windowed operations, just different kinds of KTable joins).
> >
> > I do think that the failure case I see (with this version, joining two 'old' KTables causes a small percentage of records to vanish) is far from intuitive, and it somehow worked fine until a few weeks ago.
> >
> > I think your option 3 should work. I'll make a custom timestamp extractor (I actually do have a timestamp in my messages), and I'll set it to the current time as the records enter the streams application.
> >
> > Thanks, that helped, regards, Frank
> >
> > On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > Hi Frank,
> > >
> > > yes, retention policy is based on the embedded record timestamps and not on system time. Thus, if you send messages with an old timestamp, they can trigger log/segment rolling.
> > >
> > > >> I see that the repartition topics have timestamp.type = CreateTime, does that mean it uses the timestamp of the original message?
> > >
> > > Yes. That's the default setting on the broker side. For Streams, we maintain a so-called "stream time" that is computed based on the input record timestamps. This "stream time" is used to set the timestamp for records that are written by Streams (so it's more or less the timestamp of the input records).
> > >
> > > >> Shouldn't that be LogAppendTime for repartition topics?
> > >
> > > No. Streams needs to preserve the original timestamp to guarantee correct semantics for downstream window operations. Thus, it should be CreateTime -- if you switch to LogAppendTime, you might break your application logic and get wrong results.
> > >
> > > >> Or is there a way to configure that?
> > >
> > > You can configure this on a per-topic basis on the brokers.
> > >
> > > >> If I hack into my Kafka Streams code to force it to use LogAppendTime, that seems to solve my problem, but it appears to take a huge toll on the brokers. Throughput plummets, and I don't really know why.
> > >
> > > I am not sure what you mean by this? As it's a topic config, I don't understand how you can force this within your Streams application?
> > >
> > > IMHO, you have multiple options though:
> > > - increase the retention time for your re-partitioning topics
> > > - you could change the retention policy to a number of bytes instead of time for the re-partitioning topics
> > > - you can implement a custom timestamp extractor and adjust the timestamps accordingly ("stream time" is based on whatever the timestamp extractor returns)
> > >
> > > However, if you have records with old timestamps, I am wondering why they are not truncated in your input topic? Do you not face the same issue there?
> >
> > All my topics are compacted; I use no windowed operations at all, and the only 'delete' topics are the repartitioning internal topics.
> >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> > > > Hi Kafka people,
> > > >
> > > > I'm running an application that pushes database changes into a Kafka topic. I'm also running a Kafka Streams application that listens to these topics, groups them using the high-level API, and inserts them into another database.
> > > >
> > > > All topics are compacted, with the exception of the 'repartition topics', which are configured to be retained for 36 hours.
> > > >
> > > > Note that the changes in the original Kafka topics can be old (generally more than 36 hours), as they only change when the data changes.
> > > >
> > > > When I start an instance of the Kafka Streams application, I see the repartition topics being deleted immediately, sometimes before they are processed, and it looks like the repartition messages use the same timestamp as the original message.
> > > >
> > > > I see that the repartition topics have timestamp.type = CreateTime, does that mean it uses the timestamp of the original message? Shouldn't that be LogAppendTime for repartition topics? Or is there a way to configure that?
> > > >
> > > > If I hack into my Kafka Streams code to force it to use LogAppendTime, that seems to solve my problem, but it appears to take a huge toll on the brokers. Throughput plummets, and I don't really know why.
> > > >
> > > > Any ideas?
> > > >
> > > > Frank
> > > >
> >
> --
> -- Guozhang
>
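A minimal sketch of wiring such an extractor into the Streams configuration, along the lines of Matthias's option 3 (assuming the pre-0.11 TIMESTAMP_EXTRACTOR_CLASS_CONFIG key, which newer versions replace with default.timestamp.extractor, and the built-in WallclockTimestampExtractor; the application id and bootstrap servers are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class StreamsTimestampConfig {

    public static Properties baseConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-join-app");      // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // illustrative
        // Use wall-clock time as "stream time" instead of the embedded record
        // timestamps, so records written downstream get fresh timestamps.
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class.getName());
        return props;
    }
}

Whether this fully solves the repartition-topic retention issue depends on the version in use, since the records Streams writes to its internal topics take their timestamps from this extracted "stream time", as described in the thread above.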