Hi Kevin,

Thanks for your proposal. It is quite interesting. We had a similar solution back in the Apache Storm era many years ago. The deployed job with its operators can itself be considered a set of physical distributed task containers. A management tool, like a control plane, then deploys the dynamic logic, e.g. the Groovy/Bash scripts you mentioned in your proposal, or CEP rules, into the job. In this way, there is a physical job deployed to the cluster and a virtual job (the dynamic script) running within it. If it is managed carefully, we could even dynamically control the traffic, e.g. on a KeyedStream.

There is one interesting question: have you considered data consistency? E.g. how do we guarantee exactly-once semantics after the logic has been changed dynamically?

Looking forward to hearing from you.
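To make the question concrete: one way to reason about it would be to snapshot the script (or its version) together with the operator state, so that a recovered job resumes with exactly the logic that produced the last checkpoint. A rough sketch, just to frame the concern rather than answer it (the class name, state name, and update hook below are made up; only CheckpointedFunction is an existing Flink API):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Hypothetical filter whose predicate comes from dynamically supplied script text.
public class VersionedScriptFilter<T> extends RichFilterFunction<T>
        implements CheckpointedFunction {

    private volatile String scriptText;           // current dynamic logic, pushed by a control plane
    private transient ListState<String> scriptState;

    public VersionedScriptFilter(String initialScript) {
        this.scriptText = initialScript;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        scriptState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("script", String.class));
        // On recovery, resume with the script that was active at the checkpoint,
        // not whatever the control plane may have pushed afterwards.
        for (String restored : scriptState.get()) {
            scriptText = restored;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        scriptState.clear();
        scriptState.add(scriptText);              // snapshot the logic together with the data
    }

    @Override
    public boolean filter(T value) {
        return evaluate(scriptText, value);       // evaluate the current script against the record
    }

    private boolean evaluate(String script, T value) {
        return true;                              // placeholder; a real impl would run the script
    }
}

Whether this is sufficient to preserve exactly-once when the logic changes between two checkpoints is exactly the open question I had in mind.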
Best regards,
Jing

On Wed, Aug 2, 2023 at 4:33 AM Kevin Tseng <apa...@ae-service.app> wrote:

> Hello Dev Mailing List,
>
> I have a feature proposition that I would like to hear more about and to
> gather feedback on from everyone on the mailing list.
>
> (please view this email in monospace font when possible)
>
> _*Proposal*_
>
> Currently, when the Flink DataStream API is used, every piece of data
> processing logic needs to be written and compiled before job deployment,
> and if there is any change to the logic, the code generally needs to be
> recompiled.
>
> To support the use case of a more generic deployment process, we can take
> inspiration from platforms such as Apache Airflow and Apache Spark, where
> support for externalizing operator logic exists.
>
> This can be achieved by extending the default Flink operators used for
> filter, process, keyBy, flatMap, map ... etc., as we are merely adding an
> externalized component, a "Script", that gets executed instead of
> pre-compiled code.
>
> Surface-level implementation idea:
>
> ScriptFilterFunction extends RichFilterFunction
> ScriptProcessFunction extends RichProcessFunction
> ScriptMapFunction extends RichMapFunction
> ...etc
>
> ----------------------
> | RichFilterFunction |
> ----------------------
>           ▲
>           |
> ------------------------
> | ScriptFilterFunction |
> ------------------------
>
> We can add support progressively; not every operator will need its script
> counterpart in the initial release.
>
> Every supported operator will have its Script counterpart, where each one
> of them will hold at least one implemented ScriptExecutor interface;
> Co*Function may require two. We may also add metrics corresponding to each
> extended function to track script performance.
>
> There will also be a default ScriptExecutor implementation that does
> nothing if the user does not define a script to be used.
>
> ScriptExecutor is an interface that can be implemented to support various
> externalized script languages.
> The interface will include two abstract methods, setup() and execute() /
> execute(...args), where
>
> • setup is used to initialize any execution engine or compilation
>   requirement
> • execute is used to handle invocation of the script function given an
>   optional input object
>
>          ------------------
>          | ScriptExecutor |
>          ------------------
>            △            ◸
>            |              ⟍
>            |                ⟍
>            |                  ⟍
> ----------------------   ------------------------
> | BashScriptExecutor |   | GroovyScriptExecutor |
> ----------------------   ------------------------
>
> Depending on the ScriptExecutor, it will accept either file path(s),
> loaded through Flink's own FileSystem API, or script text that defines
> the processing logic directly.
>
> _*Use Case*_
>
> Suppose we have a very generic job that does the following:
>
> [Kafka] -> [Filter] -> [Kafka]
>
> Instead of having to recompile the DataStream API based binary every time
> the filter condition changes, we can now specify a file (possibly even via
> a job parameter or some sort of configuration file) that is located on an
> object storage or remote filesystem supported by Flink, and simply
> redeploy the job.
>
> The same job setup can also be used to support multiple similar jobs with
> minor differences in filtering criteria.
>
> Given that Flink is written in Scala & Java, JVM-based languages such as
> Groovy & Scala, whose scripts can be compiled into JVM byte code at
> runtime, can be supported easily, and performance will generally be on
> par with a pre-compiled deployable binary.
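>
> To make the idea concrete, here is a minimal sketch of how the pieces
> could fit together. The class names come from the proposal above; the
> exact method signatures, the Groovy binding variable, and the usage
> snippet are only illustrative, not a finished design:
>
> import java.io.Serializable;
>
> import org.apache.flink.api.common.functions.RichFilterFunction;
> import org.apache.flink.configuration.Configuration;
>
> // Pluggable execution of externalized script logic.
> public interface ScriptExecutor extends Serializable {
>     void setup() throws Exception;                    // init engine / compile the script
>     Object execute(Object... args) throws Exception;  // run the script on an optional input
> }
>
> // Filter operator that delegates its predicate to a script.
> public class ScriptFilterFunction<T> extends RichFilterFunction<T> {
>
>     private final ScriptExecutor executor;
>
>     public ScriptFilterFunction(ScriptExecutor executor) {
>         this.executor = executor;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         executor.setup();                              // compile / load once per task
>     }
>
>     @Override
>     public boolean filter(T value) throws Exception {
>         return Boolean.TRUE.equals(executor.execute(value));
>     }
> }
>
> // Example executor backed by Groovy, compiling the script text at runtime.
> public class GroovyScriptExecutor implements ScriptExecutor {
>
>     private final String scriptText;
>     private transient groovy.lang.Script compiled;
>
>     public GroovyScriptExecutor(String scriptText) {
>         this.scriptText = scriptText;
>     }
>
>     @Override
>     public void setup() {
>         compiled = new groovy.lang.GroovyShell().parse(scriptText);
>     }
>
>     @Override
>     public Object execute(Object... args) {
>         groovy.lang.Binding binding = new groovy.lang.Binding();
>         binding.setVariable("value", args.length > 0 ? args[0] : null);
>         compiled.setBinding(binding);
>         return compiled.run();                         // e.g. a Boolean for filter
>     }
> }
>
> Usage would then look something like:
>
> stream.filter(new ScriptFilterFunction<>(
>         new GroovyScriptExecutor("value.amount > 100")));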
>
> By making this part of the Flink operator set, we will be able to open up
> universal script support that rivals other platforms and frameworks.
>
> Looking forward to hearing back from the community with regards to this
> feature.
>
> Thanks,
>
> K