Hi Jing,

Thanks for the input; that's exactly how I had imagined the standard deployment process.

Regarding the Exactly-Once guarantee, my understanding is as follows.

The current Exactly-Once semantics are implemented via Two-Phase Commit, which involves both the Source and the Sink (plus any checkpoint-enabled intermediary operators). At a minimum, both terminal operators must work in tandem to achieve the guarantee. In all cases the implementation relies on the commit feature of the sink's destination medium, such as Multipart Upload for S3 and transactional commits for databases / Kafka. On failure, the Sink operator reverts the unfinished commit, and the Source regenerates the data by restoring the checkpointed state.

As for a logic change that introduces inconsistency in the output: since the Script Operator is currently not intended to act as either a source or a sink, I don't believe it violates exactly-once semantics, because the checkpointing flow remains consistent with the Two-Phase Commit paradigm. Even if the logic has been altered, "each record will only be processed exactly once" still holds, because the commit boundary is decided by the checkpointing process, not by the records themselves.

The only non-deterministic situation I can see arising is when an intermediary operator depends on specific state to generate its outputs (e.g. a window). Options to handle this type of scenario would be:

1. Categorize operator state by a checksum of the overall script (we can use CRC32 for this purpose), so that after any change to the script the operator starts with either empty state or state previously created for that same script.
2. Assume that data-induced state changes are expected by the end user, and that the user is aware that existing state is being applied to data produced by the new processing logic. In this case we do nothing and leave the handling to the implementer of the Script Operator.

Not sure if this answers the question? Truly appreciate the response!

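To make option 1 a bit more concrete, here is a minimal sketch of how a script checksum could be folded into a state name. It is illustrative only: `ScriptStateNames`, `scriptChecksum` and `stateNameFor` are hypothetical names I made up for this example and are not part of Flink; only `java.util.zip.CRC32` is an existing JDK class.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper, not an existing Flink class: derives a state name
// that changes whenever the script text changes.
final class ScriptStateNames {

    private ScriptStateNames() {}

    // CRC32 of the script's UTF-8 bytes, returned as an unsigned 32-bit value in a long.
    static long scriptChecksum(String scriptText) {
        CRC32 crc = new CRC32();
        crc.update(scriptText.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Appends the checksum as 8 lowercase hex digits: "<baseName>-<checksum>".
    static String stateNameFor(String baseName, String scriptText) {
        return String.format("%s-%08x", baseName, scriptChecksum(scriptText));
    }
}
```

The resulting string is what the Script Operator would use as the name under which it registers its state, so a redeploy with an unchanged script resolves to the same state, while a changed script resolves to a new (initially empty) one. CRC32 is not collision-resistant, so a stronger hash could be swapped in if accidental collisions are a concern.
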
Best regards,
Kevin

> On Fri, Aug 4, 2023, at 8:43 PM, Jing Ge wrote:
>> Hi Kevin,
>>
>> Thanks for your proposal. It is quite interesting. We had a similar
>> solution back to the Apache Storm era many years ago. The job deployment
>> with operators itself could be considered as physical distributed task
>> containers. There will be a management tool, like a control plane, to
>> deploy the dynamic logic, e.g. groovy/Bash script you mentioned in your
>> proposal or CEP rules into the job. In this way, there will be a
>> physical job deployed to the cluster and a virtual job (the dynamic script)
>> running within it. If it is managed carefully, we could even dynamically
>> control the traffic, e.g. KeyedStream. There will be one interesting
>> question: did you consider the data consistency? E.g. how to guarantee the
>> Exactly-Once Semantics after the logic has been changed dynamically?
>> Looking forward to hearing from you.
>>
>> Best regards,
>> Jing
>>
>> On Wed, Aug 2, 2023 at 4:33 AM Kevin Tseng <apa...@ae-service.app> wrote:
>>
>> > Hello Dev Mailing List,
>> >
>> > I have a feature proposition that I am interested to hear more and to
>> > gather feedback from everyone on the mailing list.
>> >
>> > (please view this email in monospace font when possible)
>> >
>> > _*Proposal*_
>> >
>> > Currently, when Flink DataStream API is to be utilized, every data
>> > processing logic will need to be written and compiled before job
>> > deployment. And if there's any change to the logic the code generally need
>> > to be recompiled.
>> >
>> > To support the use case of a more generic deployment process we can take
>> > inspiration from platforms such as Apache Airflow and Apache Spark, where
>> > the support for externalizing operator logic exist.
>> >
>> > This can be achieved by extending Default Flink Operators used for filter,
>> > process, keyBy, flatMap, map ... etc. As we are merely adding an
>> > externalized component "Script" that gets executed instead of pre-compiled
>> > code.
>> >
>> > Surface Level implementation idea:
>> >
>> > ScriptFilterFunction extends RichFilterFunction
>> > ScriptProcessFunction extends RichProcessFunction
>> > ScriptMapFunction extends RichMapFunction
>> > …etc
>> >
>> >   --------------------
>> >  | RichFilterFunction |
>> >   --------------------
>> >            ▲
>> >            |
>> >  ----------------------
>> > | ScriptFilterFunction |
>> >  ----------------------
>> >
>> > We can add support progressively, not every operator will need its script
>> > counterpart in the initial release.
>> >
>> > Every supported operator will have its Script counterpart where each one
>> > of them will have at least one Implemented ScriptExecutor interface;
>> > Co*Function may require two. We may also add metrics corresponding to each
>> > extended function to track script performance.
>> >
>> > There will also be a default ScriptExecutor impl that does nothing if user
>> > does not define a script to be used.
>> >
>> > ScriptExecutor is an interface that can be implemented to support various
>> > externalized script languages.
>> > The interface will include two abstract methods setup() and execute() /
>> > execute(...args) where
>> >
>> > • setup is used to initialize any execution engine or compilation
>> > requirement
>> > • execute is used to handle invocation of the script function given an
>> > optional input object
>> >
>> >    ----------------
>> >   | ScriptExecutor |
>> >    ----------------
>> >           △       ◸
>> >           |        ⟍
>> >           |         ⟍
>> >           |          ⟍
>> >  --------------------   ----------------------
>> > | BashScriptExecutor | | GroovyScriptExecutor |
>> >  --------------------   ----------------------
>> >
>> > Depending on the ScriptExecutor, it will either accept file path(s) that
>> > utilizes Flink’s own FileSystem API for script file loading, or script text
>> > that defines the processing logic.
>> >
>> > _*Use Case*_
>> >
>> > Supposed we have a super generic Job that does the following:
>> >
>> > [Kafka] -> [Filter] -> [Kafka]
>> >
>> > Instead of having to recompile DataStream API based binary every time
>> > filter condition is to be changed, we can now specify a file (may even be
>> > from a job parameter or some sort of configuration file) that’s located on
>> > an object storage or remote filesystem supported by Flink, and simply
>> > redeploy the job.
>> >
>> > Same job setup can also be used to support multiple similar job with minor
>> > differences in filtering criteria.
>> >
>> > Given that Flink is written in Scala & Java, JVM based language such as
>> > Groovy & Scala scripts that can be compiled into JVM Byte Code at runtime
>> > can be supported easily. And performance will generally be on-par as
>> > pre-compiled deployable binary.
>> >
>> > By making this part of Flink Operator Set, we will be able to open up an
>> > universal script support that rivals other platform and framework.
>> >
>> > Looking forward to hearing back from the community with regards to this
>> > feature.
>> >
>> > Thanks,
>> >
>> > K