Adam,

That is my understanding as well: the context is associated with a
namespace, so as long as you run both functions in the same namespace, you
shouldn't have any issues.

HTH,
David K.

On Wed, Jul 3, 2019 at 10:51 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi David
>
> Thanks for the fast reply. Just one follow up question.
>
> The context object provides access to the state store in BookKeeper. Is
> that context shared globally across all functions, or is its scope limited
> to a namespace unique to the two functions in question (i.e., like a
> consumer group in Kafka)? From what I've read, it looks like they're
> separated by namespace, which would be ideal, but I just wanted to confirm.
>
> Thanks again, and for the pseudo-code as well.
>
> Adam
>
>
> On Wed, Jul 3, 2019 at 12:10 PM David Kjerrumgaard <da...@streaml.io>
> wrote:
>
> > Hi Adam,
> >
> > Your best bet for this might be to implement two separate stateful Pulsar
> > Functions, each consuming from a different topic. The basic idea is to use
> > the stateful context to share information about which timestamp each
> > function is processing, and to use that information to decide whether or
> > not to process the next record. Something like the following pseudo-code:
> >
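> > import java.nio.ByteBuffer;
> > import org.apache.pulsar.functions.api.Context;
> > import org.apache.pulsar.functions.api.Function;
> >
> > public class SyncFunctionA implements Function<String, String> {
> >     @Override
> >     public String process(String input, Context ctx) throws Exception {
> >         // Topic A: key=userId, value=(timestamp, productId)
> >         long msgTs = parseTimestamp(input);
> >         long maxDelta = Long.parseLong(String.valueOf(
> >                 ctx.getUserConfigValueOrDefault("max_delta", "60000"))); // assumed default: 60 s
> >
> >         // Wait only while A is AHEAD of B. Waiting on the absolute difference
> >         // would also stall the function that is behind, and both would hang.
> >         while (msgTs - readTs(ctx, "topicB_ts_key") > maxDelta) {
> >             Thread.sleep(100);  // re-read B's progress until we are "in sync"
> >         }
> >
> >         // Process Topic A message
> >
> >         ctx.putState("topicA_ts_key", ByteBuffer.allocate(Long.BYTES).putLong(0, msgTs));
> >         return input;
> >     }
> >
> >     // State values are ByteBuffers. Assumes a missing key comes back as null,
> >     // meaning B has not written yet; return MAX_VALUE so A does not block on it.
> >     private static long readTs(Context ctx, String key) {
> >         ByteBuffer buf = ctx.getState(key);
> >         return buf == null ? Long.MAX_VALUE : buf.getLong(0);
> >     }
> >
> >     // Hypothetical helper: assumes the value is encoded as "timestamp,productId".
> >     private static long parseTimestamp(String value) {
> >         return Long.parseLong(value.split(",")[0].trim());
> >     }
> > }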
> >
> > Regards,
> > David
> >
> > On Tue, Jul 2, 2019 at 6:28 PM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> > > Hi All
> > >
> > > The use case is pretty simple. Let's say we have a history of events
> > > with the following shape:
> > > key=userId, value=(timestamp, productId)
> > >
> > > and we want to remap it to:
> > > key=productId, value=(original_timestamp, userId)
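The re-keying step on its own is a stateless per-record transform. A minimal sketch in Java (16+, for records), assuming the message value has already been deserialized; `UserEvent`, `ProductEvent` and `remap` are hypothetical names, since the thread does not specify a schema or serialization format:

```java
import java.util.Map;

public class RemapSketch {
    // Hypothetical value types: the thread gives only the logical shape of the data.
    public record UserEvent(long timestamp, String productId) {}
    public record ProductEvent(long originalTimestamp, String userId) {}

    // (key=userId, value=(timestamp, productId))
    //   -> (key=productId, value=(original_timestamp, userId))
    public static Map.Entry<String, ProductEvent> remap(String userId, UserEvent event) {
        return Map.entry(event.productId(), new ProductEvent(event.timestamp(), userId));
    }

    public static void main(String[] args) {
        Map.Entry<String, ProductEvent> out =
                remap("user-42", new UserEvent(1_562_000_000_000L, "product-7"));
        System.out.println(out.getKey() + " -> " + out.getValue());
    }
}
```

Because nothing here depends on other records, each consumer can apply it independently; only the timestamp-ordering requirement needs coordination between the two instances.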
> > >
> > > Now, say I have 30 days of backlog and two input topics with the
> > > original events. I spin up two instances and let them process the data
> > > from the beginning of time, but one instance is only half as powerful
> > > (less CPU, memory, etc.), such that instance 0 processes X events/sec
> > > while instance 1 processes X/2 events/sec.
> > >
> > > My question is: how do I keep these two consumer instances in sync
> > > *according to their timestamps* (not offsets) as they consume from
> > > these topics? I don't want to see events whose original_timestamps are
> > > out of order by more than, say, 60 seconds. I am looking for any
> > > existing tech that would effectively say *"cap the time difference of
> > > events coming out of each processor at 60 seconds max".* If one
> > > processor is too far ahead of the other, it should stop and wait for
> > > the other to catch up.
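One way to express "stop and wait for it to catch up" is a skew gate: each consumer announces the timestamp of the event it is about to emit and blocks until that timestamp is no more than the allowed skew ahead of every other consumer's announced position. A minimal sketch, assuming both consumers run as threads in one JVM and share an in-memory object in place of external state; `TimestampSkewGate`, `awaitTurn`, `finish` and `simulate` are hypothetical names, not an existing Pulsar or Kafka API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory sketch: an array guarded by this object's monitor
// stands in for state shared between consumers running in one JVM.
public class TimestampSkewGate {
    private static final long UNANNOUNCED = Long.MIN_VALUE; // consumer has not started
    private static final long DONE = Long.MAX_VALUE;        // consumer has no more events

    private final long[] heads; // timestamp each consumer is about to emit (or last emitted)
    private final long maxSkewMillis;

    public TimestampSkewGate(int consumers, long maxSkewMillis) {
        if (maxSkewMillis < 0) throw new IllegalArgumentException("maxSkewMillis must be >= 0");
        this.heads = new long[consumers];
        Arrays.fill(heads, UNANNOUNCED);
        this.maxSkewMillis = maxSkewMillis;
    }

    // Announce the next event's timestamp, then block until it is at most
    // maxSkewMillis ahead of every other active consumer.
    public synchronized void awaitTurn(int consumer, long eventTs) throws InterruptedException {
        heads[consumer] = eventTs;
        notifyAll(); // let waiting consumers re-check against the new position
        while (!withinBound(consumer, eventTs)) {
            wait();
        }
    }

    // Mark a consumer as exhausted so it never holds the others back.
    public synchronized void finish(int consumer) {
        heads[consumer] = DONE;
        notifyAll();
    }

    private boolean withinBound(int consumer, long eventTs) {
        for (int i = 0; i < heads.length; i++) {
            if (i == consumer || heads[i] == DONE) continue;
            if (heads[i] == UNANNOUNCED) return false; // wait until the other side starts
            if (eventTs - heads[i] > maxSkewMillis) return false;
        }
        return true;
    }

    // Replays each timestamp array on its own thread; sleepMillis[i] makes
    // consumer i slower. Returns timestamps in the order they were emitted.
    public static List<Long> simulate(long[][] streams, long[] sleepMillis, long maxSkewMillis) {
        TimestampSkewGate gate = new TimestampSkewGate(streams.length, maxSkewMillis);
        List<Long> emitted = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[streams.length];
        for (int c = 0; c < streams.length; c++) {
            final int id = c;
            threads[id] = new Thread(() -> {
                try {
                    for (long ts : streams[id]) {
                        gate.awaitTurn(id, ts);
                        emitted.add(ts); // "process" and emit the event
                        Thread.sleep(sleepMillis[id]);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    gate.finish(id);
                }
            });
            threads[id].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] fast = new long[20];
        long[] slow = new long[20];
        for (int i = 0; i < 20; i++) {
            fast[i] = i * 30_000L;           // one event every 30 s of event time
            slow[i] = i * 30_000L + 15_000L;
        }
        // Consumer 0 has no artificial delay; consumer 1 sleeps 5 ms per event.
        List<Long> order = simulate(new long[][] {fast, slow}, new long[] {0, 5}, 60_000L);
        System.out.println("emission order: " + order);
    }
}
```

The consumer holding the oldest pending timestamp always satisfies the bound, so the two sides cannot end up waiting on each other, provided each consumer calls `finish` once its backlog is exhausted. Across separate processes, the `heads` array would have to live in shared external state (for example a state store), with polling in place of `wait`/`notifyAll`.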
> > >
> > > To me it seems like the shared consumer may *implicitly* offer something
> > > close to this, but I am concerned that one input topic would be consumed
> > > and processed much too far ahead of the other, such that the arbitrary
> > > 60s window is not observed. I have been digging through the code, and
> > > from my understanding there is no guarantee of coordinated consumption
> > > across topics; progress is tracked on a per-subscription basis.
> > >
> > > Anyway, if anyone has any information that could help me solve this
> > > sort of qualitative issue, I would be very grateful. I'm coming from
> > > the Kafka world, so I apologize if my terminology isn't quite on point.
> > >
> > > Thanks
> > > Adam
> > >
> >
> >
> > --
> > Regards,
> >
> > David Kjerrumgaard
> > Director - Solution Architecture
> > Streamlio
> > Cell: 330-437-5467
> >
>


