I would expect you to see an exception somewhere saying that the SQL Server dialect is not supported yet.
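
For reference, Timo's import suggestion from the quoted thread below can be sketched roughly as follows. This is only a minimal sketch, assuming Flink 1.10 with the Blink planner and the Scala bridge on the classpath. The object name `AppendStreamSketch`, the in-memory `visits` stream, and its field names are hypothetical stand-ins for the Kafka source, and I have not run it against SQL Server.

```scala
// Minimal sketch, assuming Flink 1.10 (flink-table-api-scala-bridge + Blink planner).
import org.apache.flink.api.scala._        // implicit TypeInformation macro
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._  // StreamTableEnvironment and Table conversions
import org.apache.flink.types.Row

object AppendStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Hypothetical in-memory input standing in for the Kafka source.
    val visits: DataStream[(String, String)] =
      env.fromElements(("visitor-1", "BT/home"), ("visitor-2", "other"))
    tableEnv.createTemporaryView("visits", visits, 'fullVisitorId, 'pagePath)

    val result = tableEnv.sqlQuery(
      "SELECT * FROM visits WHERE pagePath LIKE '%BT%'")

    // Called on the Table directly, as Timo suggested.
    val dsRow: DataStream[Row] = result.toAppendStream[Row]
    dsRow.print()

    // The linked StreamSQLExample triggers the job through the stream environment.
    env.execute("append stream sketch")
  }
}
```

The sketch follows the layout of the StreamSQLExample linked in the thread, so swapping `print()` for a real sink should be the only change needed once the dialect question is sorted out.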

On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <m...@berlingskemedia.dk> wrote:

> Hi Flavio,
>
> Thank you so much! I thought I had that import, but I misread it.
>
> The code does not give any errors, but no data is written to the SQL
> Server. Can you see why that is?
>
> On Fri, May 22, 2020 at 09:02, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> SQL Server is not supported as far as I know. For this I opened a
>> PR [1] that I should rebase.
>> If someone is interested, I could do it.
>>
>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101)
>>
>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Martin,
>>>
>>> usually, this error occurs when people forget to add
>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>>> Scala macro that the DataStream API uses for extracting types.
>>>
>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>> work if you import `org.apache.flink.table.api.scala._`.
>>>
>>> Maybe this example helps:
>>>
>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>
>>> Regards,
>>> Timo
>>>
>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>> > Hi,
>>> >
>>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>>> > have difficulties.
>>> >
>>> > I get the following error:
>>> >
>>> > could not find implicit value for evidence parameter of type
>>> > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>> > [error] val dsRow = tableEnv.toAppendStream[Row](result)
>>> > [error] ^
>>> >
>>> > Any help is appreciated.
>>> >
>>> > I am not sure whether my approach is correct or not, but my code is
>>> > as follows:
>>> >
>>> > import java.util.Properties
>>> >
>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>> > import org.apache.flink.streaming.api.scala._
>>> > import org.apache.flink.api.scala._
>>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
>>> > import org.apache.flink.types.Row
>>> >
>>> > val properties = new Properties()
>>> > properties.setProperty("bootstrap.servers", b_servers)
>>> > properties.setProperty("zookeeper.connect", zk)
>>> > properties.setProperty("group.id", "very_small_test")
>>> > properties.setProperty("ssl.endpoint.identification.algorithm", "")
>>> > properties.setProperty("security.protocol", "SSL")
>>> >
>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =
>>> >   new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties).setStartFromTimestamp(0)
>>> >
>>> > val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>> >
>>> > val schema = new Schema()
>>> >   .field("fullVisitorId", Types.STRING)
>>> >   .field("eventTime", Types.STRING)
>>> >   .field("eventID", Types.STRING)
>>> >   .field("eventType", Types.STRING)
>>> >   .field("page", Types.MAP(Types.STRING, Types.STRING))
>>> >   .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
>>> >
>>> > tableEnv.connect(new Kafka()
>>> >     .version("universal")
>>> >     .topic("very_small_test")
>>> >     .properties(properties)
>>> >     .startFromEarliest()
>>> >   )
>>> >   .withFormat(
>>> >     new Json()
>>> >       .failOnMissingField(false)
>>> >       .deriveSchema()
>>> >   )
>>> >   .withSchema(schema)
>>> >   .inAppendMode()
>>> >   .registerTableSource("sql_source")
>>> >
>>> > val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
>>> >
>>> > val result = tableEnv.sqlQuery(sqlStatement)
>>> >
>>> > val dsRow = tableEnv.toAppendStream[Row](result)
>>> >
>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>> >   .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>> >   .setDBUrl("AWS url")
>>> >   .setUsername(username)
>>> >   .setPassword(password)
>>> >   .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
>>> >   .setBatchInterval(100)
>>> >   .finish()
>>> >
>>> > dsRow.writeUsingOutputFormat(jdbcOutput)
>>> >
>>> > tableEnv.execute("SQL test")
>>> >
>>> > --
>>> >
>>> > *Best regards
>>> >
>>> > Martin Hansen*
>>> >
>
> --
>
> Best regards
>
> Martin Hansen