Hi Kevin,

Thanks for your proposal. It is quite interesting. We had a similar
solution back in the Apache Storm era many years ago. The deployed job
and its operators can be considered physical, distributed task
containers. A management tool, acting as a control plane, deploys the
dynamic logic into the job, e.g. the Groovy/Bash scripts you mentioned in
your proposal, or CEP rules. In this way, there is a physical job
deployed to the cluster and a virtual job (the dynamic script) running
within it. If it is managed carefully, we could even dynamically control
the traffic, e.g. KeyedStream. One interesting question: have you
considered data consistency? E.g. how do you guarantee exactly-once
semantics after the logic has been changed dynamically?
Looking forward to hearing from you.

Best regards,
Jing

On Wed, Aug 2, 2023 at 4:33 AM Kevin Tseng <apa...@ae-service.app> wrote:

> Hello Dev Mailing List,
>
> I have a feature proposal and would like to gather feedback on it from
> everyone on the mailing list.
>
> (please view this email in monospace font when possible)
>
> _*Proposal*_
>
> Currently, when using the Flink DataStream API, all data processing logic
> must be written and compiled before job deployment. If the logic changes,
> the code generally needs to be recompiled.
>
> To support a more generic deployment process, we can take inspiration
> from platforms such as Apache Airflow and Apache Spark, which support
> externalizing operator logic.
>
> This can be achieved by extending the default Flink operators used for
> filter, process, keyBy, flatMap, map, etc., since we are merely adding an
> externalized component, a "Script", that gets executed instead of
> pre-compiled code.
>
> Surface-level implementation idea:
>
> ScriptFilterFunction extends RichFilterFunction
> ScriptProcessFunction extends RichProcessFunction
> ScriptMapFunction extends RichMapFunction
> …etc
>
>  --------------------
> | RichFilterFunction |
>  --------------------
>            ▲
>            |
>  ----------------------
> | ScriptFilterFunction |
>  ----------------------
>
> We can add support progressively; not every operator needs a script
> counterpart in the initial release.
>
> Every supported operator will have a Script counterpart, each of which
> holds at least one ScriptExecutor implementation; Co*Function may require
> two. We may also add metrics to each extended function to track script
> performance.
>
> There will also be a default ScriptExecutor implementation that does
> nothing if the user does not define a script.
>
> ScriptExecutor is an interface that can be implemented to support various
> externalized script languages.
> The interface will include two abstract methods, setup() and execute() /
> execute(...args), where:
>
> • setup is used to initialize any execution engine or compilation
>   requirement
> • execute is used to handle invocation of the script function given an
>   optional input object
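
A minimal sketch of how the interface described above might look in
Java. Only setup() and execute() come from the proposal; the type
parameters and the NoOpScriptExecutor and LambdaScriptExecutor classes
are hypothetical names used for illustration, and a Java lambda stands
in for a real script engine.

```java
import java.util.function.Function;

// Hypothetical shape of the proposed interface. Only setup() and execute()
// come from the proposal; the type parameters are an assumption.
interface ScriptExecutor<I, O> {
    // Initialize the execution engine or compile the script, once per task.
    void setup() throws Exception;

    // Invoke the script function; input may be null when the script takes none.
    O execute(I input) throws Exception;
}

// Assumed default when no script is configured: returns a fixed fallback.
final class NoOpScriptExecutor<I, O> implements ScriptExecutor<I, O> {
    private final O fallback;

    NoOpScriptExecutor(O fallback) {
        this.fallback = fallback;
    }

    @Override
    public void setup() {
        // Nothing to initialize.
    }

    @Override
    public O execute(I input) {
        return fallback;
    }
}

// Toy executor: a Java lambda stands in for a compiled Groovy or Bash script.
final class LambdaScriptExecutor<I, O> implements ScriptExecutor<I, O> {
    private final Function<I, O> script;
    private boolean ready;

    LambdaScriptExecutor(Function<I, O> script) {
        this.script = script;
    }

    @Override
    public void setup() {
        ready = true; // a real executor would start its engine here
    }

    @Override
    public O execute(I input) {
        if (!ready) {
            throw new IllegalStateException("setup() must be called before execute()");
        }
        return script.apply(input);
    }
}
```

In this sketch, calling execute() before setup() fails fast, which is
one possible way to surface lifecycle mistakes early.
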
>
>    ----------------
>   | ScriptExecutor |
>    ----------------
>            △        ◸
>            |          ⟍
>            |            ⟍
>            |              ⟍
>  --------------------        ----------------------
> | BashScriptExecutor |      | GroovyScriptExecutor |
>  --------------------        ----------------------
>
> Depending on the ScriptExecutor, it will accept either file path(s),
> loaded through Flink’s own FileSystem API, or script text that defines the
> processing logic.
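
One way the "path or text" choice above could be modelled, as a sketch
only. ScriptSource is a hypothetical helper, and java.nio.file is used
here as a stand-in for Flink's FileSystem API so the example runs
without Flink on the classpath.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;

// Hypothetical helper: a script is either inline text or a path to read.
final class ScriptSource {
    private final String inlineText; // non-null for inline scripts
    private final Path path;         // non-null for file-backed scripts

    private ScriptSource(String inlineText, Path path) {
        this.inlineText = inlineText;
        this.path = path;
    }

    static ScriptSource ofText(String text) {
        return new ScriptSource(Objects.requireNonNull(text), null);
    }

    static ScriptSource ofPath(Path path) {
        return new ScriptSource(null, Objects.requireNonNull(path));
    }

    // Return the script body, reading the file on demand.
    String load() {
        if (inlineText != null) {
            return inlineText;
        }
        try {
            // Stand-in for opening the path through Flink's FileSystem API.
            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("cannot read script " + path, e);
        }
    }
}
```
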
>
> _*Use Case*_
>
> Suppose we have a very generic job that does the following:
>
> [Kafka] -> [Filter] -> [Kafka]
>
> Instead of recompiling the DataStream API based binary every time the
> filter condition changes, we can now specify a file (possibly via a job
> parameter or some sort of configuration file) located on an object
> storage or remote filesystem supported by Flink, and simply redeploy the
> job.
>
> The same job setup can also support multiple similar jobs with minor
> differences in filtering criteria.
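
The filter use case above could be sketched roughly as follows. This is
an illustration under assumptions, not the proposed implementation:
RichFilterStandIn replaces Flink's RichFilterFunction so the code
compiles on its own, FilterScriptExecutor is a narrowed, hypothetical
variant of the proposal's ScriptExecutor, and treating a null script
result as "drop the record" is a choice made here.

```java
// Narrowed stand-in for the proposal's ScriptExecutor, fixed to a boolean
// result. Checked exceptions are omitted to keep the sketch short.
interface FilterScriptExecutor<T> {
    void setup();

    Boolean execute(T input);
}

// Stand-in for Flink's RichFilterFunction so this compiles without Flink.
abstract class RichFilterStandIn<T> {
    // Lifecycle hook, called once before the first record.
    void open() {
    }

    // Returns true to keep the record, false to drop it.
    abstract boolean filter(T value);
}

// Hypothetical ScriptFilterFunction: all filtering logic lives in the script.
final class ScriptFilterFunction<T> extends RichFilterStandIn<T> {
    private final FilterScriptExecutor<T> executor;

    ScriptFilterFunction(FilterScriptExecutor<T> executor) {
        this.executor = executor;
    }

    @Override
    void open() {
        executor.setup(); // compile or load the script once per task
    }

    @Override
    boolean filter(T value) {
        // Assumption: a null script result drops the record.
        return Boolean.TRUE.equals(executor.execute(value));
    }
}
```

With this shape, two jobs that differ only in their filter condition
would share the same class and differ only in the executor they are
given.
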
>
> Given that Flink is written in Scala & Java, scripts in JVM-based
> languages such as Groovy & Scala, which can be compiled into JVM bytecode
> at runtime, can be supported easily. Performance should generally be on
> par with a pre-compiled deployable binary.
>
> By making this part of the Flink operator set, we can offer universal
> script support that rivals other platforms and frameworks.
>
> Looking forward to hearing back from the community regarding this
> feature.
>
> Thanks,
>
> K
