Hi Arvid - thanks for the welcome :) The SinkFunction is custom (extends RichSinkFunction), and writes to a legacy web service over HTTP.
I'll investigate the keyBy+KeyedProcessFunction further - thanks. Frankly, I looked at this, but I think I was confusing myself between working with a KV store (database thinking) and the (to me) new world of "stream state".

Additionally, if I move my SinkFunction functionality (HTTP POST, PUT etc.) into the KeyedProcessFunction, I assume I would use a DiscardingSink to terminate the flow - or is this an anti-pattern?

Many thanks

Paul

> On 9 Jun 2021, at 13:02, Arvid Heise <ar...@apache.org> wrote:
>
> Hi Paul,
>
> Welcome to the club!
>
> What's your SinkFunction? Is it custom? If so, you could also implement
> CheckpointedFunction to read and write data. Here you could use
> OperatorStateStore and with it the BroadcastState. However, broadcast state
> is quite heavy (it sends all data to all instances, so it doesn't scale).
>
> A better way would be to have a keyBy+KeyedProcessFunction before the sink
> function. You could keyBy your key and use a normal value state [1] to
> store the data point. If you configure your state backend to be RocksDB [2],
> then you have everything together.
>
> Note that you could also have it next to the sink function. There is no
> reason not to have a dangling operator (no sink).
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-keyed-state
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/
>
> On Wed, Jun 9, 2021 at 12:13 AM Paul K Moore <paulkmo...@gmail.com> wrote:
>
> Hi all,
>
> First post here, so please be kind :)
>
> Firstly, some context; I have the following high-level job topology:
>
> (1) FlinkPulsarSource -> (2) RichAsyncFunction -> (3) SinkFunction
>
> 1. The FlinkPulsarSource reads event notifications about article updates
>    from a Pulsar topic
> 2. The RichAsyncFunction fetches the "full" article from the specified URL
>    end-point, and transmutes it into a "legacy" article format
> 3. The SinkFunction writes the "legacy" article to a (legacy) web platform,
>    i.e. the sink is effectively another web site
>
> I have this all up and running (despite lots of shading fun).
>
> When the SinkFunction creates an article on the legacy platform, it returns
> an 'HTTP 201 - Created' with a Location header suitably populated.
>
> Now, I need to persist that Location header and, more explicitly, need to
> persist a map between the URLs for the new and legacy platforms. This is
> needed for later update and delete processing.
>
> The question is: how do I store this mapping information?
>
> I've spent some time trying to grok state management and the various
> backends, but from what I can see the state management is focused on
> "operator scoped" state. This seems reasonable given the requirement for
> barriers etc. to ensure accurate recovery.
>
> However, I need some persistence between operators (shared state?) and with
> longevity beyond the processing of an operator.
>
> My gut reaction is that I need an external K-V store such as Ignite (or
> similar). Frankly, given that Flink ships with embedded RocksDB I was hoping
> to use that, but there seems no obvious way to do this, and lots of advice
> saying don't :)
>
> Am I missing something obvious here?
>
> Many thanks in advance
>
> Paul
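For reference, the keyBy+KeyedProcessFunction pattern Arvid describes might be sketched roughly as below. This is a minimal sketch, not a definitive implementation: it assumes a hypothetical `Article` POJO with a `getNewUrl()` accessor, and the `postToLegacyPlatform`/`putToLegacyPlatform` HTTP helpers are stand-ins for the logic currently in the custom RichSinkFunction. The Flink APIs used (ValueState, ValueStateDescriptor, KeyedProcessFunction) are from the 1.13-era DataStream API linked in the thread.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by the new-platform URL; stores the legacy URL as keyed value state.
public class UrlMappingFunction
        extends KeyedProcessFunction<String, Article, Article> {

    private transient ValueState<String> legacyUrlState;

    @Override
    public void open(Configuration parameters) {
        // One String of state per key (i.e. per new-platform URL).
        legacyUrlState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("legacy-url", String.class));
    }

    @Override
    public void processElement(Article article, Context ctx,
                               Collector<Article> out) throws Exception {
        String legacyUrl = legacyUrlState.value();
        if (legacyUrl == null) {
            // Unknown article: create it on the legacy platform (HTTP POST)
            // and remember the returned Location header.
            legacyUrl = postToLegacyPlatform(article);
            legacyUrlState.update(legacyUrl);
        } else {
            // Known article: update in place (HTTP PUT).
            putToLegacyPlatform(legacyUrl, article);
        }
        out.collect(article);
    }

    // Hypothetical stubs; the real HTTP logic would move here from the sink.
    private String postToLegacyPlatform(Article a) { return null; /* stub */ }
    private void putToLegacyPlatform(String url, Article a) { /* stub */ }
}
```

Wired up as something like `stream.keyBy(Article::getNewUrl).process(new UrlMappingFunction())`. With the RocksDB state backend configured, the per-key mapping is persisted and checkpointed by Flink itself, which is the "embedded RocksDB" usage Paul was hoping for. On the DiscardingSink question: `org.apache.flink.streaming.api.functions.sink.DiscardingSink` exists for exactly this purpose, though as Arvid notes a dangling operator with no sink is also acceptable.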
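Arvid's alternative - implementing CheckpointedFunction on the custom sink itself - might look roughly like the sketch below. Again this is an illustrative sketch, not the thread's actual code: `Article`, `getNewUrl()`, and the HTTP helpers are hypothetical, and the in-memory map is snapshotted into operator list state.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class LegacyHttpSink extends RichSinkFunction<Article>
        implements CheckpointedFunction {

    // Working copy of the new-URL -> legacy-URL mapping.
    private final Map<String, String> urlMapping = new HashMap<>();
    private transient ListState<Tuple2<String, String>> checkpointedMapping;

    @Override
    public void invoke(Article article, Context context) throws Exception {
        String legacyUrl = urlMapping.get(article.getNewUrl());
        if (legacyUrl == null) {
            legacyUrl = postToLegacyPlatform(article); // hypothetical helper
            urlMapping.put(article.getNewUrl(), legacyUrl);
        } else {
            putToLegacyPlatform(legacyUrl, article);   // hypothetical helper
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Copy the in-memory map into operator state at each checkpoint.
        checkpointedMapping.clear();
        for (Map.Entry<String, String> e : urlMapping.entrySet()) {
            checkpointedMapping.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedMapping = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("url-mapping",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        for (Tuple2<String, String> e : checkpointedMapping.get()) {
            urlMapping.put(e.f0, e.f1);
        }
    }

    private String postToLegacyPlatform(Article a) { return null; /* stub */ }
    private void putToLegacyPlatform(String url, Article a) { /* stub */ }
}
```

One caveat worth noting with this approach: plain operator list state is redistributed arbitrarily across parallel instances on rescaling, so a given article's mapping entry may land on a different sink instance than the article itself. This is one reason the keyed-state approach Arvid recommends is the better fit here - keyed state is always co-located with the key.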