It was not clear to me that JdbcInputFormat was part of the DataSet API.
Now I understand.

Thank you.
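
For anyone who finds this thread later: the partitioned scan I was after comes down to splitting a numeric key range into contiguous sub-ranges, one per parallel reader, and binding each pair to a "WHERE id BETWEEN ? AND ?" query. As far as I understand, JdbcInputFormat can be given a parameter values provider (for example JdbcNumericBetweenParametersProvider) that does this. The sketch below is plain Java and is not Flink's implementation; the `id` column and the `PartitionRanges`, `Range` and `split` names are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRanges {

    /** Inclusive [start, end] bounds for one slice of the key space. */
    public static final class Range {
        public final long start;
        public final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Splits the inclusive range [min, max] into at most `partitions`
     * contiguous, non-overlapping ranges whose sizes differ by at most one.
     * Assumes max - min + 1 does not overflow a long.
     */
    public static List<Range> split(long min, long max, int partitions) {
        if (min > max) {
            throw new IllegalArgumentException("min must be <= max");
        }
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        long total = max - min + 1;
        // Never create more ranges than there are keys.
        int n = (int) Math.min((long) partitions, total);
        long base = total / n;
        long remainder = total % n;

        List<Range> ranges = new ArrayList<>(n);
        long start = min;
        for (int i = 0; i < n; i++) {
            // The first `remainder` ranges take one extra key each.
            long size = base + (i < remainder ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new Range(start, end));
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Hypothetical key column `id` with values 1..1000, read by 4 readers.
        for (Range r : split(1, 1000, 4)) {
            System.out.println("SELECT name, step FROM my_table WHERE id BETWEEN "
                    + r.start + " AND " + r.end);
        }
    }
}
```

Each printed query covers a disjoint slice of the table, so the slices can be read in parallel without overlap. The real minimum and maximum key values would have to come from the database.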


On Fri, Jun 18, 2021 at 5:23 AM Timo Walther <twal...@apache.org> wrote:

> Hi Marco,
>
> as Robert already mentioned, the BatchTableEnvironment is simply built
> on top of the DataSet API; partitioning functionality is also available
> in the DataSet API.
>
> So using the JdbcInputFormat directly should work in the DataSet API.
> Otherwise I would recommend using an initial pipeline to transfer the
> data from JDBC to, for example, a CSV file. Flink should support that.
>
> Regards,
> Timo
>
>
> On 17.06.21 17:43, Marco Villalobos wrote:
> > I need to bootstrap a keyed process function.
> >
> > So, I was hoping to use the Table SQL API because I thought it could
> > parallelize the work more efficiently via partitioning.
> > I need to bootstrap keyed state for a keyed process function, with
> > Flink 1.12.1, thus I think I am required to use the DataSet API.
> >
> > Is my only option JdbcInputFormat?
> >
> > ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
> > BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
> > // Register the JDBC table using the legacy 'connector.*' option keys.
> > batchTableEnv.executeSql(
> >     "CREATE TABLE my_table (" +
> >     "  ...." +
> >     ") WITH (" +
> >     "  'connector.type' = 'jdbc'," +
> >     "  'connector.url' = '?'," +
> >     "  'connector.username' = '?'," +
> >     "  'connector.password' = '?'," +
> >     "  'connector.table' = 'my_table'" +
> >     ")");
> >
> > Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
> > DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class);
> > rowDataSet.print();
> >
> > This ends up throwing this exception:
> >
> > org.apache.flink.table.api.TableException: Only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> > at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
> > at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
> > at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
> > at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
> > at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
> >
> > On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <twal...@apache.org> wrote:
> >
> >     Hi Marco,
> >
> >     which operations do you want to execute in the bootstrap pipeline?
> >
> >     Maybe you don't need to use SQL and the old planner. At least this would
> >     reduce the friction of going through another API layer.
> >
> >     The JDBC connector can be used directly in the DataSet API as well.
> >
> >     Regards,
> >     Timo
> >
> >
> >
> >     On 17.06.21 07:33, Marco Villalobos wrote:
> >      > Thank you very much!
> >      >
> >      > I tried using Flink's SQL JDBC connector, and ran into issues.
> >      > According to the Flink documentation, only the old planner is compatible
> >      > with the DataSet API.
> >      >
> >      > When I connect to the table:
> >      >
> >      > CREATE TABLE my_table (
> >      > ....
> >      > ) WITH (
> >      >     'connector.type' = 'jdbc',
> >      >     'connector.url' = '?',
> >      >     'connector.username' = '?',
> >      >     'connector.password' = '?',
> >      >     'connector.table' = 'my_table'
> >      > )
> >      >
> >      > It creates a JdbcTableSource, but only BatchTableSource and
> >      > InputFormatTableSource are supported in BatchTableEnvironment.
> >      >
> >      > By the way, it was very challenging to figure out how to create that
> >      > connection string, because it's a different format than what is in the
> >      > documentation. I had to comb through JdbcTableSourceSinkFactory to
> >      > figure out how to connect.
> >      >
> >      > Is it even possible to use the DataSet API with the Table SQL API in
> >      > Flink 1.12.1?
> >      >
> >      >
> >      > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger <rmetz...@apache.org> wrote:
> >      >
> >      >     Hi Marco,
> >      >
> >      >     The DataSet API will not run out of memory, as it spills to disk if
> >      >     the data doesn't fit anymore.
> >      >     Load is distributed by partitioning data.
> >      >
> >      >     Giving you advice depends a bit on the use-case. I would explore two
> >      >     major options:
> >      >     a) Reading the data from Postgres using Flink's SQL JDBC connector
> >      >     [1]. 200 GB is not much data. A 1 Gbit/s network link needs ~30 minutes
> >      >     to transfer that (125 megabytes / second).
> >      >     b) Using the DataSet API and state processor API. I would first try
> >      >     to see how much effort it is to read the data using the DataSet API
> >      >     (could be less convenient than the Flink SQL JDBC connector).
> >      >
> >      >     [1]
> >      >     https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
> >      >
> >      >
> >      >     On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos
> >      >     <mvillalo...@kineteque.com> wrote:
> >      >
> >      >         I must bootstrap state from postgres (approximately 200 GB of
> >      >         data) and I notice that the state processor API requires the
> >      >         DataSet API in order to bootstrap state for the Stream API.
> >      >
> >      >         I wish there was a way to use the SQL API and use a partitioned
> >      >         scan, but I don't know if that is even possible with the DataSet
> >      >         API.
> >      >
> >      >         I never used the DataSet API, and I am unsure how it manages
> >      >         memory, or distributes load, when handling large state.
> >      >
> >      >         Would it run out of memory if I map data from a JDBCInputFormat
> >      >         into a large DataSet and then use that to bootstrap state for my
> >      >         stream job?
> >      >
> >      >         Any advice on how I should proceed with this would be greatly
> >      >         appreciated.
> >      >
> >      >         Thank you.
> >      >
> >
>
>
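
As a footnote to the transfer-time estimate quoted above (200 GB over a 1 Gbit/s link), here is the arithmetic as a small sketch. It assumes decimal units (1 GB = 10^9 bytes), a fully saturated link, and no protocol or database overhead, so it is a lower bound rather than a measurement. The `TransferEstimate` class and `seconds` method are names made up for illustration.

```java
import java.util.Locale;

public class TransferEstimate {

    /**
     * Ideal transfer time in seconds for `bytes` of data over a link of
     * `linkBitsPerSecond`, ignoring all overhead.
     */
    public static double seconds(double bytes, double linkBitsPerSecond) {
        // 8 bits per byte: a 1 Gbit/s link moves 125 megabytes per second.
        double bytesPerSecond = linkBitsPerSecond / 8.0;
        return bytes / bytesPerSecond;
    }

    public static void main(String[] args) {
        double dataBytes = 200e9;   // 200 GB of Postgres data (decimal units)
        double linkBits = 1e9;      // 1 Gbit/s network link
        double s = seconds(dataBytes, linkBits);
        System.out.println(String.format(Locale.ROOT,
                "%.0f seconds, about %.1f minutes", s, s / 60.0));
    }
}
```

With these assumptions the result is 1600 seconds, a little under 27 minutes, which agrees with the "~30 minutes" figure in the thread.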
