SQL server should not be supported from what I know..for this I opened a PR[1] that I should rebase. If someone is interested in I could do it
[1] https://github.com/apache/flink/pull/12038 (FLINK-14101 ) On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote: > Hi Martin, > > usually, this error occurs when people forget to add > `org.apache.flink.api.scala._` to their imports. It is triggered by the > Scala macro that the DataStream API uses for extracting types. > > Can you try to call `result.toAppendStream[Row]` directly? This should > work if you import `org.apache.flink.table.api.scala._`. > > Maybe this example helps: > > > https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala > > Regards, > Timo > > > On 22.05.20 08:02, Martin Frank Hansen wrote: > > Hi, > > > > I am trying to write input from Kafka to a SQL server on AWS, but I have > > difficulties. > > > > I get the following error could not find implicit value for evidence > > parameter of type > > > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row] > > [error] val dsRow = tableEnv.toAppendStream[Row](result) > > [error] ^ > > > > Any help is appreciated > > > > I am not sure whether my approach is correct or not but my code is > > as follows: > > > > import java.util.Properties > > > > import org.apache.flink.table.descriptors.{Json, Kafka, Schema} > > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, > JDBCOutputFormat} > > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, > FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema} > > import org.apache.flink.streaming.api.scala._ > > import org.apache.flink.api.scala._ > > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, > SimpleStringSchema} > > import org.apache.flink.table.api.scala.StreamTableEnvironment > > import org.apache.flink.table.api.{EnvironmentSettings, Table, > TableEnvironment, Types} > > import org.apache.flink.types.Row > > > > val properties =new Properties() > > properties.setProperty("bootstrap.servers",b_servers) > > properties.setProperty("zookeeper.connect",zk) > > properties.setProperty("group.id <http://group.id>", > "very_small_test") > > properties.setProperty("ssl.endpoint.identification.algorithm ", "") > > properties.setProperty("security.protocol", "SSL") > > > > > > val kafkaSource: FlinkKafkaConsumerBase[String] =new > FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), > properties).setStartFromTimestamp(0) > > > > val settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > val tableEnv = StreamTableEnvironment.create(env, settings) > > > > val schema =new Schema() > > .field("fullVisitorId",Types.STRING) > > .field("eventTime",Types.STRING) > > .field("eventID",Types.STRING) > > .field("eventType",Types.STRING) > > .field("page",Types.MAP( Types.STRING, Types.STRING)) > > .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING)) > > > > > > tableEnv.connect(new Kafka() > > .version("universal") > > .topic("very_small_test") > > .properties(properties) > > .startFromEarliest() > > ) > > .withFormat( > > new Json() > > .failOnMissingField(false) > > .deriveSchema() > > ) > > .withSchema(schema) > > .inAppendMode() > > .registerTableSource("sql_source") > > > > > > val sqlStatement ="SELECT * from sql_source where > CustomDimensions['pagePath'] like '%BT%'" > > > > val result =tableEnv.sqlQuery(sqlStatement) > > > > val dsRow =tableEnv.toAppendStream[Row](result) > > > > > > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() > > .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver") > > .setDBUrl("AWS url") > > .setUsername(username) > > .setPassword(password) > > .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, > eventID) VALUES > > (?, ?, ?)") > > .setBatchInterval(100) > > .finish() > > > > dsRow.writeUsingOutputFormat(jdbcOutput) > > > > tableEnv.execute("SQL test") > > > > > > -- > > > > *Best regards > > > > Martin Hansen* > > >