Adam,

That is my understanding as well: the context is associated with a namespace, so as long as you run both functions in the same namespace, you shouldn't have any issues.
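For anyone who wants to try the approach without a Pulsar cluster, here is a minimal, self-contained Java sketch of the same timestamp gate. It assumes, as discussed in this thread, that both functions read and write the same state keys. A `ConcurrentHashMap` stands in for that shared state. `TimestampGate`, `awaitInSync`, `finish`, `worker`, and `worstInversion` are hypothetical names, not part of the Pulsar Functions API.

Each side publishes its current timestamp before checking its peer, and only the side that is ahead waits, so the two can never block each other. A stream that runs out of events publishes `Long.MAX_VALUE` so its peer is not left waiting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: the map below stands in for the namespace-scoped state
// both functions are assumed to share. None of these names are Pulsar API.
public class TimestampGate {
    private final Map<String, Long> state = new ConcurrentHashMap<>();
    private final long maxDelta;

    public TimestampGate(long maxDelta) {
        this.maxDelta = maxDelta;
    }

    // Publish our position, then block while we are more than maxDelta ahead of the peer.
    public void awaitInSync(String self, String peer, long ts) throws InterruptedException {
        state.put(self, ts); // publish first so the peer can always make progress
        while (true) {
            Long peerTs = state.get(peer); // null until the peer has published once
            if (peerTs != null && ts - peerTs <= maxDelta) {
                return;
            }
            Thread.sleep(1); // poll, like the sleep loop in the pseudo-code
        }
    }

    // A finished stream publishes Long.MAX_VALUE so its peer never waits on it again.
    public void finish(String self) {
        state.put(self, Long.MAX_VALUE);
    }

    // One "function": gate each event, simulate the work, then emit (streamId, ts).
    public static Thread worker(TimestampGate gate, String self, String peer, long[] timestamps,
                                long costMillis, int streamId, List<long[]> out) {
        return new Thread(() -> {
            try {
                for (long ts : timestamps) {
                    gate.awaitInSync(self, peer, ts);
                    Thread.sleep(costMillis); // a slower instance has a larger per-event cost
                    out.add(new long[] {streamId, ts});
                }
                gate.finish(self);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Largest amount by which an emitted event trails an earlier event from the other stream.
    public static long worstInversion(List<long[]> out) {
        long[] maxSeen = {Long.MIN_VALUE, Long.MIN_VALUE}; // max ts emitted so far, per stream
        long worst = 0;
        for (long[] e : out) {
            int id = (int) e[0];
            if (maxSeen[1 - id] != Long.MIN_VALUE) {
                worst = Math.max(worst, maxSeen[1 - id] - e[1]);
            }
            maxSeen[id] = Math.max(maxSeen[id], e[1]);
        }
        return worst;
    }

    public static void main(String[] args) throws InterruptedException {
        long maxDelta = 60_000; // the 60 s window from the question, in ms
        long[] a = new long[100];
        long[] b = new long[100];
        for (int i = 0; i < 100; i++) {
            a[i] = i * 10_000L;
            b[i] = i * 10_000L + 5_000L;
        }
        List<long[]> out = Collections.synchronizedList(new ArrayList<>());
        TimestampGate gate = new TimestampGate(maxDelta);
        Thread ta = worker(gate, "topicA_ts_key", "topicB_ts_key", a, 1, 0, out);
        Thread tb = worker(gate, "topicB_ts_key", "topicA_ts_key", b, 2, 1, out); // half speed
        ta.start();
        tb.start();
        ta.join();
        tb.join();
        System.out.println("events=" + out.size() + ", worst inversion ms=" + worstInversion(out));
    }
}
```

Running `main` pushes two 100-event streams through the gate, the second at half speed, and prints the worst cross-stream timestamp inversion it saw. With the publish-then-check ordering, that inversion should stay within the 60,000 ms window. The `Thread.sleep` polling mirrors the pseudo-code; a `wait`/`notifyAll` pair on a shared lock would avoid the busy loop.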
HTH,
David K.

On Wed, Jul 3, 2019 at 10:51 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:

> Hi David
>
> Thanks for the fast reply. Just one follow-up question.
>
> The context object provides access to the state store in BookKeeper. Is that context shared globally across all functions, or is it limited in scope to a given namespace unique to our two functions in question (i.e., like a consumer group in Kafka)? From what I've read, it looks like they're separated by namespace, which would be ideal, but I just wanted to confirm.
>
> Thanks again, and also for the pseudo-code.
>
> Adam
>
> On Wed, Jul 3, 2019 at 12:10 PM David Kjerrumgaard <da...@streaml.io> wrote:
>
> > Hi Adam,
> >
> > Your best bet for this might be to implement two separate stateful Pulsar Functions, each consuming from a different topic. The basic idea is to use the stateful context to share information about which timestamp each function is processing, and to use that information to decide whether or not to process the next record. Something like the following pseudo-code:
> >
> > import java.nio.ByteBuffer;
> > import org.apache.pulsar.functions.api.Context;
> > import org.apache.pulsar.functions.api.Function;
> >
> > public class SyncFunctionA implements Function<String, String> {
> >     @Override
> >     public String process(String input, Context ctx) throws Exception {
> >         // Topic A: key=userId, value = (timestamp, productId)
> >         long msgTs = parseTimestamp(input); // placeholder: assume you know how to parse the value
> >         // assumes max_delta is supplied as a string user-config value, e.g. "60000"
> >         long maxDelta = Long.parseLong(ctx.getUserConfigValue("max_delta").get().toString());
> >
> >         // Publish our position first so the Topic B function can see how far along we are
> >         ctx.putState("topicA_ts_key", ByteBuffer.allocate(Long.BYTES).putLong(0, msgTs));
> >
> >         // Only the side that is ahead waits, re-reading Topic B's position on every pass.
> >         // Checking our own lead (not the absolute difference) means the two sides
> >         // can never end up waiting on each other.
> >         while (msgTs - readTs(ctx, "topicB_ts_key") > maxDelta) {
> >             Thread.sleep(100); // wait until we are "in sync"
> >         }
> >
> >         // Process Topic A message
> >
> >         return input;
> >     }
> >
> >     // Returns the timestamp stored under key, or 0 (i.e. "keep waiting")
> >     // if the other function has not written one yet
> >     private static long readTs(Context ctx, String key) {
> >         ByteBuffer buf = ctx.getState(key);
> >         return (buf == null || buf.remaining() < Long.BYTES) ? 0L : buf.getLong(buf.position());
> >     }
> > }
> >
> > Regards,
> > David
> >
> > On Tue, Jul 2, 2019 at 6:28 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >
> > > Hi All
> > >
> > > The use-case is pretty simple. Let's say we have a history of events with the following:
> > >     key=userId, value = (timestamp, productId)
> > >
> > > and we want to remap it to:
> > >     key=productId, value = (original_timestamp, userId)
> > >
> > > Now, say I have 30 days of backlog and 2 input topics with the original events. I spin up 2 instances and let them process the data from the beginning of time, but one instance is only half as powerful (less CPU, memory, etc.), such that instance 0 processes X events/sec while instance 1 processes X/2 events/sec.
> > >
> > > My question is: how do I keep these two consumer instances in sync *according to their timestamps* (not offsets) as they consume from these topics? I don't want to see events with original_timestamps out of order by more than, say, 60 seconds. I am looking for any existing technology that would effectively say *"cap the time difference of events coming out of each processor at 60 seconds max."* If one processor is too far ahead of the other, it should stop and wait for the other to catch up.
> > >
> > > It seems to me that the shared consumer may *implicitly* offer something close, but I am concerned that one input topic would be consumed and processed much too far ahead of the other, such that the arbitrary 60 s window is not observed. I have been digging through the code, and from my understanding there is no guarantee about offset consumption across topics; consumption is tracked on a per-subscription basis.
> > >
> > > Anyway, if anyone has information that could help me solve this sort of qualitative issue, I would be very grateful. I'm coming from the Kafka world, so I apologize if my terminology isn't quite on point.
> > >
> > > Thanks
> > > Adam
> >
> > --
> > Regards,
> >
> > David Kjerrumgaard
> > Director - Solution Architecture
> > Streamlio
> > Cell: 330-437-5467