Hi,

Currently the JDBC connector only supports the Derby, MySQL, and PostgreSQL
dialects. The 'sqlserver' dialect is not yet supported. There is a ticket
tracking this: https://issues.apache.org/jira/browse/FLINK-14101
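
To illustrate why you see "Cannot handle such jdbc url": the connector
resolves a dialect from the URL prefix and rejects anything it does not
recognize. Below is a minimal Python sketch of that lookup; the names here
are illustrative, not the actual Flink API (the real check lives on the
Java side in JdbcDynamicTableFactory/JdbcDialects):

```python
# Illustrative sketch of the JDBC dialect lookup performed by the Flink
# JDBC connector; names and structure are hypothetical, not Flink's API.
SUPPORTED_URL_PREFIXES = {
    "jdbc:derby:": "derby",
    "jdbc:mysql:": "mysql",
    "jdbc:postgresql:": "postgresql",
}


def resolve_dialect(jdbc_url: str) -> str:
    """Return the dialect for a JDBC URL, or fail like the connector does."""
    for prefix, dialect in SUPPORTED_URL_PREFIXES.items():
        if jdbc_url.startswith(prefix):
            return dialect
    # Mirrors the IllegalStateException message from the stack trace below.
    raise ValueError("Cannot handle such jdbc url: " + jdbc_url)
```

So a 'jdbc:sqlserver://...' URL fails validation before any connection
attempt is made, regardless of which driver jar is on the classpath.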

Regards,
Dian

On Mon, Oct 11, 2021 at 9:43 PM Schmid Christian <christian.sch...@cler.ch>
wrote:

> Hi all
>
>
>
> According to the official documentation (Table API / JDBC SQL Connector
> v.1.14.0) "the JDBC connector allows reading data from and writing data
> into *any relational databases* with a JDBC driver".
>
> At the moment we are using SQL Server in conjunction with Flink and Java,
> which works perfectly fine. Now we are trying to fetch data from a Kafka
> topic and write it to a SQL Server sink using *PyFlink*.
>
> We succeeded in fetching the data from the Kafka topic, but were not able
> to establish a connection to SQL Server.
>
>
>
> Our code looks as follows:
>
>
>
> import os
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSink, WriteMode
> from pyflink.table.descriptors import Schema, Kafka, Json
>
>
> def main():
>     # create streaming environment
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     settings = EnvironmentSettings.new_instance()\
>         .in_streaming_mode()\
>         .use_blink_planner()\
>         .build()
>
>     # create table environment
>     tbl_env = StreamTableEnvironment.create(
>         stream_execution_environment=env, environment_settings=settings)
>
>     # add kafka connector dependency
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'flink-sql-connector-kafka_2.11-1.13.0.jar')
>
>     # add jdbc connector dependency
>     jdbc_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                             'flink-connector-jdbc_2.11-1.13.2.jar')
>     mssql_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'mssql-jdbc-8.2.2.jre8.jar')
>
>     tbl_env.get_config().get_configuration()\
>         .set_string("parallelism.default", "1")\
>         .set_string("pipeline.jars",
>                     "file:///{};file:///{}".format(kafka_jar, jdbc_jar))\
>         .set_string("pipeline.classpaths", "file:///{}".format(mssql_jar))
>
>     kafka_table_sql = """
>     CREATE TABLE kafka (
>         [..] VARCHAR,
>         data ROW(
>             [..] ROW(
>                 [..] VARCHAR,
>                 [..] VARCHAR
>             ))
>     ) WITH (
>         'connector' = 'kafka',
>         'property-version' = 'universal',
>         'properties.bootstrap.servers' = '[..]',
>         'topic' = '[..]',
>         'scan.startup.mode' = 'earliest-offset',
>         'properties.security.protocol' = 'SSL',
>         'properties.ssl.endpoint.identification.algorithm' = '',
>         'properties.ssl.truststore.location' = '[..]',
>         'properties.ssl.truststore.password' = '[..]',
>         'properties.ssl.keystore.type' = 'JKS',
>         'properties.ssl.keystore.location' = '[..]',
>         'properties.ssl.keystore.password' = '[..]',
>         'properties.ssl.key.password' = '[..]',
>         'properties.group.id' = '[..]',
>         'format' = 'json'
>     )
>     """
>
>     sqlserver_table_sql = """
>     CREATE TABLE sqltest (
>         [..] VARCHAR,
>         [..] VARCHAR
>     ) WITH (
>         'connector' = 'jdbc',
>         'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
>         'url' = 'jdbc:sqlserver://db-server/database-name',
>         'username' = '[..]',
>         'password' = '[..]',
>         'table-name' = 'dbo.tablename'
>     )
>     """
>
>     # create source table (kafka)
>     tbl_env.execute_sql(kafka_table_sql)
>
>     # create sink table (sql server)
>     tbl_env.execute_sql(sqlserver_table_sql)
>
>     # copy data from source to sink
>     tbl_env.execute_sql("INSERT INTO sqltest SELECT [..], [..] FROM kafka").wait()
>
>
> if __name__ == '__main__':
>     main()
>
>
>
>
>
> Running this leads to an exception (java.lang.IllegalStateException: *Cannot
> handle such jdbc url* ..):
>
>
>
> Traceback (most recent call last):
>
>   File "c:/projects/flink/kafka_csv_jdbc.py", line 122, in <module>
>
>     main()
>
>   File "c:/projects/flink/kafka_csv_jdbc.py", line 119, in main
>
>     tbl_env.execute_sql("[..]").wait()
>
>   File
> "C:\projects\flink\flink-evn\lib\site-packages\pyflink\table\table_environment.py",
> line 804, in execute_sql
>
>     return TableResult(self._j_tenv.executeSql(stmt))
>
>   File
> "C:\projects\flink\flink-evn\lib\site-packages\py4j\java_gateway.py", line
> 1286, in __call__
>
>     answer, self.gateway_client, self.target_id, self.name)
>
>   File
> "C:\projects\flink\flink-evn\lib\site-packages\pyflink\util\exceptions.py",
> line 146, in deco
>
>     return f(*a, **kw)
>
>   File "C:\projects\flink\flink-evn\lib\site-packages\py4j\protocol.py",
> line 328, in get_return_value
>
>     format(target_id, ".", name), value)
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
>
> : org.apache.flink.table.api.ValidationException: Unable to create a sink
> for writing table 'default_catalog.default_database.sqltest'.
>
>
>
> Table options are:
>
>
>
> 'connector'='jdbc'
>
> 'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver'
>
> 'password'='[..]'
>
> 'table-name'='[..]'
>
> 'url'='jdbc:sqlserver:// [..]'
>
> 'username'='[..]'
>
>         at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
>
>         at
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
>
>         at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
>
>         at
> org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
>
>         at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
>         at scala.collection.Iterator.foreach(Iterator.scala:937)
>
>         at scala.collection.Iterator.foreach$(Iterator.scala:937)
>
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>
>         at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
>         at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
>         at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
>
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
>
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
>
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:498)
>
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
>         at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>
>         at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
>         at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>
>         at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url:
> jdbc:sqlserver:// [..]
>
>         at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>
>         at
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.validateConfigOptions(JdbcDynamicTableFactory.java:304)
>
>         at
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSink(JdbcDynamicTableFactory.java:172)
>
>         at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
>
>         ... 29 more
>
>
>
> *How can we correctly establish a connection to SQL Server using PyFlink
> and the Table API?*
>
>
>
> Any suggestions are highly appreciated.
>
>
>
> Thanks
>
