Another approach I'm considering, which feels pretty kludgy, but I think should be acceptable for my current use:
Only one stateful op, keyed on the same field, but with a flag field indicating the actual operation to be performed. The results of this op are output to a Kafka (or whatever) queue, which is ingested along with the first stream. The two state changes don't have to be atomic for my case, but the second one does have to be guaranteed to eventually happen, and it has to be idempotent. I'm not quite sure how to (safely) make that second pass idempotent at the moment, though, and I'm not sure whether there are other issues I'm not seeing - it definitely doesn't _feel_ like a great solution. Any thoughts? (I've pasted a couple of rough sketches below the quoted thread.)

On Tue, Aug 23, 2016 at 11:53 AM, Michael Warnock <mich...@ripple.com> wrote:

> Thanks for the quick response!
>
> I've been wondering about connected streams and CoFlatMap, but either I
> don't see all the ways they can be used, or they don't solve my problem.
> Do you know of any examples outside of the documentation? My searches for
> "flink comap example" and similar haven't turned anything up.
>
> On Tue, Aug 23, 2016 at 11:41 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> This is a tricky one. State access and changes are not shared across
>> operators in Flink. We chose that design because it makes it possible
>> to work on "local" state in each operator:
>> - state automatically shards with the computation
>> - no locking / concurrency implications
>> - asynchronous persistence
>>
>> Sharing state between two operations in the same stage works with the
>> CoMap / CoFlatMap functions. Sharing state across successive nodes does
>> not work, because the functions could be executed on different machines,
>> and one would need to do remote and synchronized state updates for that.
>>
>> Do you think you can use the CoMap / CoFlatMap functions for this?
>>
>> Greetings,
>> Stephan
>>
>> On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock <mich...@ripple.com>
>> wrote:
>>
>>> I'm trying to do something that seems like it should be possible, but
>>> my implementation doesn't behave as expected, and I'm not sure how else
>>> to express it.
>>>
>>> Let's say the stream is composed of tuples like this: (Alice, Bob, 1),
>>> and I want to keyBy(1), flatMap with state associated with Alice, then
>>> keyBy(2) with state associated with Bob. The trick is, when I later get
>>> a tuple like (Bob, Alice, 1), I want the first operator to see the state
>>> that was updated in the second op previously. Is this possible? I tried
>>> implementing both operators as one, getting the state by descriptor in
>>> the flatMap body, and even instantiating the operator only once; the
>>> behavior is, as you might guess, that the state in stage 1 doesn't
>>> include changes made previously in stage 2.
>>>
>>> Is there any way to do this without throwing away the parallelism?
>>>
>>> Thanks in advance!
>>> ~Michael
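
To make the above concrete, here's a rough, untested sketch of what I have in mind. All the names (Update, StatefulOp, balance, appliedIds, ...) are made up for illustration, the debit/credit semantics is just an example, and I've only described the Kafka source/sink wiring in comments. The only idempotency idea I've come up with so far is to tag each fed-back update with an id and remember, per key, which ids have already been applied:

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class SingleOpSketch {

  // Unified record type: original events and fed-back ("second pass") events share it.
  public static class Update {
    public String party;        // the field we key on
    public String counterparty; // needed to build the fed-back record
    public long amount;
    public boolean secondPass;  // the flag: which of the two operations to apply
    public String updateId;     // unique id so the second pass can be deduplicated
    public Update() {}
  }

  // Wiring (roughly):
  //   originalEvents.union(fedBackFromKafka)
  //       .keyBy("party")
  //       .flatMap(new StatefulOp())
  // with the operator's output written to the Kafka topic that fedBackFromKafka reads.

  public static class StatefulOp extends RichFlatMapFunction<Update, Update> {

    private transient ValueState<Long> balance;
    private transient ValueState<Set<String>> appliedIds;

    @Override
    public void open(Configuration parameters) {
      balance = getRuntimeContext().getState(
          new ValueStateDescriptor<>("balance", Long.class, 0L));
      appliedIds = getRuntimeContext().getState(
          new ValueStateDescriptor<>("applied-ids",
              TypeInformation.of(new TypeHint<Set<String>>() {}),
              new HashSet<String>()));
    }

    @Override
    public void flatMap(Update u, Collector<Update> out) throws Exception {
      if (u.secondPass) {
        // Idempotency attempt: remember which fed-back updates were already applied,
        // so a replay from the queue doesn't double-apply them.
        // (Obvious problem: this set grows without bound and would need pruning.)
        Set<String> seen = appliedIds.value();
        if (seen.add(u.updateId)) {
          balance.update(balance.value() + u.amount);
          appliedIds.update(seen);
        }
      } else {
        // First pass: apply this key's change, then emit the mirror update for the
        // counterparty; it goes out to Kafka and comes back in keyed on the other field.
        balance.update(balance.value() - u.amount);
        Update mirror = new Update();
        mirror.party = u.counterparty;
        mirror.counterparty = u.party;
        mirror.amount = u.amount;
        mirror.secondPass = true;
        mirror.updateId = u.updateId;
        out.collect(mirror);
      }
    }
  }
}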
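
And for reference, this is roughly what I understood the CoMap / CoFlatMap suggestion to mean: two streams connected and keyed the same way, sharing keyed state inside one operator. Again untested, and the names and record types are made up. What I still don't see is how to apply it when both "operations" are over the same stream but keyed on different fields:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoFlatMapSketch {

  // Connect the two streams, key both on the account name (field 0), and let one
  // operator hold the shared keyed state.
  public static DataStream<Tuple2<String, Long>> wire(
      DataStream<Tuple3<String, String, Long>> payments,
      DataStream<Tuple2<String, Long>> adjustments) {
    return payments
        .connect(adjustments)
        .keyBy(0, 0)
        .flatMap(new SharedStateFn());
  }

  public static class SharedStateFn extends
      RichCoFlatMapFunction<Tuple3<String, String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> balance;

    @Override
    public void open(Configuration parameters) {
      balance = getRuntimeContext().getState(
          new ValueStateDescriptor<>("balance", Long.class, 0L));
    }

    // Elements of the first stream: debit the keyed balance.
    @Override
    public void flatMap1(Tuple3<String, String, Long> payment,
        Collector<Tuple2<String, Long>> out) throws Exception {
      long b = balance.value() - payment.f2;
      balance.update(b);
      out.collect(Tuple2.of(payment.f0, b));
    }

    // Elements of the second stream see (and modify) the same keyed state.
    @Override
    public void flatMap2(Tuple2<String, Long> adj,
        Collector<Tuple2<String, Long>> out) throws Exception {
      long b = balance.value() + adj.f1;
      balance.update(b);
      out.collect(Tuple2.of(adj.f0, b));
    }
  }
}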