Using an S3 bucket containing the configuration is the way to go.
1) Web sockets, or more generally all approaches where you connect to the source
The JobGraph won't help you; it doesn't contain any information about where
tasks are deployed at runtime. It is just an abstract representation
of your job.
You could, in theory, retrieve the actual locations through the REST
API and perhaps expose the port as a metric.
But then you would still have to deal with resolving IPs, internal vs.
external IPs, and all that jazz.
2) CoProcessFunction
You can't write to a DataStream from outside the job; the data has to
get in somehow, so you'd need some source in any case :)
3) ParameterTool
This is really just a parsing tool (for program arguments, properties
files and the like), so it won't help with this use case.
4) State Processing API
A bit too complicated. If restarting jobs is an option, you could just
encode the commands into the source, emit them as some kind of event,
and have the process function update its state when it receives these
events.
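Here is a minimal sketch of that idea. It is plain Java with no Flink dependencies, so it runs on its own. The command format and the names `BucketCommand` and `BucketCommands` are hypothetical and only for illustration. In a real job, `apply` would be called from the process function (from `processElement`, or from `processBroadcastElement` if the commands are broadcast). The map would live in Flink managed state such as `MapState` rather than in a plain field.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch of "commands as events" (hypothetical names, no Flink).
 * The source would emit BucketCommand events; the operator applies each one
 * to the set of buckets it currently tracks.
 */
public class BucketCommands {

    public enum Op { ADD, REMOVE }

    // Hypothetical command event; field names are illustrative only.
    public record BucketCommand(Op op, String name, String region) {

        // Parses lines like "add my-s3-bucket eu-west-1" or "remove my-s3-bucket".
        // Input is assumed to be well-formed; bad input throws an exception.
        public static BucketCommand parse(String line) {
            String[] parts = line.trim().split("\\s+");
            Op op = Op.valueOf(parts[0].toUpperCase());
            String region = parts.length > 2 ? parts[2] : null;
            return new BucketCommand(op, parts[1], region);
        }
    }

    // Stand-in for operator state (bucket name -> region).
    // In Flink this would be MapState or broadcast state, not a field.
    private final Map<String, String> buckets = new TreeMap<>();

    // What the process function would do on receiving a command event.
    public void apply(BucketCommand cmd) {
        switch (cmd.op()) {
            case ADD -> buckets.put(cmd.name(), cmd.region());
            case REMOVE -> buckets.remove(cmd.name());
        }
    }

    public Map<String, String> buckets() {
        return buckets;
    }

    public static void main(String[] args) {
        BucketCommands state = new BucketCommands();
        for (String line : new String[] {
                "add my-s3-bucket eu-west-1",
                "add other-bucket us-east-1",
                "remove my-s3-bucket"}) {
            state.apply(BucketCommand.parse(line));
        }
        System.out.println(state.buckets()); // {other-bucket=us-east-1}
    }
}
```

The commands are ordinary events, so the state they produce is checkpointed like any other operator state. That avoids editing a savepoint by hand.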
On 15/07/2020 10:00, Tom Wells wrote:
Hi everyone,
I'm looking for some advice on designing my operators (which
unsurprisingly tend to take the form of SourceFunctions,
ProcessFunctions or SinkFunctions) to allow them to be "dynamically
configured" while running.
By way of example, I have a SourceFunction which collects the names of
various S3 buckets, and then a ProcessFunction which reads and
collects their contents. The gotcha is that the list of S3 buckets is
not fixed, and can be changed during the lifetime of the job. This
add/remove action would be performed by a human administrator, say
using a simple command-line tool.
For example, here is an idea of what I want to build to "communicate"
with my Flink job:
```
# Add a bucket to the flink job to process
$ ./admin-tool add-bucket --name my-s3-bucket --region eu-west-1 \
    --access-key <blah>...
# Get a list of the S3 buckets we're currently processing, and when they
# were last accessed
$ ./admin-tool list-buckets
my-s3-bucket | eu-west-1 | 5 seconds ago
# Remove buckets
$ ./admin-tool remove-bucket --name my-s3-bucket
```
Hope that gives you an idea. Of course, this could apply to any number
of different source types, and could even extend to configuring sinks
too.
So, how should my command-line tool communicate with my operators?
Here are the approaches I've thought about (four alternatives, plus the
obvious one at the end):
- Have a SourceFunction open a websocket and listen for bucket
add/remove commands (written to by the command line tool). I think
this would work, but the difficulty is figuring out where exactly the
SourceFunction is deployed in the Flink cluster, so that the tool can
find the websocket listening port. I took a look at the ClusterClient
API, and this information may be available by inspecting the
JobGraph... I'm just not sure whether this is an anti-pattern.
- Use a CoProcessFunction instead, connected to a DataStream that I can
somehow write to directly from the command-line tool (maybe using the
Flink client API; can I write to a DataStream directly?). I don't think
this is possible, but it would feel like a good, clean approach.
- Somehow use the ParameterTool. I don't think it supports a dynamic
use case, though?
- Write directly to the saved state of a ProcessFunction to add/remove
bucket names. I'm pretty unfamiliar with this approach, but it looks
possible according to the docs on the State Processor API. However, it
seems I would have to read the savepoint, write the updates, and then
restore from the savepoint, which may mean suspending and resuming the
job while that happens. Not really an issue for me, but it does feel
like possibly the wrong approach for such a simple requirement.
- Solve it using data sources alone, e.g. create a centrally read S3
bucket that holds the latest configuration and is read as a source and
joined by every operator (probably using Broadcast State). My
command-line tool would then only have to write to that S3 bucket, with
no need to communicate directly with the operators.
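Here is a rough illustration of the central-configuration option. It assumes a source that periodically re-reads the configuration object from the central bucket; the S3 fetch itself is omitted. The sketch compares two snapshots of the bucket list and produces add/remove change events, which could then be broadcast to downstream operators. Broadcasting the full snapshot and replacing the state wholesale is an equally reasonable alternative. `ConfigDiff` and the `ADD`/`REMOVE` event strings are hypothetical, and the code is plain Java with no Flink dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Plain-Java sketch (hypothetical names, no Flink) of turning two snapshots
 * of a centrally stored bucket list into change events. A polling source
 * would call diff() with the previous and the newly fetched snapshot.
 */
public class ConfigDiff {

    // Returns "ADD <name>" / "REMOVE <name>" events, in sorted name order.
    public static List<String> diff(Set<String> previous, Set<String> current) {
        List<String> changes = new ArrayList<>();
        for (String name : new TreeSet<>(current)) {
            if (!previous.contains(name)) {
                changes.add("ADD " + name);
            }
        }
        for (String name : new TreeSet<>(previous)) {
            if (!current.contains(name)) {
                changes.add("REMOVE " + name);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Set<String> v1 = Set.of("bucket-a", "bucket-b");
        Set<String> v2 = Set.of("bucket-b", "bucket-c");
        System.out.println(diff(v1, v2)); // [ADD bucket-c, REMOVE bucket-a]
    }
}
```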
The last option is fairly obvious and probably my default approach;
I'm just wondering whether any of the alternatives above are worth
investigating (especially given my endless quest to learn everything
about Flink, I don't mind exploring the less obvious pathways).
I would love some guidance or advice on what you feel is the best, most
idiomatic approach for this.
All the best,
Tom