1. We don't need to worry about impl details. But yes, we can remove the method from the internal context that already extends `ProcessorContext`.
2. Same here: we can discuss on the PR.

Btw: it seems you got enough votes. Can you close the vote? Looking forward to your PR.

-Matthias

On 11/27/20 9:51 PM, Rohit Deshpande wrote:
> Hi,
> I would like to revive this KIP.
> 1. As per proposed solution, we want to add following method in
> ProcessorContext class
> /**
>  * Returns current cached wall-clock system timestamp in milliseconds.
>  *
>  * @return the current cached wall-clock system timestamp in milliseconds
>  */
> long currentSystemTimeMs();
> but InternalProcessorContext class already contains same method:
> https://github.com/guozhangwang/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java#L54
> Will it make more sense to get rid of this method from
> InternalProcessorContext and add it to ProcessorContext?
> 2. I am thinking of adding one test in TopologyDriverTest where using
> currentSystemTimeMs(), Processor will determine what to do with incoming
> record by comparing its timestamp with wall clock time. Similarly we can have
> another test where we fetch streamTime and can take an action on incoming
> record.
>
>
> Thanks,
> Rohit
>
>
> On 2020/08/14 05:07:04, "John Roesler" <v...@apache.org> wrote:
>> Thanks for the reply, Matthias,
>>
>> I see what you mean. I suppose I was thinking that we would pass in the
>> cached system time, which is also what we’re proposing to add to the
>> ProcessorContext.
>>
>> If you think there’s something about the timestamp extractor in particular
>> that would make people want more precision, then something like Time would
>> do the trick. Since it’s not a public API, maybe just ‘Supplier<Long>’ would
>> be appropriate.
>>
>> But I also don’t want to bikeshed it. My only concern was that it’s awkward
>> to ask people to actually change their application code for testing.
>> But maybe in this case, an option is better than no option, and if people don’t
>> like it, we can always deprecate the mock extractor and evolve the interface
>> later.
>>
>> So, I’m +1 either way.
>>
>> Thanks,
>> John
>>
>> On Mon, Aug 3, 2020, at 16:28, Matthias J. Sax wrote:
>>> Interesting proposal.
>>>
>>> However, it raises the question how the runtime would pass in the
>>> `systemTime` parameter? To be accurate, we would need to call
>>> `Time.milliseconds()` each time before we call the timestamp extractor.
>>> This sounds expensive and maybe the extractor does not even use this value.
>>>
>>> Or we only call `Time.milliseconds()` periodically (as we also do in our
>>> runtime code) to make it cheap, however, we lose precision? Not sure if
>>> we can make this trade-off for the user?
>>>
>>> Handing in the `Time` object itself might be another idea, however it
>>> seems "dangerous" though, as it does not seem to be actually public API?
>>>
>>> Last, do we really think we need this feature? We never had a feature
>>> request for it and I am not aware of any issue with the current
>>> TimestampExtractor interface.
>>>
>>> It's always easier to add it later if there is real demand instead of
>>> pro-actively changing it (and maybe the need to deprecate and remove
>>> later) with no clear benefit? Adding the `MockTimestampsExtractor` as
>>> part of the test-utils package seems less "dangerous" and should do the
>>> job, allowing us to collect feedback. If it's not good enough, we can
>>> still change the TimestampExtractor interface as a follow up?
>>>
>>>
>>> -Matthias
>>>
>>> On 7/28/20 10:03 AM, John Roesler wrote:
>>>> Thanks Matthias,
>>>>
>>>> This is a really good point. It might be a bummer
>>>> to have to actually change the topology between
>>>> testing and production.
>>>> Do you think we can rather
>>>> evolve the TimestampExtractor interface to let
>>>> Streams pass in the current system time, along with
>>>> the current record and the current partition time?
>>>>
>>>> For example, we could add a new method:
>>>>   long extract(
>>>>     ConsumerRecord<Object, Object> record,
>>>>     long partitionTime,
>>>>     long systemTime
>>>>   );
>>>>
>>>> Then, Streams could pass in the current system
>>>> time and TopologyTestDriver could pass the mocked
>>>> time. Additionally, users who implement
>>>> TimestampExtractor on their own would be able to
>>>> deterministically unit-test their own implementation.
>>>>
>>>> It's always a challenge to add to an interface without
>>>> breaking compatibility. In this case, it seems like
>>>> we could provide a default implementation that just
>>>> ignores the systemTime argument and calls
>>>> extract(record, partitionTime) and also deprecate
>>>> the existing method. Then custom implementations
>>>> would get a deprecation warning telling them to
>>>> implement the other method, and when we remove
>>>> the deprecated extract(record, partitionTime), we can
>>>> also drop the default implementation from the new
>>>> method.
>>>>
>>>> Specifically, what do you think about:
>>>> =================================
>>>> public interface TimestampExtractor {
>>>>   /*...
>>>>    * @deprecated Since 2.7 Implement
>>>>    * {@code extract(ConsumerRecord<Object, Object> record, long partitionTime, long systemTime)} instead
>>>>    */
>>>>   @Deprecated
>>>>   long extract(
>>>>     ConsumerRecord<Object, Object> record,
>>>>     long partitionTime
>>>>   );
>>>>
>>>>   default long extract(
>>>>     ConsumerRecord<Object, Object> record,
>>>>     long partitionTime,
>>>>     long systemTime) {
>>>>     return extract(record, partitionTime);
>>>>   }
>>>> }
>>>> =================================
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Sun, Jul 26, 2020, at 15:47, Matthias J.
>>>> Sax wrote:
>>>>> Hi,
>>>>>
>>>>> I just had one more thought about an additional improvement we might
>>>>> want to include in this KIP.
>>>>>
>>>>> Kafka Streams ships with a `WallclockTimestampExtractor` that just
>>>>> returns `System.currentTimeMillis()` and thus, cannot be mocked. And it
>>>>> seems that there is no way for a user to build a custom timestamps
>>>>> extractor that returns the TTD's mocked system time.
>>>>>
>>>>> Thus, it might be nice, to add a `MockTimeExtractor` (only in the
>>>>> test-util package) that users could set and this `MockTimeExtractor`
>>>>> returns the mocked system time.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 7/7/20 11:11 PM, Matthias J. Sax wrote:
>>>>>> I think, we don't need a default implementation for the new methods.
>>>>>>
>>>>>> What would be the use-case to implement the `ProcessorContext`
>>>>>> interface? In contrast to, for example, `KeyValueStore`,
>>>>>> `ProcessorContext` is a use-only interface because it's never passed
>>>>>> into Kafka Streams, but only handed out to the user.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 7/7/20 1:28 PM, William Bottrell wrote:
>>>>>>> Sure, I would appreciate help from Piotr creating an example.
>>>>>>>
>>>>>>> On Tue, Jul 7, 2020 at 12:03 PM Boyang Chen <re...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey John,
>>>>>>>>
>>>>>>>> since ProcessorContext is a public API, I couldn't be sure that people
>>>>>>>> won't try to extend it. Without a default implementation, user code
>>>>>>>> compilation will break.
>>>>>>>>
>>>>>>>> William and Piotr, it seems that we haven't added any example usage of the
>>>>>>>> new API, could we try to address that? It should help with the motivation
>>>>>>>> and follow-up meta comments as John proposed.
>>>>>>>>
>>>>>>>> Boyang
>>>>>>>>
>>>>>>>> On Mon, Jul 6, 2020 at 12:04 PM Matthias J.
>>>>>>>> Sax <mj...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> William,
>>>>>>>>>
>>>>>>>>> thanks for the KIP. LGTM. Feel free to start a vote.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 7/4/20 10:14 AM, John Roesler wrote:
>>>>>>>>>> Hi Richard,
>>>>>>>>>>
>>>>>>>>>> It’s good to hear from you!
>>>>>>>>>>
>>>>>>>>>> Thanks for bringing up the wall-clock suppression feature. IIRC, someone
>>>>>>>>>> actually started a KIP discussion for it already, but I don’t think it went
>>>>>>>>>> to a vote. I don’t recall any technical impediment, just the lack of
>>>>>>>>>> availability to finish it up. Although there is some association, it would
>>>>>>>>>> be good to keep the KIPs separate.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> John
>>>>>>>>>>
>>>>>>>>>> On Sat, Jul 4, 2020, at 10:05, Richard Yu wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> This reminds me of a previous issue I think that we were discussing.
>>>>>>>>>>> @John Roesler <ma...@apache.org> I think you should remember this one.
>>>>>>>>>>>
>>>>>>>>>>> A while back, we were talking about having suppress operator emit
>>>>>>>>>>> records by wall-clock time instead of stream time.
>>>>>>>>>>> If we are adding this, wouldn't that make it more feasible for us to
>>>>>>>>>>> implement that feature for suppression?
>>>>>>>>>>>
>>>>>>>>>>> If I recall correctly, there actually had been quite a bit of user
>>>>>>>>>>> demand for such a feature.
>>>>>>>>>>> Might be good to include it in this KIP (or maybe use one of the prior
>>>>>>>>>>> KIPs for this feature).
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Richard
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jul 4, 2020 at 6:58 AM John Roesler <vv...@apache.org> wrote:
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Thanks, Boyang, it is nice to see usage examples in KIPs like this.
>>>>>>>>>>>> It helps during the discussion, and it’s also good documentation
>>>>>>>>>>>> later on.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Yeah, this is a subtle point. The motivation mentions being able
>>>>>>>>>>>> to control the time during tests, but to be able to make it work, the
>>>>>>>>>>>> processor implementation needs a public method on ProcessorContext to get
>>>>>>>>>>>> the time. Otherwise, processors would have to check the type of the context
>>>>>>>>>>>> and cast, depending on whether they’re running inside a test or not. In
>>>>>>>>>>>> retrospect, if we’d had a usage example, this probably would have been
>>>>>>>>>>>> clear.
>>>>>>>>>>>>
>>>>>>>>>>>> 3. I don’t think we expect people to have their own implementations
>>>>>>>>>>>> of ProcessorContext. Since all implementations are internal, it’s really an
>>>>>>>>>>>> implementation detail whether we use a default method, abstract methods, or
>>>>>>>>>>>> concrete methods. I can’t think of an implementation that really wants to
>>>>>>>>>>>> just look up the system time. In the production code path, we cache the
>>>>>>>>>>>> time for performance, and in testing, we use a mock time.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> John
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 3, 2020, at 06:41, Piotr Smoliński wrote:
>>>>>>>>>>>> > 1. Makes sense; let me propose something
>>>>>>>>>>>> >
>>>>>>>>>>>> > 2. That's not testing-only. The goal is to use the same API to access
>>>>>>>>>>>> > the time in deployment and testing environments. The major driver is
>>>>>>>>>>>> > System.currentTimeMillis(), which a) cannot be used in tests b) could go
>>>>>>>>>>>> > in specific cases back c) is not compatible with punctuator call.
>>>>>>>>>>>> > The idea is that we could access clock using uniform API.
>>>>>>>>>>>> > For completeness we should have same API for system and stream time.
>>>>>>>>>>>> >
>>>>>>>>>>>> > 3. There aren't that many subclasses. Two important ones are
>>>>>>>>>>>> > ProcessorContextImpl and MockProcessorContext (and third one:
>>>>>>>>>>>> > ForwardingDisableProcessorContext). If given implementation does not
>>>>>>>>>>>> > support schedule() call, there is no reason to support clock access.
>>>>>>>>>>>> > The default implementation should just throw
>>>>>>>>>>>> > UnsupportedOperationException just to prevent
>>>>>>>>>>>> > from compilation errors in possible subclasses.
>>>>>>>>>>>> >
>>>>>>>>>>>> > On 2020/07/01 02:24:43, Boyang Chen <re...@gmail.com> wrote:
>>>>>>>>>>>> > > Thanks Will for the KIP. A couple questions and suggestions:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > 1. I think for new APIs to make most sense, we should add a minimal example
>>>>>>>>>>>> > > demonstrating how it could be useful to structure unit tests w/o the new
>>>>>>>>>>>> > > APIs.
>>>>>>>>>>>> > > 2. If this is a testing-only feature, could we only add it
>>>>>>>>>>>> > > to MockProcessorContext?
>>>>>>>>>>>> > > 3. Regarding the API, since this will be added to the ProcessorContext with
>>>>>>>>>>>> > > many subclasses, does it make sense to provide default implementations as
>>>>>>>>>>>> > > well?
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Boyang
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > On Tue, Jun 30, 2020 at 6:56 PM William Bottrell <bottre...@gmail.com>
>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > > Thanks, John! I made the change.
>>>>>>>>>>>> > > > How much longer should I let there be
>>>>>>>>>>>> > > > discussion before starting a VOTE?
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > On Sat, Jun 27, 2020 at 6:50 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > > Thanks, Will,
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > That looks good to me. I would only add "cached" or something
>>>>>>>>>>>> > > > > to indicate that it wouldn't just transparently look up the current
>>>>>>>>>>>> > > > > System.currentTimeMillis every time.
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > For example:
>>>>>>>>>>>> > > > > /**
>>>>>>>>>>>> > > > >  * Returns current cached wall-clock system timestamp in milliseconds.
>>>>>>>>>>>> > > > >  *
>>>>>>>>>>>> > > > >  * @return the current cached wall-clock system timestamp in milliseconds
>>>>>>>>>>>> > > > >  */
>>>>>>>>>>>> > > > > long currentSystemTimeMs();
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > I don't want to give specific information about _when_ exactly the
>>>>>>>>>>>> > > > > timestamp cache will be updated, so that we can adjust it in the
>>>>>>>>>>>> > > > > future, but it does seem important to make people aware that they
>>>>>>>>>>>> > > > > won't see the timestamp advance during the execution of
>>>>>>>>>>>> > > > > Processor.process(), for example.
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > With that modification, I'll be +1 on this proposal.
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > Thanks again for the KIP!
>>>>>>>>>>>> > > > > -John
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > On Thu, Jun 25, 2020, at 02:32, William Bottrell wrote:
>>>>>>>>>>>> > > > > > Thanks, John! I appreciate you adjusting my lingo. I made the change to
>>>>>>>>>>>> > > > > > the KIP.
>>>>>>>>>>>> > > > > > I will add the note about system time to the javadoc.
>>>>>>>>>>>> > > > > >
>>>>>>>>>>>> > > > > > On Wed, Jun 24, 2020 at 6:52 PM John Roesler <vvcep...@apache.org>
>>>>>>>>>>>> > > > > > wrote:
>>>>>>>>>>>> > > > > >
>>>>>>>>>>>> > > > > > > Hi Will,
>>>>>>>>>>>> > > > > > >
>>>>>>>>>>>> > > > > > > This proposal looks good to me overall. Thanks for the contribution!
>>>>>>>>>>>> > > > > > >
>>>>>>>>>>>> > > > > > > Just a couple of minor notes:
>>>>>>>>>>>> > > > > > >
>>>>>>>>>>>> > > > > > > The system time method would return a cached timestamp that Streams
>>>>>>>>>>>> > > > > > > looks up once when it starts processing a record. This may be confusing,
>>>>>>>>>>>> > > > > > > so it might be good to state it in the javadoc.
>>>>>>>>>>>> > > > > > >
>>>>>>>>>>>> > > > > > > I thought the javadoc for the stream time might be a bit confusing. We
>>>>>>>>>>>> > > > > > > normally talk about “Tasks” not “partition groups” in the public api.
>>>>>>>>>>>> > > > > > > Maybe just saying that it’s “the maximum timestamp of any record yet
>>>>>>>>>>>> > > > > > > processed by the task” would be both high level an
[message truncated...]
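
For reference, the interface evolution John Roesler proposed in the 7/28 message (a new three-argument `extract` whose default implementation delegates to the deprecated two-argument one) can be illustrated with a small self-contained Java sketch. It uses simplified stand-in types, not Kafka's actual classes: `SimpleRecord`, `SketchTimestampExtractor`, `RecordTimeExtractor`, `MockableWallClockExtractor`, and `ExtractorSketch` are hypothetical names introduced here purely for illustration, and nothing below should be read as the API that Kafka Streams ended up with.

```java
// Hypothetical, simplified stand-in for a consumed record; NOT Kafka's ConsumerRecord.
final class SimpleRecord {
    final long timestamp;

    SimpleRecord(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Sketch of the proposed evolution: the old method is deprecated, and the new
// method gets a default body so existing implementations keep compiling.
interface SketchTimestampExtractor {
    @Deprecated
    long extract(SimpleRecord record, long partitionTime);

    // By default, ignore systemTime and fall back to the old method.
    default long extract(SimpleRecord record, long partitionTime, long systemTime) {
        return extract(record, partitionTime);
    }
}

// A pre-existing implementation that only knows the two-argument method.
final class RecordTimeExtractor implements SketchTimestampExtractor {
    @Override
    public long extract(SimpleRecord record, long partitionTime) {
        return record.timestamp;
    }
}

// A wall-clock style extractor that becomes testable once the caller supplies the time.
final class MockableWallClockExtractor implements SketchTimestampExtractor {
    @Override
    public long extract(SimpleRecord record, long partitionTime) {
        // Old path: reads the real clock, so it cannot be mocked.
        return System.currentTimeMillis();
    }

    @Override
    public long extract(SimpleRecord record, long partitionTime, long systemTime) {
        // New path: returns whatever time the runtime (or a test driver) passes in.
        return systemTime;
    }
}

class ExtractorSketch {
    public static void main(String[] args) {
        SimpleRecord record = new SimpleRecord(42L);
        long mockedSystemTime = 1_000L; // assumed value a test driver might supply

        // The legacy implementation still works through the default method.
        System.out.println(new RecordTimeExtractor().extract(record, -1L, mockedSystemTime));

        // The new-style implementation returns the supplied (mockable) time.
        System.out.println(new MockableWallClockExtractor().extract(record, -1L, mockedSystemTime));
    }
}
```

The design point is the one raised in the thread: a caller that always invokes the three-argument method can inject either the cached runtime clock or a mocked test clock, while implementations written against the old method are unaffected until the deprecated method is removed.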