Hi!

The "feedback loop" does sound like a solution, yes. It actually works well with CoMap / CoFlatMap: one input to the CoMap would be the original value, the other input the feedback value.
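To make the feedback-loop idea concrete, here is a small plain-Python model of the dataflow. It is not Flink API code. One keyed operator has two inputs: the original (sender, receiver, amount) event arrives on one input, routed by the sender, and a feedback record routed by the receiver arrives on the other. Because both inputs are keyed on the same account field, each account's state lives in exactly one operator instance, so Bob's later update as a sender sees the credit he received earlier through the feedback input. In Flink the two inputs would be connected and keyed on the same field before the CoFlatMap. Here `PARALLELISM`, `partition_for`, `TwoInputInstance` and `run` are hypothetical names, `zlib.crc32` stands in for Flink's key partitioning, and a `deque` stands in for the feedback channel.

```python
import zlib
from collections import deque

PARALLELISM = 4  # hypothetical number of parallel instances of the operator


def partition_for(key):
    # Stand-in for keyBy: every record with the same key goes to the same
    # instance, so that instance owns all of that key's state.
    return zlib.crc32(key.encode("utf-8")) % PARALLELISM


class TwoInputInstance:
    """One parallel instance of a keyed two-input (CoFlatMap-style) operator."""

    def __init__(self):
        self.balances = {}  # local state: account -> balance

    def on_original(self, src, dst, amount):
        # Input 1: the original event, routed by the sender's key.
        self.balances[src] = self.balances.get(src, 0) - amount
        # Do not touch dst here (it may live on another instance);
        # emit a feedback record keyed by the receiver instead.
        return (dst, amount)

    def on_feedback(self, account, amount):
        # Input 2: the feedback record, routed by the receiver's key.
        self.balances[account] = self.balances.get(account, 0) + amount


def run(events):
    instances = [TwoInputInstance() for _ in range(PARALLELISM)]
    feedback = deque()  # stands in for the feedback edge (e.g. a Kafka topic)

    def drain():
        while feedback:
            account, amount = feedback.popleft()
            instances[partition_for(account)].on_feedback(account, amount)

    for src, dst, amount in events:
        # Simplifying assumption: pending feedback is applied before the next
        # original event. A real job gives no such ordering guarantee.
        drain()
        feedback.append(instances[partition_for(src)].on_original(src, dst, amount))
    drain()

    # Merge per-instance state only for inspection; each key lives in one place.
    merged = {}
    for inst in instances:
        merged.update(inst.balances)
    return merged
```

With `run([("Alice", "Bob", 5), ("Bob", "Alice", 2)])`, Bob's update as a sender starts from the 5 credited to him through the feedback input, so the result is Alice at -3 and Bob at 3. The eager draining of feedback is only there to keep the model deterministic; in a real job new input and feedback may interleave in any order.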
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations

Once Flink's iterations are better hardened, they could be used for the feedback as well, and that path would then be exactly-once.

Best,
Stephan

On Tue, Aug 23, 2016 at 9:05 PM, Michael Warnock <mich...@ripple.com> wrote:

> Another approach I'm considering, which feels pretty kludgy, but I think
> should be acceptable for my current use:
>
> Only one stateful op, keyed on the same field, but with a flag field
> indicating the actual operation to be performed. The results of this op
> are output to a Kafka (or whatever) queue, which is ingested along with the
> first stream. The two state changes don't have to be atomic for my case,
> but the second one does have to be guaranteed to eventually happen, and be
> idempotent. I'm not quite sure how to (safely) make that second pass
> idempotent, though, and I'm not sure if there might be other
> issues with it I'm not seeing; it definitely doesn't _feel_ like a great
> solution.
>
> Any thoughts?
>
> On Tue, Aug 23, 2016 at 11:53 AM, Michael Warnock <mich...@ripple.com>
> wrote:
>
>> Thanks for the quick response!
>>
>> I've been wondering about connected streams and CoFlatMap, but either I
>> don't see all the ways they can be used, or they don't solve my problem.
>> Do you know of any examples outside of the documentation? My searches for
>> "flink comap example" and similar haven't turned anything up.
>>
>> On Tue, Aug 23, 2016 at 11:41 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> This is a tricky one. State access and changes are not shared across
>>> operators in Flink.
>>>
>>> We chose that design because it makes it possible to work on "local"
>>> state in each operator:
>>> - state automatically shards with the computation
>>> - no locking / concurrency implications
>>> - asynchronous persistence
>>>
>>> Sharing state between two operations in the same stage works with the
>>> CoMap / CoFlatMap functions.
>>> Sharing state across successive nodes does not work, because the
>>> functions could be executed on different machines, and one would need to do
>>> remote and synchronized state updates that way.
>>>
>>> Do you think you can use the CoMap / CoFlatMap functions for this?
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock <mich...@ripple.com>
>>> wrote:
>>>
>>>> I'm trying to do something that seems like it should be possible, but
>>>> my implementation doesn't behave as expected, and I'm not sure how else to
>>>> express it.
>>>>
>>>> Let's say the stream is composed of tuples like this: (Alice, Bob, 1)
>>>> and I want to keyBy(1), flatMap with state associated with Alice, then
>>>> keyBy(2) with state associated with Bob. The trick is, when I later get a
>>>> tuple like (Bob, Alice, 1), I want the first operator to see the state that
>>>> was updated in the second op previously. Is this possible? I tried
>>>> implementing both operators as one, getting the state by descriptor in the
>>>> flatMap body, and even instantiating the operator only once; the behavior
>>>> is, as you might guess, that the state in stage 1 doesn't include changes
>>>> made previously in stage 2.
>>>>
>>>> Is there any way to do this without throwing away the parallelism?
>>>>
>>>> Thanks in advance!
>>>> ~Michael
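Michael's flag-field approach quoted above leaves open how to make the second pass idempotent. One common way, sketched here as a plain-Python model rather than Flink API code, is to give each transfer a unique id when it first enters the system and have the keyed operator remember which (id, operation) pairs it has already applied for that key, so a record redelivered from the queue is ignored. `Op`, `KeyState`, `FlaggedOperator` and the `event_id` field are hypothetical, and the sketch assumes the re-ingested queue may deliver a record more than once but never loses one.

```python
from dataclasses import dataclass, field

VALID_KINDS = ("debit", "credit")


@dataclass(frozen=True)
class Op:
    event_id: str      # hypothetical id assigned once, when the transfer first enters
    account: str       # the field the single operator is keyed on
    counterparty: str
    kind: str          # the flag field: "debit" or "credit"
    amount: int


@dataclass
class KeyState:
    balance: int = 0
    applied: set = field(default_factory=set)  # (event_id, kind) pairs already applied


class FlaggedOperator:
    """Single keyed operator whose behaviour is chosen by Op.kind."""

    def __init__(self):
        self.state = {}  # account -> KeyState, standing in for keyed state

    def process(self, op):
        if op.kind not in VALID_KINDS:
            raise ValueError(f"unknown operation flag: {op.kind!r}")
        st = self.state.setdefault(op.account, KeyState())
        marker = (op.event_id, op.kind)
        if marker in st.applied:
            return None  # redelivered record: applying it again would double-count
        st.applied.add(marker)
        if op.kind == "debit":
            st.balance -= op.amount
            # Follow-up operation for the other party; the caller would write it
            # to the queue that is ingested together with the original stream.
            return Op(op.event_id, op.counterparty, op.account, "credit", op.amount)
        st.balance += op.amount
        return None


node = FlaggedOperator()
debit = Op("t1", "Alice", "Bob", "debit", 5)
credit = node.process(debit)  # applies the debit, returns the follow-up credit
node.process(credit)          # second pass: applies the credit to Bob
node.process(credit)          # simulated redelivery from the queue: ignored
node.process(debit)           # simulated replay of the original event: ignored
```

After these four calls Alice's balance is -5 and Bob's is 5; the duplicates are dropped. In this sketch `applied` grows without bound, so a real job would presumably need a way to bound it (for example by expiring old ids), and the dedupe markers would have to live in the same keyed, checkpointed state as the balance so both roll back together on recovery.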