Hi Leonard,

First, thank you.
I am currently restricting my solution to Apache Flink 1.10 because it is the current version supported by Amazon EMR; I am not ready to change our operational environment to solve this. Second, I am using the DataStream API. The Kafka topic is not in a table, it is in a DataStream.

The SQL queries are literally against a PostgreSQL database, and they only need to run exactly once in the lifetime of the job. I am struggling to determine where that should happen. JDBCInputFormat seems to query the SQL table repetitively, and connecting streams and aggregating into one object is also very complicated. Thus, I am wondering what the right approach is.

Let me restate the parameters:

SQL Query One = data in PostgreSQL (200K records) that is used for business logic.
SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic.
Kafka Topic One = unbounded data stream that uses the DataStream API and the queries above to write into multiple sinks.

ASCII diagram:

[SQL Query One] ----> [Aggregate to Map]

Kafka ----> [Kafka Topic One] ----> [Keyed Process Function (Query One Map, Query Two Map)] ----> [Multiple Sinks]

[SQL Query Two] ----> [Aggregate to Map]

Maybe my graph above helps. You see, I need Query One and Query Two to only ever execute once. After that, the information they provide is used to correctly process the Kafka topic.

I'll dig a bit deeper to try and understand what you said, thank you, but JDBCInputFormat seems to query the database repetitively. Maybe I need to write a RichFunction or an AsyncIO function and cache the results in state after that (a rough sketch of what I mean is below the quoted thread).

> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:
>
> Hi, Marco
>
>> If I need SQL Query One and SQL Query Two to happen just one time,
>
> Looks like you want to reuse this Kafka table in one job. Executing multiple queries in one SQL job is supported in Flink 1.11.
> You can use `StatementSet`[1] to add SQL Query One and SQL Query Two in a single SQL job[1].
>
> Best
> Leonard
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
>
>> On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>
>> Let's say that I have:
>>
>> SQL Query One from data in PostgreSQL (200K records),
>> SQL Query Two from data in PostgreSQL (1000 records),
>> and Kafka Topic One.
>>
>> Let's also say that the main data for this Flink job arrives in Kafka Topic One.
>>
>> If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink?
>>
>> The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two for its business logic.
>>
>> I am using Flink 1.10.
>>
>> Am I supposed to query the database before the job is submitted, and then pass the results on as parameters to a function?
>> Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them to the main stream that uses Kafka Topic One?
>>
>> I would appreciate guidance. Please. Thank you.
>>
>> Sincerely,
>>
>> Marco A. Villalobos
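P.S. To make that last idea concrete, here is a minimal sketch of the open()-time caching approach I have in mind, against the Flink 1.10 DataStream API. It is only a sketch under my own assumptions: EnrichFromPostgresFunction, the JDBC URL, credentials, table and column names are all placeholders, the Kafka records are plain Strings here, and in the real job this would be the keyed process function from the diagram rather than a flat map. The point is that both PostgreSQL queries execute exactly once per parallel instance, in open(), and never again while the job processes the Kafka topic.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical enrichment function. Both PostgreSQL queries run once, in
 * open(), and their results are cached as in-memory maps that are then used
 * for every record coming from Kafka Topic One.
 */
public class EnrichFromPostgresFunction extends RichFlatMapFunction<String, String> {

    private static final long serialVersionUID = 1L;

    // Placeholder connection settings.
    private static final String JDBC_URL = "jdbc:postgresql://db-host:5432/mydb";
    private static final String USER = "user";
    private static final String PASSWORD = "password";

    private transient Map<String, String> queryOneMap; // ~200K rows of business data
    private transient Map<String, String> queryTwoMap; // ~1000 rows of business data

    @Override
    public void open(Configuration parameters) throws Exception {
        queryOneMap = new HashMap<>();
        queryTwoMap = new HashMap<>();
        // Runs once per parallel instance when the job starts (or restarts).
        try (Connection conn = DriverManager.getConnection(JDBC_URL, USER, PASSWORD)) {
            load(conn, "SELECT key, value FROM query_one_table", queryOneMap);
            load(conn, "SELECT key, value FROM query_two_table", queryTwoMap);
        }
    }

    private static void load(Connection conn, String sql, Map<String, String> target) throws Exception {
        try (PreparedStatement ps = conn.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                target.put(rs.getString("key"), rs.getString("value"));
            }
        }
    }

    @Override
    public void flatMap(String kafkaRecord, Collector<String> out) {
        // Business logic uses only the cached maps; no database access here.
        String ruleOne = queryOneMap.get(kafkaRecord);
        String ruleTwo = queryTwoMap.get(kafkaRecord);
        if (ruleOne != null && ruleTwo != null) {
            out.collect(kafkaRecord + "," + ruleOne + "," + ruleTwo);
        }
    }
}

If the cached data ever needed to be rescaled or checkpointed, the Broadcast State route from my original message would probably be the better fit, but for a one-time load at job start this seems like the simplest option.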
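P.P.S. And this is roughly how I picture it wired into the job from the diagram above; the topic name, bootstrap servers, group id, and the key selector are placeholders, and print() is only a stand-in for the multiple sinks.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class EnrichmentJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "enrichment-job");

        env.addSource(new FlinkKafkaConsumer<>("kafka-topic-one", new SimpleStringSchema(), props))
           .keyBy(record -> record)                   // placeholder key selector
           .flatMap(new EnrichFromPostgresFunction()) // queries run once, in open()
           .print();                                  // stand-in for the multiple sinks

        env.execute("Enrich Kafka Topic One with PostgreSQL data");
    }
}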