It was not clear to me that JdbcInputFormat was part of the DataSet API. Now I understand.

Thank you.
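For anyone finding this thread later: below is a minimal sketch of what Timo's suggestion can look like — reading directly with JdbcInputFormat in the DataSet API, with a partitioned scan via JdbcNumericBetweenParametersProvider. The driver, URL, credentials, table, id range, and column types are placeholders, not values confirmed in this thread:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.connector.jdbc.JdbcInputFormat;
    import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
    import org.apache.flink.types.Row;

    public class JdbcBootstrapRead {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Each generated (lower, upper) pair is bound to the BETWEEN
            // placeholders and becomes one input split, so parallel subtasks
            // read disjoint slices of the table. Assumes a numeric, indexed
            // "id" column spanning roughly 1..200_000_000 (hypothetical).
            JdbcInputFormat jdbcInput = JdbcInputFormat.buildJdbcInputFormat()
                    .setDrivername("org.postgresql.Driver")
                    .setDBUrl("jdbc:postgresql://localhost:5432/mydb") // placeholder
                    .setUsername("?")
                    .setPassword("?")
                    .setQuery("SELECT name, step FROM my_table WHERE id BETWEEN ? AND ?")
                    .setParametersProvider(
                            new JdbcNumericBetweenParametersProvider(1L, 200_000_000L)
                                    .ofBatchSize(1_000_000L))
                    // Column types are guesses; adjust to the real schema.
                    .setRowTypeInfo(new RowTypeInfo(
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.DOUBLE_TYPE_INFO))
                    .finish();

            DataSet<Row> rows = env.createInput(jdbcInput);
            rows.print(); // print() eagerly triggers execution of the batch job
        }
    }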
On Fri, Jun 18, 2021 at 5:23 AM Timo Walther <twal...@apache.org> wrote:

> Hi Marco,
>
> as Robert already mentioned, the BatchTableEnvironment is simply built
> on top of the DataSet API, so partitioning functionality is also
> available in the DataSet API.
>
> So using the JdbcInputFormat directly should work in the DataSet API.
> Otherwise I would recommend an initial pipeline that transfers the data
> from JDBC to, say, a CSV file. Flink should support that.
>
> Regards,
> Timo
>
> On 17.06.21 17:43, Marco Villalobos wrote:
> > I need to bootstrap a keyed process function.
> >
> > So, I was hoping to use the Table SQL API because I thought it could
> > parallelize the work more efficiently via partitioning. I need to
> > bootstrap keyed state for a keyed process function with Flink 1.12.1,
> > thus I think I am required to use the DataSet API.
> >
> > Is my only option JdbcInputFormat?
> >
> > ExecutionEnvironment batchEnv =
> >     ExecutionEnvironment.getExecutionEnvironment();
> > BatchTableEnvironment batchTableEnv =
> >     BatchTableEnvironment.create(batchEnv);
> > batchTableEnv.executeSql(
> >     "CREATE TABLE my_table (" +
> >     "  ...." +
> >     ") WITH (" +
> >     "  'connector.type' = 'jdbc'," +
> >     "  'connector.url' = '?'," +
> >     "  'connector.username' = '?'," +
> >     "  'connector.password' = '?'," +
> >     "  'connector.table' = 'my_table'" +
> >     ")");
> >
> > Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
> > DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class);
> > rowDataSet.print();
> >
> > This ends up throwing this exception:
> >
> > org.apache.flink.table.api.TableException: Only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> >     at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
> >     at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
> >     at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
> >     at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
> >     at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
> >
> > On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <twal...@apache.org> wrote:
> > > Hi Marco,
> > >
> > > which operations do you want to execute in the bootstrap pipeline?
> > >
> > > Maybe you don't need to use SQL and the old planner. At least that
> > > would avoid the friction of going through another API layer.
> > >
> > > The JDBC connector can be used directly in the DataSet API as well.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 17.06.21 07:33, Marco Villalobos wrote:
> > > > Thank you very much!
> > > >
> > > > I tried using Flink's SQL JDBC connector, and ran into issues.
> > > > According to the Flink documentation, only the old planner is
> > > > compatible with the DataSet API.
> > > >
> > > > When I connect to the table:
> > > >
> > > > CREATE TABLE my_table (
> > > >     ....
> > > > ) WITH (
> > > >     'connector.type' = 'jdbc',
> > > >     'connector.url' = '?',
> > > >     'connector.username' = '?',
> > > >     'connector.password' = '?',
> > > >     'connector.table' = 'my_table'
> > > > )
> > > >
> > > > it creates a JdbcTableSource, but only BatchTableSource and
> > > > InputFormatTableSource are supported in BatchTableEnvironment.
> > > > By the way, it was very challenging to figure out how to create
> > > > that connection string, because it's a different format than what
> > > > is in the documentation. I had to comb through
> > > > JdbcTableSourceSinkFactory to figure out how to connect.
> > > >
> > > > Is it even possible to use the DataSet API with the Table SQL API
> > > > in Flink 1.12.1?
> > > >
> > > > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger <rmetz...@apache.org> wrote:
> > > > > Hi Marco,
> > > > >
> > > > > The DataSet API will not run out of memory, as it spills to disk
> > > > > if the data doesn't fit anymore. Load is distributed by
> > > > > partitioning data.
> > > > >
> > > > > Giving you advice depends a bit on the use case. I would explore
> > > > > two major options:
> > > > > a) Reading the data from Postgres using Flink's SQL JDBC
> > > > > connector [1]. 200 GB is not much data. A 1 Gbit/s network link
> > > > > needs ~30 minutes to transfer that (125 megabytes / second).
> > > > > b) Using the DataSet API and the State Processor API. I would
> > > > > first try to see how much effort it is to read the data using
> > > > > the DataSet API (could be less convenient than the Flink SQL
> > > > > JDBC connector).
> > > > >
> > > > > [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
> > > > >
> > > > > On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
> > > > > > I must bootstrap state from Postgres (approximately 200 GB of
> > > > > > data), and I notice that the State Processor API requires the
> > > > > > DataSet API in order to bootstrap state for the Stream API.
> > > > > >
> > > > > > I wish there was a way to use the SQL API and a partitioned
> > > > > > scan, but I don't know if that is even possible with the
> > > > > > DataSet API.
> > > > > >
> > > > > > I never used the DataSet API, and I am unsure how it manages
> > > > > > memory, or distributes load, when handling large state.
> > > > > >
> > > > > > Would it run out of memory if I map data from a JDBCInputFormat
> > > > > > into a large DataSet and then use that to bootstrap state for
> > > > > > my stream job?
> > > > > >
> > > > > > Any advice on how I should proceed with this would be greatly
> > > > > > appreciated.
> > > > > >
> > > > > > Thank you.
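And for the bootstrapping half of the question, a minimal sketch of feeding such a DataSet into the State Processor API (as it looked in Flink 1.12) to seed keyed state for a keyed process function. The operator uid, state name, field types, savepoint path, and max parallelism below are assumptions for illustration; the uid and state descriptor must match whatever the streaming job actually declares:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
    import org.apache.flink.types.Row;

    public class BootstrapKeyedState {

        // Writes each row's "step" field into keyed state, keyed by "name".
        // The state name and types are hypothetical; they must match what
        // the streaming keyed process function registers.
        static class StepWriter extends KeyedStateBootstrapFunction<String, Row> {
            private transient ValueState<Double> step;

            @Override
            public void open(Configuration parameters) {
                step = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("step", Double.class));
            }

            @Override
            public void processElement(Row row, Context ctx) throws Exception {
                step.update((Double) row.getField(1));
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the JDBC-sourced DataSet from the earlier sketch.
            DataSet<Row> rows = env.fromElements(Row.of("a", 1.0), Row.of("b", 2.0));

            BootstrapTransformation<Row> transformation = OperatorTransformation
                    .bootstrapWith(rows)
                    .keyBy(new KeySelector<Row, String>() {
                        @Override
                        public String getKey(Row row) {
                            return (String) row.getField(0);
                        }
                    })
                    .transform(new StepWriter());

            // "keyed-process-uid" must equal the uid of the keyed process
            // function's operator in the streaming job; 128 is an assumed
            // max parallelism, and the output path is a placeholder.
            Savepoint.create(new MemoryStateBackend(), 128)
                    .withOperator("keyed-process-uid", transformation)
                    .write("file:///tmp/bootstrap-savepoint");

            env.execute("bootstrap keyed state");
        }
    }

The streaming job can then be started from "file:///tmp/bootstrap-savepoint" as a regular savepoint restore, and the keyed process function with the matching uid will find the pre-populated state.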