No sorry, you're right. The JDBCOutputFormat should work..I get confused with the Table API
On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen < m...@berlingskemedia.dk> wrote: > Hi again, > > I am a bit confused as to why the generic jdbc connector would not work > with sql-server? > > Can you explain a bit more? > > > Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen < > m...@berlingskemedia.dk>: > >> Hi Flavio, >> >> Thanks for your reply. I will try another way then. >> >> >> Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier < >> pomperma...@okkam.it>: >> >>> I expect you to see some exception somewhere, that sql server dialect is >>> not supported yet. >>> >>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen < >>> m...@berlingskemedia.dk> wrote: >>> >>>> Hi Flavio, >>>> >>>> Thank you so much! Thought i had that import but misread it. >>>> >>>> The code does not give any errors, but no data is written to the sql >>>> server. Can you see why that is? >>>> >>>> >>>> >>>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier < >>>> pomperma...@okkam.it>: >>>> >>>>> SQL server should not be supported from what I know..for this I >>>>> opened a PR[1] that I should rebase. >>>>> If someone is interested in I could do it >>>>> >>>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 ) >>>>> >>>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> >>>>> wrote: >>>>> >>>>>> Hi Martin, >>>>>> >>>>>> usually, this error occurs when people forget to add >>>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by >>>>>> the >>>>>> Scala macro that the DataStream API uses for extracting types. >>>>>> >>>>>> Can you try to call `result.toAppendStream[Row]` directly? This >>>>>> should >>>>>> work if you import `org.apache.flink.table.api.scala._`. >>>>>> >>>>>> Maybe this example helps: >>>>>> >>>>>> >>>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala >>>>>> >>>>>> Regards, >>>>>> Timo >>>>>> >>>>>> >>>>>> On 22.05.20 08:02, Martin Frank Hansen wrote: >>>>>> > Hi, >>>>>> > >>>>>> > I am trying to write input from Kafka to a SQL server on AWS, but I >>>>>> have >>>>>> > difficulties. >>>>>> > >>>>>> > I get the following error could not find implicit value for >>>>>> evidence >>>>>> > parameter of type >>>>>> > >>>>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row] >>>>>> > [error] val dsRow = tableEnv.toAppendStream[Row](result) >>>>>> > [error] ^ >>>>>> > >>>>>> > Any help is appreciated >>>>>> > >>>>>> > I am not sure whether my approach is correct or not but my code is >>>>>> > as follows: >>>>>> > >>>>>> > import java.util.Properties >>>>>> > >>>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema} >>>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, >>>>>> JDBCOutputFormat} >>>>>> > import >>>>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, >>>>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema} >>>>>> > import org.apache.flink.streaming.api.scala._ >>>>>> > import org.apache.flink.api.scala._ >>>>>> > import >>>>>> org.apache.flink.api.common.serialization.{SimpleStringEncoder, >>>>>> SimpleStringSchema} >>>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment >>>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, >>>>>> TableEnvironment, Types} >>>>>> > import org.apache.flink.types.Row >>>>>> > >>>>>> > val properties =new Properties() >>>>>> > properties.setProperty("bootstrap.servers",b_servers) >>>>>> > properties.setProperty("zookeeper.connect",zk) >>>>>> > properties.setProperty("group.id <http://group.id>", >>>>>> "very_small_test") >>>>>> > properties.setProperty("ssl.endpoint.identification.algorithm ", >>>>>> "") >>>>>> > properties.setProperty("security.protocol", "SSL") >>>>>> > >>>>>> > >>>>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =new >>>>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), >>>>>> properties).setStartFromTimestamp(0) >>>>>> > >>>>>> > val settings = >>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>> > val tableEnv = StreamTableEnvironment.create(env, settings) >>>>>> > >>>>>> > val schema =new Schema() >>>>>> > .field("fullVisitorId",Types.STRING) >>>>>> > .field("eventTime",Types.STRING) >>>>>> > .field("eventID",Types.STRING) >>>>>> > .field("eventType",Types.STRING) >>>>>> > .field("page",Types.MAP( Types.STRING, Types.STRING)) >>>>>> > .field("CustomDimensions",Types.MAP( Types.STRING, >>>>>> Types.STRING)) >>>>>> > >>>>>> > >>>>>> > tableEnv.connect(new Kafka() >>>>>> > .version("universal") >>>>>> > .topic("very_small_test") >>>>>> > .properties(properties) >>>>>> > .startFromEarliest() >>>>>> > ) >>>>>> > .withFormat( >>>>>> > new Json() >>>>>> > .failOnMissingField(false) >>>>>> > .deriveSchema() >>>>>> > ) >>>>>> > .withSchema(schema) >>>>>> > .inAppendMode() >>>>>> > .registerTableSource("sql_source") >>>>>> > >>>>>> > >>>>>> > val sqlStatement ="SELECT * from sql_source where >>>>>> CustomDimensions['pagePath'] like '%BT%'" >>>>>> > >>>>>> > val result =tableEnv.sqlQuery(sqlStatement) >>>>>> > >>>>>> > val dsRow =tableEnv.toAppendStream[Row](result) >>>>>> > >>>>>> > >>>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() >>>>>> > .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver") >>>>>> > .setDBUrl("AWS url") >>>>>> > .setUsername(username) >>>>>> > .setPassword(password) >>>>>> > .setQuery("INSERT INTO kafka_data_test (fullVisitorId, >>>>>> EventTime, eventID) VALUES >>>>>> > (?, ?, ?)") >>>>>> > .setBatchInterval(100) >>>>>> > .finish() >>>>>> > >>>>>> > dsRow.writeUsingOutputFormat(jdbcOutput) >>>>>> > >>>>>> > tableEnv.execute("SQL test") >>>>>> > >>>>>> > >>>>>> > -- >>>>>> > >>>>>> > *Best regards >>>>>> > >>>>>> > Martin Hansen* >>>>>> > >>>>>> >>>>> >>>> >>>> -- >>>> >>>> Best regards >>>> >>>> Martin Hansen >>>> >>>> >>>> >> >> -- >> Best regards >> >> Martin Hansen >> >> >> > > -- > Best regards > > Martin Hansen > > > -- Flavio Pompermaier Development Department OKKAM S.r.l. Tel. +(39) 0461 041809