Hi everyone,

I'm looking for some advice on designing my operators (which unsurprisingly
tend to take the form of SourceFunctions, ProcessFunctions or
SinkFunctions) to allow them to be "dynamically configured" while running.

By way of example, I have a SourceFunction which collects the names of
various S3 buckets, and then a ProcessFunction which reads and collects
their contents. The gotcha is that the list of S3 buckets is not fixed, and
can be changed during the lifetime of the job. This add/remove action would
be performed by a human administrator, let's say using a simple command-line
tool.

For example, here is an idea of the tool I want to build to "communicate"
with my Flink job:

```
# Add a bucket to the Flink job to process
$ ./admin-tool add-bucket --name my-s3-bucket --region eu-west-1 \
    --access-key <blah>...

# Get a list of the S3 buckets we're currently processing, and when they
# were last accessed
$ ./admin-tool list-buckets
my-s3-bucket | eu-west-1 | 5 seconds ago

# Remove a bucket
$ ./admin-tool remove-bucket --name my-s3-bucket
```
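Whatever the transport, the tool has to turn these commands into messages the job can consume. The sketch below assumes a hypothetical tab-separated line format (`ADD` or `REMOVE`, bucket name, region); `AdminCommands` and the format itself are illustrative only, not part of Flink or any existing tool. `list-buckets` is deliberately rejected, since it is a query that needs state reported back from the job rather than a command sent to it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical encoder turning admin-tool arguments into one-line control messages.
// Other flags (e.g. --access-key) are ignored in this sketch.
final class AdminCommands {
    private AdminCommands() {}

    // ["add-bucket", "--name", "my-s3-bucket", "--region", "eu-west-1"]
    //   -> "ADD\tmy-s3-bucket\teu-west-1"
    static String toControlMessage(String[] args) {
        if (args.length == 0) {
            throw new IllegalArgumentException("missing sub-command");
        }
        Map<String, String> flags = parseFlags(args);
        switch (args[0]) {
            case "add-bucket":
                return "ADD\t" + require(flags, "--name") + "\t" + require(flags, "--region");
            case "remove-bucket":
                return "REMOVE\t" + require(flags, "--name");
            case "list-buckets":
                // A read, not a write: it needs an answer from the job.
                throw new IllegalArgumentException("list-buckets is a query, not a control message");
            default:
                throw new IllegalArgumentException("unknown sub-command: " + args[0]);
        }
    }

    // Collects the "--flag value" pairs that follow the sub-command.
    private static Map<String, String> parseFlags(String[] args) {
        Map<String, String> flags = new HashMap<>();
        for (int i = 1; i + 1 < args.length; i += 2) {
            flags.put(args[i], args[i + 1]);
        }
        return flags;
    }

    private static String require(Map<String, String> flags, String key) {
        String value = flags.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing " + key);
        }
        return value;
    }
}
```

The same line could then be pushed over a socket, appended to a message queue, or written to a configuration bucket, which keeps the tool itself independent of whichever option is chosen.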

Hopefully that gives you the idea. Of course, this could apply to any number
of different source types, and could even extend to configuring sinks too.

So, how should my command-line tool communicate with my operators?

Five alternative approaches I've thought about:

- Have a SourceFunction open a websocket and listen for bucket add/remove
commands (sent by the command-line tool). I think this would work, but the
difficulty is figuring out where in the Flink cluster the SourceFunction
has been deployed, so that the tool can find the websocket's listening
port. I took a look at the ClusterClient API, and this might be
discoverable by inspecting the JobGraph... I'm just not sure whether this
is an anti-pattern.

- Use a CoProcessFunction instead, connected to a DataStream that I can
somehow write to directly from the command-line tool (maybe using the
flink-client API; can I write to a DataStream directly?). I don't think
this is possible, but it would feel like a good, clean approach.

- Somehow use the ParameterTool, although I don't think it supports a
dynamic use case.

- Write directly to the saved state of a ProcessFunction to add or remove
bucket names. I'm pretty unfamiliar with this approach, but it looks
possible according to the docs on the State Processor API. However, it
seems I would have to read the savepoint, write the updates, and then
restore from the modified savepoint, which may mean suspending and resuming
the job while that happens. That's not really an issue for me, but it does
feel like possibly the wrong approach for such a simple requirement.

- Solve it purely with data sources, e.g. create a central S3 bucket that
holds the latest configuration and is read as a source and joined with
every operator (probably using Broadcast State). My command-line tool would
then only have to write to that S3 bucket, with no need to communicate
directly with the operators.
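A minimal, framework-free sketch of the Broadcast State idea in the last option, assuming the configuration bucket has already been turned into a stream of add/remove commands. In a real job, `onCommand` would correspond to `processBroadcastElement` and `onRecord` to `processElement` of a `BroadcastProcessFunction`, with the map held as broadcast state (declared via a `MapStateDescriptor`) and the output going to a `Collector`; here a plain `LinkedHashMap` and `List` stand in for those so the logic runs on its own. `BucketCommand`, `BucketRouter` and the output record format are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical control message read from the configuration bucket.
final class BucketCommand {
    enum Op { ADD, REMOVE }

    final Op op;
    final String name;
    final String region; // unused for REMOVE

    BucketCommand(Op op, String name, String region) {
        this.op = op;
        this.name = name;
        this.region = region;
    }
}

// Stand-in for a BroadcastProcessFunction: the map plays the role of broadcast
// state, of which every parallel instance keeps an identical copy.
final class BucketRouter {
    private final Map<String, String> regionByBucket = new LinkedHashMap<>();

    // Broadcast side (cf. processBroadcastElement): the only place the config changes.
    void onCommand(BucketCommand cmd) {
        switch (cmd.op) {
            case ADD:
                regionByBucket.put(cmd.name, cmd.region);
                break;
            case REMOVE:
                regionByBucket.remove(cmd.name);
                break;
        }
    }

    // Data side (cf. processElement): read-only use of the config. Records from
    // buckets that are not (or no longer) configured are dropped.
    void onRecord(String bucket, String objectKey, List<String> out) {
        String region = regionByBucket.get(bucket);
        if (region != null) {
            out.add(region + "/" + bucket + "/" + objectKey);
        }
    }

    // Buckets currently configured, in the order they were added.
    List<String> activeBuckets() {
        return new ArrayList<>(regionByBucket.keySet());
    }
}
```

Flink only gives the broadcast side write access to this state and delivers every broadcast element to every parallel instance, so as long as `onCommand` is deterministic, all instances end up with the same bucket list.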

The last option is fairly obvious and probably my default approach; I'm
just wondering whether any of the alternatives above are worth
investigating. Given my endless quest to learn everything about Flink, I
don't mind exploring the less obvious pathways.

I would love some guidance on what you feel is the best, most idiomatic
approach for this.

All the best,
Tom
