Hi, I am trying to write input from Kafka to a SQL server on AWS, but I have difficulties.
I get the following error:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error] val dsRow = tableEnv.toAppendStream[Row](result)
[error] ^

Any help is appreciated. I am not sure whether my approach is correct, but my code is as follows:

import java.util.Properties
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
import org.apache.flink.types.Row

val properties = new Properties()
properties.setProperty("bootstrap.servers",b_servers)
properties.setProperty("zookeeper.connect",zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm ", "")
properties.setProperty("security.protocol", "SSL")

val kafkaSource: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer[ String]("very_small_test", new SimpleStringSchema(), properties ).setStartFromTimestamp(0)

val settings = EnvironmentSettings.newInstance ().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema = new Schema()
  .field("fullVisitorId",Types.STRING)
  .field("eventTime",Types.STRING)
  .field("eventID",Types.STRING)
  .field("eventType",Types.STRING)
  .field("page",Types.MAP( Types.STRING, Types.STRING))
  .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))

tableEnv.connect(new
Kafka()
    .version("universal")
    .topic("very_small_test")
    .properties(properties)
    .startFromEarliest()
  )
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema()
  )
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("sql_source")

val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
val result = tableEnv.sqlQuery(sqlStatement)
val dsRow = tableEnv.toAppendStream[Row](result)

val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .setDBUrl("AWS url")
  .setUsername(username)
  .setPassword(password)
  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
  .setBatchInterval(100)
  .finish()

dsRow.writeUsingOutputFormat(jdbcOutput)
tableEnv.execute("SQL test")

--
*Best regards*
*Martin Hansen*