Hi again, I am a bit confused as to why the generic jdbc connector would not work with sql-server?
Can you explain a bit more? Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen < m...@berlingskemedia.dk>: > Hi Flavio, > > Thanks for your reply. I will try another way then. > > > Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier < > pomperma...@okkam.it>: > >> I expect you to see some exception somewhere, that sql server dialect is >> not supported yet. >> >> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen < >> m...@berlingskemedia.dk> wrote: >> >>> Hi Flavio, >>> >>> Thank you so much! Thought i had that import but misread it. >>> >>> The code does not give any errors, but no data is written to the sql >>> server. Can you see why that is? >>> >>> >>> >>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier < >>> pomperma...@okkam.it>: >>> >>>> SQL server should not be supported from what I know..for this I >>>> opened a PR[1] that I should rebase. >>>> If someone is interested in I could do it >>>> >>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 ) >>>> >>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> >>>> wrote: >>>> >>>>> Hi Martin, >>>>> >>>>> usually, this error occurs when people forget to add >>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by >>>>> the >>>>> Scala macro that the DataStream API uses for extracting types. >>>>> >>>>> Can you try to call `result.toAppendStream[Row]` directly? This should >>>>> work if you import `org.apache.flink.table.api.scala._`. >>>>> >>>>> Maybe this example helps: >>>>> >>>>> >>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala >>>>> >>>>> Regards, >>>>> Timo >>>>> >>>>> >>>>> On 22.05.20 08:02, Martin Frank Hansen wrote: >>>>> > Hi, >>>>> > >>>>> > I am trying to write input from Kafka to a SQL server on AWS, but I >>>>> have >>>>> > difficulties. >>>>> > >>>>> > I get the following error could not find implicit value for evidence >>>>> > parameter of type >>>>> > >>>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row] >>>>> > [error] val dsRow = tableEnv.toAppendStream[Row](result) >>>>> > [error] ^ >>>>> > >>>>> > Any help is appreciated >>>>> > >>>>> > I am not sure whether my approach is correct or not but my code is >>>>> > as follows: >>>>> > >>>>> > import java.util.Properties >>>>> > >>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema} >>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, >>>>> JDBCOutputFormat} >>>>> > import >>>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, >>>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema} >>>>> > import org.apache.flink.streaming.api.scala._ >>>>> > import org.apache.flink.api.scala._ >>>>> > import >>>>> org.apache.flink.api.common.serialization.{SimpleStringEncoder, >>>>> SimpleStringSchema} >>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment >>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, >>>>> TableEnvironment, Types} >>>>> > import org.apache.flink.types.Row >>>>> > >>>>> > val properties =new Properties() >>>>> > properties.setProperty("bootstrap.servers",b_servers) >>>>> > properties.setProperty("zookeeper.connect",zk) >>>>> > properties.setProperty("group.id <http://group.id>", >>>>> "very_small_test") >>>>> > properties.setProperty("ssl.endpoint.identification.algorithm ", >>>>> "") >>>>> > properties.setProperty("security.protocol", "SSL") >>>>> > >>>>> > >>>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =new >>>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), >>>>> properties).setStartFromTimestamp(0) >>>>> > >>>>> > val settings = >>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>> > val tableEnv = StreamTableEnvironment.create(env, settings) >>>>> > >>>>> > val schema =new Schema() >>>>> > .field("fullVisitorId",Types.STRING) >>>>> > .field("eventTime",Types.STRING) >>>>> > .field("eventID",Types.STRING) >>>>> > .field("eventType",Types.STRING) >>>>> > .field("page",Types.MAP( Types.STRING, Types.STRING)) >>>>> > .field("CustomDimensions",Types.MAP( Types.STRING, >>>>> Types.STRING)) >>>>> > >>>>> > >>>>> > tableEnv.connect(new Kafka() >>>>> > .version("universal") >>>>> > .topic("very_small_test") >>>>> > .properties(properties) >>>>> > .startFromEarliest() >>>>> > ) >>>>> > .withFormat( >>>>> > new Json() >>>>> > .failOnMissingField(false) >>>>> > .deriveSchema() >>>>> > ) >>>>> > .withSchema(schema) >>>>> > .inAppendMode() >>>>> > .registerTableSource("sql_source") >>>>> > >>>>> > >>>>> > val sqlStatement ="SELECT * from sql_source where >>>>> CustomDimensions['pagePath'] like '%BT%'" >>>>> > >>>>> > val result =tableEnv.sqlQuery(sqlStatement) >>>>> > >>>>> > val dsRow =tableEnv.toAppendStream[Row](result) >>>>> > >>>>> > >>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() >>>>> > .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver") >>>>> > .setDBUrl("AWS url") >>>>> > .setUsername(username) >>>>> > .setPassword(password) >>>>> > .setQuery("INSERT INTO kafka_data_test (fullVisitorId, >>>>> EventTime, eventID) VALUES >>>>> > (?, ?, ?)") >>>>> > .setBatchInterval(100) >>>>> > .finish() >>>>> > >>>>> > dsRow.writeUsingOutputFormat(jdbcOutput) >>>>> > >>>>> > tableEnv.execute("SQL test") >>>>> > >>>>> > >>>>> > -- >>>>> > >>>>> > *Best regards >>>>> > >>>>> > Martin Hansen* >>>>> > >>>>> >>>> >>> >>> -- >>> >>> Best regards >>> >>> Martin Hansen >>> >>> >>> > > -- > Best regards > > Martin Hansen > > > -- Best regards Martin Hansen