Hi Adam,

Your best bet might be to implement two separate stateful Pulsar Functions, each consuming from a different topic. The basic idea is to use the stateful context to share which timestamp each function is currently processing, and to use that information to decide whether or not to process the next record. Something like the following pseudo-code for the Topic A side (a mirror sketch for the Topic B side follows the quoted message at the bottom):
import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class SyncFunctionA implements Function<String, String> {
    @Override
    public String process(String input, Context ctx) throws Exception {
        // Topic A: key=userId, value = (timestamp, productId)
        long msgTs = Long.parseLong(input.split(",")[0]); // assume you know how to parse the value
        long maxDelta = Long.parseLong(
                ctx.getUserConfigValue("max_delta").orElse("60000").toString());
        // Block only while we are ahead of the Topic B function; blocking on the
        // absolute difference would let both sides wait on each other and deadlock.
        // Re-read the shared state on every pass so we see Topic B's progress.
        while (msgTs - readTs(ctx, "topicB_ts_key") > maxDelta) {
            Thread.sleep(100); // wait until we are "in sync"
        }
        // Process the Topic A message, then publish our own position
        ctx.putState("topicA_ts_key",
                ByteBuffer.allocate(Long.BYTES).putLong(0, msgTs));
        return input;
    }

    static long readTs(Context ctx, String key) {
        ByteBuffer buf = ctx.getState(key); // null until the other side writes
        return (buf == null) ? Long.MAX_VALUE : buf.getLong(0);
    }
}

Regards,
David

On Tue, Jul 2, 2019 at 6:28 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:

> Hi All
>
> The use-case is pretty simple. Let's say we have a history of events with
> the following:
> key=userId, value = (timestamp, productId)
>
> and we want to remap it to:
> key=productId, value=(original_timestamp, userId)
>
> Now, say I have 30 days of backlog and 2 input topics with the original
> events. I spin up 2 instances and let them process the data from the
> beginning of time, but one instance is only half as powerful (less CPU,
> memory, etc.), such that instance 0 processes X events/sec while instance 1
> processes X/2 events/sec.
>
> My question is: How do I allow these two consumer instances to remain in
> sync *according to their timestamps* (not offsets) as they consume from
> these topics? I don't want to see events with original_timestamps out of
> order by more than, say, 60 seconds. I am looking for any existing tech
> that would effectively say *"cap the time difference of events coming out
> of each processor at 60 seconds max."* If one processor is too far ahead
> of the other, stop and wait for it to catch up.
>
> To me it seems like the shared consumer may *implicitly* offer something
> close to this, but I am concerned that one input topic would be consumed
> and processed much too far ahead of the other input topic, such that the
> arbitrary 60s window is not observed. I have been digging through the
> code, and from my understanding it seems that there is no guarantee in
> offset consumption between topics, and that it's on a per-subscription
> basis.
>
> Anyways, if anyone has any information they could lend to helping me
> solve this sort of qualitative issue, I would be very grateful. I'm
> coming from the Kafka world, so I apologize if my terminology isn't quite
> on point.
>
> Thanks
> Adam

--
Regards,

David Kjerrumgaard
Director - Solution Architecture
Streamlio
Cell: 330-437-5467
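For completeness, here is what the mirror-image Topic B side could look like. This is a minimal sketch under the same assumptions as the pseudo-code above: the "topicA_ts_key"/"topicB_ts_key" state keys, the comma-separated value layout, and the "max_delta" user-config name are all illustrative, and it presumes the two functions can read each other's state.

import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class SyncFunctionB implements Function<String, String> {
    @Override
    public String process(String input, Context ctx) throws Exception {
        // Topic B: same value layout assumed; only the state keys are swapped
        long msgTs = Long.parseLong(input.split(",")[0]);
        long maxDelta = Long.parseLong(
                ctx.getUserConfigValue("max_delta").orElse("60000").toString());
        // Wait only while we are ahead of the Topic A function, re-reading
        // its progress from the shared state on every pass
        while (true) {
            ByteBuffer aState = ctx.getState("topicA_ts_key"); // null until A writes
            long topicATs = (aState == null) ? Long.MAX_VALUE : aState.getLong(0);
            if (msgTs - topicATs <= maxDelta) {
                break; // in sync with (or behind) Topic A: safe to proceed
            }
            Thread.sleep(100);
        }
        // Process the Topic B message, then publish our own position
        ctx.putState("topicB_ts_key",
                ByteBuffer.allocate(Long.BYTES).putLong(0, msgTs));
        return input;
    }
}

Note that each side blocks only when it is ahead of the other; a two-sided wait on the absolute difference would let both functions wait on each other and deadlock. The max_delta value could then be supplied at deploy time, e.g. with pulsar-admin functions create ... --user-config '{"max_delta": "60000"}'.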