Ah OK, thanks, no problem. My problem now is that nothing is sent. Do I need to format it in another way, or did I miss something else?
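One possibility worth ruling out, which I have not confirmed for this job: as far as I understand it, `JDBCOutputFormat` buffers rows and only executes the batch once `setBatchInterval` rows have accumulated (100 in the code quoted below) or when the format is closed, which an unbounded Kafka job never does. The sketch below is plain Scala, not Flink code; `BatchingWriter`, `BatchingDemo` and `run` are hypothetical names that only imitate that buffering behaviour.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a batching JDBC writer; this is NOT Flink's class,
// it only imitates "buffer rows, flush when the batch interval is reached".
final class BatchingWriter(batchInterval: Int, sink: Seq[String] => Unit) {
  private val pending = ArrayBuffer.empty[String]

  // Buffer one record and flush only when the buffer reaches batchInterval.
  def write(record: String): Unit = {
    pending += record
    if (pending.size >= batchInterval) flush()
  }

  // Hand all buffered records to the sink and empty the buffer.
  def flush(): Unit = {
    if (pending.nonEmpty) {
      sink(pending.toList)
      pending.clear()
    }
  }

  // Closing flushes whatever is left; a never-ending stream never gets here.
  def close(): Unit = flush()
}

object BatchingDemo {
  // Returns (records visible in the sink before close, records visible after close).
  def run(batchInterval: Int, records: Seq[String]): (Int, Int) = {
    val written = ArrayBuffer.empty[String]
    val writer = new BatchingWriter(batchInterval, batch => { written ++= batch; () })
    records.foreach(writer.write)
    val beforeClose = written.size
    writer.close()
    (beforeClose, written.size)
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("a", "b", "c")
    println(run(100, records)) // interval larger than the input: nothing visible before close
    println(run(1, records))   // interval of 1: every record is visible immediately
  }
}
```

If this is what is happening, temporarily lowering the interval, for example `.setBatchInterval(1)`, should make rows show up in the table while testing.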
I tried to include Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"), but that didn't work.

On Fri, May 22, 2020 at 11:57, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> No sorry, you're right. The JDBCOutputFormat should work. I got confused
> with the Table API.
>
> On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <m...@berlingskemedia.dk> wrote:
>
>> Hi again,
>>
>> I am a bit confused as to why the generic jdbc connector would not work
>> with sql-server?
>>
>> Can you explain a bit more?
>>
>> On Fri, May 22, 2020 at 11:33, Martin Frank Hansen <m...@berlingskemedia.dk> wrote:
>>
>>> Hi Flavio,
>>>
>>> Thanks for your reply. I will try another way then.
>>>
>>> On Fri, May 22, 2020 at 11:31, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> I expect you to see some exception somewhere, that sql server dialect
>>>> is not supported yet.
>>>>
>>>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <m...@berlingskemedia.dk> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> Thank you so much! Thought I had that import but misread it.
>>>>>
>>>>> The code does not give any errors, but no data is written to the sql
>>>>> server. Can you see why that is?
>>>>>
>>>>> On Fri, May 22, 2020 at 09:02, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> SQL server should not be supported from what I know. For this I
>>>>>> opened a PR[1] that I should rebase.
>>>>>> If someone is interested, I could do it.
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101)
>>>>>>
>>>>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> usually, this error occurs when people forget to add
>>>>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>>>>>>> Scala macro that the DataStream API uses for extracting types.
>>>>>>>
>>>>>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>>>>>> work if you import `org.apache.flink.table.api.scala._`.
>>>>>>>
>>>>>>> Maybe this example helps:
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>>>>>>> > have difficulties.
>>>>>>> >
>>>>>>> > I get the following error:
>>>>>>> >
>>>>>>> > could not find implicit value for evidence parameter of type
>>>>>>> > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>>>>>> > [error] val dsRow = tableEnv.toAppendStream[Row](result)
>>>>>>> > [error] ^
>>>>>>> >
>>>>>>> > Any help is appreciated.
>>>>>>> >
>>>>>>> > I am not sure whether my approach is correct or not, but my code is
>>>>>>> > as follows:
>>>>>>> >
>>>>>>> > import java.util.Properties
>>>>>>> >
>>>>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>>>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>>>>>>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>>>>>> > import org.apache.flink.streaming.api.scala._
>>>>>>> > import org.apache.flink.api.scala._
>>>>>>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
>>>>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
>>>>>>> > import org.apache.flink.types.Row
>>>>>>> >
>>>>>>> > val properties = new Properties()
>>>>>>> > properties.setProperty("bootstrap.servers", b_servers)
>>>>>>> > properties.setProperty("zookeeper.connect", zk)
>>>>>>> > properties.setProperty("group.id", "very_small_test")
>>>>>>> > properties.setProperty("ssl.endpoint.identification.algorithm", "")
>>>>>>> > properties.setProperty("security.protocol", "SSL")
>>>>>>> >
>>>>>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =
>>>>>>> >   new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties).setStartFromTimestamp(0)
>>>>>>> >
>>>>>>> > val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>>>>>> >
>>>>>>> > val schema = new Schema()
>>>>>>> >   .field("fullVisitorId", Types.STRING)
>>>>>>> >   .field("eventTime", Types.STRING)
>>>>>>> >   .field("eventID", Types.STRING)
>>>>>>> >   .field("eventType", Types.STRING)
>>>>>>> >   .field("page", Types.MAP(Types.STRING, Types.STRING))
>>>>>>> >   .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
>>>>>>> >
>>>>>>> > tableEnv.connect(new Kafka()
>>>>>>> >     .version("universal")
>>>>>>> >     .topic("very_small_test")
>>>>>>> >     .properties(properties)
>>>>>>> >     .startFromEarliest()
>>>>>>> >   )
>>>>>>> >   .withFormat(
>>>>>>> >     new Json()
>>>>>>> >       .failOnMissingField(false)
>>>>>>> >       .deriveSchema()
>>>>>>> >   )
>>>>>>> >   .withSchema(schema)
>>>>>>> >   .inAppendMode()
>>>>>>> >   .registerTableSource("sql_source")
>>>>>>> >
>>>>>>> > val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
>>>>>>> >
>>>>>>> > val result = tableEnv.sqlQuery(sqlStatement)
>>>>>>> >
>>>>>>> > val dsRow = tableEnv.toAppendStream[Row](result)
>>>>>>> >
>>>>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>>>>>> >   .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>>>>>> >   .setDBUrl("AWS url")
>>>>>>> >   .setUsername(username)
>>>>>>> >   .setPassword(password)
>>>>>>> >   .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
>>>>>>> >   .setBatchInterval(100)
>>>>>>> >   .finish()
>>>>>>> >
>>>>>>> > dsRow.writeUsingOutputFormat(jdbcOutput)
>>>>>>> >
>>>>>>> > tableEnv.execute("SQL test")
>>>>>>> >
>>>>>>> > --
>>>>>>> > *Best regards
>>>>>>> >
>>>>>>> > Martin Hansen*
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Martin Hansen
>>>
>>> --
>>> Best regards
>>>
>>> Martin Hansen
>>
>> --
>> Best regards
>>
>> Martin Hansen
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809

--
Best Regards

Martin Hansen