Hi, Currently, the JDBC connector only supports the Derby, MySQL, and PostgreSQL dialects. The 'sqlserver' dialect is not supported yet. There is a ticket tracking this: https://issues.apache.org/jira/browse/FLINK-14101
Regards, Dian On Mon, Oct 11, 2021 at 9:43 PM Schmid Christian <christian.sch...@cler.ch> wrote: > Hi all > > > > According to the official documentation (Table API / JDBC SQL Connector > v.1.14.0) "the JDBC connector allows reading data from and writing data > into *any relational databases* with a JDBC driver". > > At the moment we are using SQL Server in conjunction with Flink and Java, > which works perfectly fine. Now we try to fetch Data from a Kafka-Topic and > write it to a SQL Server sink using *PyFlink*. > > We succeeded in fetching the data from the kafka topic, but were not able > to establish a connection to SQL Server. > > > > Our code looks as follows: > > > > import os > > > > from pyflink.datastream import StreamExecutionEnvironment > > from pyflink.table import > StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSink, > WriteMode > > from pyflink.table.descriptors import Schema, Kafka, Json > > > > def main(): > > # Create streaming environment > > env = StreamExecutionEnvironment.get_execution_environment() > > > > settings = EnvironmentSettings.new_instance()\ > > .in_streaming_mode()\ > > .use_blink_planner()\ > > .build() > > > > # create table environment > > tbl_env = StreamTableEnvironment.create( > stream_execution_environment=env, environment_settings=settings) > > > > # add kafka connector dependency > > kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__ > )), 'flink-sql-connector-kafka_2.11-1.13.0.jar') > > > > # add jdbc connector dependency > > jdbc_jar = os.path.join(os.path.abspath(os.path.dirname(__file__ > )),'flink-connector-jdbc_2.11-1.13.2.jar') > > mssql_jar = os.path.join(os.path.abspath(os.path.dirname(__file__ > )),'mssql-jdbc-8.2.2.jre8.jar') > > > > tbl_env.get_config()\ > > .get_configuration().set_string("parallelism.default", "1" > )\ > > .set_string("pipeline.jars", "file:///{};file:///{}" > .format(kafka_jar, jdbc_jar))\ > > .set_string("pipeline.classpaths", "file:///{}" > 
.format(mssql_jar)) > > > > kafka_table_sql = """ > > CREATE TABLE kafka ( > > [..] VARCHAR, > > data ROW( > > [..] ROW( > > [..] VARCHAR, > > [..] VARCHAR > > )) > > ) WITH ( > > 'connector' = 'kafka', > > 'property-version' = 'universal', > > 'properties.bootstrap.servers' = '[..]', > > 'topic' = '[..]', > > 'scan.startup.mode' = 'earliest-offset', > > 'properties.security.protocol' = 'SSL', > > 'properties.ssl.endpoint.identification.algorithm' = '', > > 'properties.ssl.truststore.location' = '[..]', > > 'properties.ssl.truststore.password' = '[..]', > > 'properties.ssl.keystore.type' = 'JKS', > > 'properties.ssl.keystore.location' = '[..]', > > 'properties.ssl.keystore.password' = [..], > > 'properties.ssl.key.password' = [..], > > 'properties.group.id' = '[..]', > > 'format' = 'json' > > ) > > """ > > > > sqlserver_table_sql = """ > > CREATE TABLE sqltest ( > > [..] VARCHAR, > > [..] VARCHAR > > ) WITH ( > > 'connector' = 'jdbc', > > 'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver', > > 'url' = 'jdbc:sqlserver://db-server/database-name', > > 'username' = '[..]', > > 'password' = '[..], > > 'table-name' = 'dbo.tablename' > > ) > > """ > > > > # create source table (kafka) > > tbl_env.execute_sql(kafka_table_sql) > > > > # create sink table (sql server) > > tbl_env.execute_sql(sqlserver_table_sql) > > > > # copy data from source to sink > > tbl_env.execute_sql( > "INSERT INTO sqltest SELECT [..], [..] 
FROM kafka").wait() > > > > if __name__ == '__main__': > > main() > > > > > > Which lead to an exception (java.lang.IllegalStateException: *Cannot > handle such jdbc url* ..): > > > > Traceback (most recent call last): > > File "c:/projects/flink/kafka_csv_jdbc.py", line 122, in <module> > > main() > > File "c:/projects/flink/kafka_csv_jdbc.py", line 119, in main > > tbl_env.execute_sql("[..]").wait() > > File > "C:\projects\flink\flink-evn\lib\site-packages\pyflink\table\table_environment.py", > line 804, in execute_sql > > return TableResult(self._j_tenv.executeSql(stmt)) > > File > "C:\projects\flink\flink-evn\lib\site-packages\py4j\java_gateway.py", line > 1286, in __call__ > > answer, self.gateway_client, self.target_id, self.name) > > File > "C:\projects\flink\flink-evn\lib\site-packages\pyflink\util\exceptions.py", > line 146, in deco > > return f(*a, **kw) > > File "C:\projects\flink\flink-evn\lib\site-packages\py4j\protocol.py", > line 328, in get_return_value > > format(target_id, ".", name), value) > > py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql. > > : org.apache.flink.table.api.ValidationException: Unable to create a sink > for writing table 'default_catalog.default_database.sqltest'. 
> > > > Table options are: > > > > 'connector'='jdbc' > > 'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver' > > 'password'='[..]' > > 'table-name'='[..]' > > 'url'='jdbc:sqlserver:// [..]' > > 'username'='[..]' > > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171) > > at > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373) > > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201) > > at > org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162) > > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > at scala.collection.Iterator.foreach(Iterator.scala:937) > > at scala.collection.Iterator.foreach$(Iterator.scala:937) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url: > jdbc:sqlserver:// [..] > > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > > at > org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.validateConfigOptions(JdbcDynamicTableFactory.java:304) > > at > org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSink(JdbcDynamicTableFactory.java:172) > > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168) > > ... 29 more > > > > *How can we correctly establish a connection to SQL Server using PyFlink > and the Table API?* > > > > Any suggestions are highly appreciated. > > > > Thanks > > > > *Diese E-Mail ist ausschliesslich für den Adressaten bestimmt. Sollten Sie > diese E-Mail irrtümlich erhalten haben oder wünschen Sie künftig keine > Kontakte mehr per E-Mail, bitten wir Sie, die Bank Cler hierüber sofort zu > orientieren. 
Die irrtümlich erhaltene E-Mail ist mit allen Anhängen > unwiderruflich zu löschen, allfällige Ausdrucke sind zu vernichten und auf > die Verwendung des Inhalts ist zu verzichten. Der Versand unverschlüsselter > E-Mail birgt erhebliche Risiken in sich (mangelnde Vertraulichkeit, > Manipulation von Inhalt/Absender, Fehlleitung, Viren etc.). Bank Cler lehnt > jede Haftung für Schäden hieraus ab. Bank Cler akzeptiert grundsätzlich > keine per E-Mail übermittelten Aufträge, Widerrufe von Aufträgen oder > sonstige Weisungen etc., ohne verpflichtet zu sein, diese ausdrücklich > zurück zu weisen. Kündigungen von Verträgen per E-Mail sind nicht > rechtswirksam. * >