Hi David,

Thanks for the fast reply. Just one follow-up question.
The context object provides access to the state store in BookKeeper. Is that context shared globally across all functions, or is it limited in scope to a given namespace unique to our two functions in question (i.e., similar to a consumer group in Kafka)? From what I've read, it looks like they're separated by namespace, which would be ideal, but I just wanted to confirm.

Thanks again, and also for the pseudo-code.

Adam

On Wed, Jul 3, 2019 at 12:10 PM David Kjerrumgaard <da...@streaml.io> wrote:

> Hi Adam,
>
> Your best bet for this might be to implement 2 separate stateful Pulsar
> functions, each consuming from a different topic. The basic idea is to use
> the stateful context to share information about which timestamp each one
> is processing, and to use that information to decide whether or not to
> process the next record. Something like the following pseudo-code:
>
> public class SyncFunctionA implements Function<String, String> {
>     @Override
>     public String process(String input, Context ctx) throws Exception {
>
>         // Topic A: key=userId, value = (timestamp, productId)
>         // Assume you know how to parse the value
>         long msg_ts = input.getTimestamp();
>         long max_delta = ctx.getUserConfigValue("max_delta");
>         long topicB_ts = Long.parseLong(ctx.getState("topicB_ts_key"));
>
>         // Wait only while we are ahead of topic B; the side that is
>         // behind keeps going, so the two functions cannot block each other
>         while (msg_ts - topicB_ts > max_delta) {
>             Thread.sleep(100); // wait until we are "in sync"
>             // Re-read B's progress, otherwise the loop never exits
>             topicB_ts = Long.parseLong(ctx.getState("topicB_ts_key"));
>         }
>
>         // Process Topic A message
>
>         ctx.putState("topicA_ts_key", msg_ts);
>         return input; // placeholder for the remapped record
>     }
> }
>
> Regards,
> David
>
> On Tue, Jul 2, 2019 at 6:28 PM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Hi All
> >
> > The use case is pretty simple. Let's say we have a history of events
> > with the following:
> > key=userId, value = (timestamp, productId)
> >
> > and we want to remap it to:
> > key=productId, value=(original_timestamp, userId)
> >
> > Now, say I have 30 days of backlog, and 2 input topics with the original
> > events. I spin up 2 instances and let them process the data from the
> > beginning of time, but one instance is only half as powerful (less CPU,
> > Mem, etc), such that instance 0 processes X events/sec while instance 1
> > processes X/2 events/sec.
> >
> > My question is: How do I allow these two consumer instances to remain in
> > sync *according to their timestamps* (not offsets) as they consume from
> > these topics? I don't want to see events with original_timestamps out of
> > order by more than, say, 60 seconds. I am looking for any existing tech
> > that would effectively say *"cap the time difference of events coming out
> > of each processor at 60 seconds max".* If one processor is too far ahead
> > of the other, it should stop and wait for the other to catch up.
> >
> > To me it seems like the shared consumer may *implicitly* offer a close
> > option, but I am concerned that one input topic would be consumed and
> > processed much too far ahead of the other input topic, such that the
> > arbitrary 60s window is not observed. I have been digging through the
> > code, and from my understanding it seems that there is no guarantee on
> > offset consumption between topics, and that it's on a per-subscription
> > basis.
> >
> > Anyway, if anyone has any information that could help me solve this sort
> > of qualitative issue, I would be very grateful. I'm coming from the Kafka
> > world, so I apologize if my terminology isn't quite on point.
> >
> > Thanks
> > Adam
>
> --
> Regards,
>
> David Kjerrumgaard
> Director - Solution Architecture
> Streamlio
> Cell: 330-437-5467
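
For anyone who wants to try the "wait until in sync" idea from this thread without a Pulsar cluster, here is a minimal, self-contained sketch in plain Java. It is not Pulsar code: an in-memory map stands in for the function state store, and the names TimestampGate, tooFarAhead and awaitAndPublish are hypothetical, made up for this illustration. It assumes each side publishes the timestamp it last processed under its own key and reads the other side's key. The check is deliberately one-sided, because a symmetric absolute-difference check would make both sides wait once they drift apart. It also assumes that a side may proceed when the other side has not published anything yet.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Pulsar: the map is a stand-in for the
// state store that both functions would read and write.
class TimestampGate {
    private final Map<String, Long> state;
    private final long maxDeltaMs;

    TimestampGate(Map<String, Long> state, long maxDeltaMs) {
        this.state = state;
        this.maxDeltaMs = maxDeltaMs;
    }

    // True when this side is ahead of the other by more than maxDeltaMs.
    // A side that is behind never waits, so the two cannot block each other.
    static boolean tooFarAhead(long ownTs, long otherTs, long maxDeltaMs) {
        return ownTs - otherTs > maxDeltaMs;
    }

    // Assumption: if the other side has published nothing yet, do not wait.
    private boolean mustWait(String otherKey, long msgTs) {
        Long otherTs = state.get(otherKey);
        return otherTs != null && tooFarAhead(msgTs, otherTs, maxDeltaMs);
    }

    // Blocks while msgTs is too far ahead of the other side's progress,
    // re-reading that progress on every pass, then publishes our own.
    void awaitAndPublish(String ownKey, String otherKey, long msgTs) {
        while (mustWait(otherKey, msgTs)) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        state.put(ownKey, msgTs);
    }
}
```

In a real function, the map lookups would be replaced by reads and writes against the function's state, and the 60-second cap from the original question would be passed in as maxDeltaMs (60000). Polling with sleep keeps the sketch short; it also means a waiting function holds on to its current message until the slower side catches up.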