I tried to use a TimestampExtractor that takes the timestamps from our messages, and a 'map' operation on the KTable to set the timestamp to the current time, so that I'd have one precise point where I discard our original timestamps. That does not work (I verified by writing a separate Java Kafka consumer and printing out the timestamps), as the TimestampExtractor only gets called once, and the record sticks with that time. In the end I did not really have a good reason not to simply use the WallclockTimestampExtractor, and that one does exactly what I wanted.
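For the archives, here is roughly what that boils down to. This is only a sketch against the Streams API as it was around 0.10.2/0.11 (where extract() takes a previousTimestamp argument and timestamp.extractor is still a plain StreamsConfig key); the application id and broker address are just placeholders:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class IngestTimeConfig {

    // What I tried first: ignore the timestamp embedded in the message and
    // stamp every record with "now" as it enters the topology. This is
    // effectively what WallclockTimestampExtractor already does, so there is
    // no need to write it yourself.
    public static class IngestTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            return System.currentTimeMillis();
        }
    }

    // Streams config that just plugs in the built-in wall-clock extractor.
    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                WallclockTimestampExtractor.class.getName());
        return props;
    }
}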
So, I'm good! I am interested in the community discussion Guozhang mentions. Is there a KIP for that?

regards, Frank

On Mon, Jun 5, 2017 at 8:25 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Frank,
>
> If you use "now", I assume you are calling System.currentTimeMillis().
> If yes, you can also use the predefined WallclockTimestampExtractor that
> ships with Streams (no need to write your own one).
>
> > I thought that the Timestamp extractor would then also use
> > that updated timestamp as 'stream time', but I don't really see that
> > happening, so that assumption was wrong.
>
> Yes, this should happen. Not sure why you don't observe this. And thus,
> the producer should use this timestamp to write the records.
>
> How did you verify the timestamps that are set for your output records?
>
>
> -Matthias
>
>
> On 6/5/17 6:15 AM, Frank Lyaruu wrote:
> > Thanks Guozhang,
> >
> > I figured I could use a custom timestamp extractor, and set that timestamp
> > to 'now' when reading a source topic, as the original timestamp is pretty
> > much irrelevant. I thought that the Timestamp extractor would then also use
> > that updated timestamp as 'stream time', but I don't really see that
> > happening, so that assumption was wrong.
> >
> > If I could configure a timestamp extractor that would also be used by the
> > producer I think I'd be in business, but right now I don't see an elegant
> > way forward, so any ideas for workarounds are welcome.
> >
> > regards, Frank
> >
> > On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Frank, thanks for sharing your findings.
> >>
> >> I think this is a general issue to consider in Streams, and the community
> >> has been thinking about it: we write intermediate topics with the stream
> >> time that is inherited from the source topic's timestamps, however that
> >> timestamp is used for log rolling / retention etc. as well, and these two
> >> purposes (use timestamps in processing for out-of-ordering and late
> >> arrivals, and operations on the Kafka topics) could rely on different
> >> timestamp semantics. We need to revisit how timestamps can be maintained
> >> across the topology in Streams.
> >>
> >> Guozhang
> >>
> >> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu <flya...@gmail.com> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Ok, that clarifies quite a bit. I never really went into the timestamp
> >>> aspects, as time does not really play a role in my application (aside from
> >>> the repartition topics, I have no KStreams or windowed operations, just
> >>> different kinds of KTable joins).
> >>>
> >>> I do think that the fail case I see (with this version, joining two 'old'
> >>> KTables causes a small percentage of records to vanish) is far from
> >>> intuitive, and it somehow worked fine until a few weeks ago.
> >>>
> >>> I think your option 3 should work. I'll make a custom timestamp extractor
> >>> (I actually do have a timestamp in my messages), and I'll set it to the
> >>> current time as they enter the streams application.
> >>>
> >>> Thanks, that helped, regards, Frank
> >>>
> >>> On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>> Hi Frank,
> >>>>
> >>>> yes, retention policy is based on the embedded record timestamps and not
> >>>> on system time. Thus, if you send messages with an old timestamp, they
> >>>> can trigger log/segment rolling.
> >>>>
> >>>>>> I see that the repartition topics have timestamp.type = CreateTime, does
> >>>>>> that mean it uses the timestamp of the original message?
> >>>>
> >>>> Yes. That's the default setting on the broker side. For Streams, we
> >>>> maintain a so-called "stream time" that is computed based on the input
> >>>> record timestamps. This "stream time" is used to set the timestamp for
> >>>> records that are written by Streams (so it's more or less the timestamp
> >>>> of the input records).
> >>>>
> >>>>>> Shouldn't that be LogAppendTime for repartition topics?
> >>>>
> >>>> No. Streams needs to preserve the original timestamp to guarantee
> >>>> correct semantics for downstream window operations. Thus, it should be
> >>>> CreateTime -- if you switch to LogAppendTime, you might break your
> >>>> application logic and get wrong results.
> >>>>
> >>>>>> Or is there a way to configure that?
> >>>>
> >>>> You can configure this on a per-topic basis on the brokers.
> >>>>
> >>>>>> If I hack into my Kafka Streams code to force it to use LogAppendTime seems
> >>>>>> to solve my problem, but that appears to take a huge toll on the brokers.
> >>>>>> Throughput plummets, and I don't really know why.
> >>>>
> >>>> I am not sure what you mean by this? As it's a topic config, I don't
> >>>> understand how you can force this within your Streams application?
> >>>>
> >>>>
> >>>> IMHO, you have multiple options though:
> >>>>  - increase the retention time for your re-partitioning topics
> >>>>  - you could change the retention policy to number of bytes instead of
> >>>>    time for the re-partitioning topics
> >>>>  - you can implement a custom timestamp extractor and adjust the
> >>>>    timestamps accordingly ("stream time" is based on whatever the
> >>>>    timestamp extractor returns)
> >>>>
> >>>> However, if you have records with old timestamps, I am wondering why
> >>>> they are not truncated in your input topic? Do you not face the same
> >>>> issue there?
> >>>>
> >>> All my topics are compacted, I use no windowed operations at all; the only
> >>> 'delete' topics are the repartitioning internal topics.
> >>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> >>>>> Hi Kafka people,
> >>>>>
> >>>>> I'm running an application that pushes database changes into a Kafka topic.
> >>>>> I'm also running a Kafka Streams application that listens to these topics,
> >>>>> and groups them using the high level API, and inserts them into another
> >>>>> database.
> >>>>>
> >>>>> All topics are compacted, with the exception of the 'repartition topics',
> >>>>> which are configured to be retained for 36 hours.
> >>>>>
> >>>>> Note that the changes in the original Kafka topics can be old (generally
> >>>>> more than 36 hours), as they only change when the data changes.
> >>>>>
> >>>>> When I start an instance of the Kafka Streams application, I see the
> >>>>> repartition topics being deleted immediately, sometimes before they are
> >>>>> processed, and it looks like the repartition messages use the same
> >>>>> timestamp as the original message.
> >>>>>
> >>>>> I see that the repartition topics have timestamp.type = CreateTime, does
> >>>>> that mean it uses the timestamp of the original message? Shouldn't that
> >>>>> be LogAppendTime for repartition topics? Or is there a way to configure that?
> >>>>>
> >>>>> If I hack into my Kafka Streams code to force it to use LogAppendTime seems
> >>>>> to solve my problem, but that appears to take a huge toll on the brokers.
> >>>>> Throughput plummets, and I don't really know why.
> >>>>>
> >>>>> Any ideas?
> >>>>>
> >>>>> Frank
> >>>>>
> >>
> >> --
> >> -- Guozhang
> >>