Hi,

I am trying to write input from Kafka to a SQL server on AWS, but I have
difficulties.

I get the following error could not find implicit value for evidence
parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error]   val dsRow = tableEnv.toAppendStream[Row](result)
[error]                                           ^

Any help is appreciated

I am not sure whether my approach is correct or not but my code is
as follows:

import java.util.Properties

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table,
TableEnvironment, Types}
import org.apache.flink.types.Row

val properties = new Properties()
properties.setProperty("bootstrap.servers",b_servers)
properties.setProperty("zookeeper.connect",zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm ", "")
properties.setProperty("security.protocol", "SSL")


val kafkaSource: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer[
String]("very_small_test", new SimpleStringSchema(), properties
).setStartFromTimestamp(0)

val settings = EnvironmentSettings.newInstance
().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema = new Schema()
.field("fullVisitorId",Types.STRING)
.field("eventTime",Types.STRING)
.field("eventID",Types.STRING)
.field("eventType",Types.STRING)
.field("page",Types.MAP( Types.STRING, Types.STRING))
.field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))


tableEnv.connect(new Kafka()
.version("universal")
.topic("very_small_test")
.properties(properties)
.startFromEarliest()
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(schema)
.inAppendMode()
.registerTableSource("sql_source")


val sqlStatement = "SELECT * from sql_source where
CustomDimensions['pagePath'] like '%BT%'"

val result = tableEnv.sqlQuery(sqlStatement)

val dsRow = tableEnv.toAppendStream[Row](result)


val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
.setDBUrl("AWS url")
.setUsername(username)
.setPassword(password)
.setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID)
VALUES (?, ?, ?)")
.setBatchInterval(100)
.finish()

dsRow.writeUsingOutputFormat(jdbcOutput)

tableEnv.execute("SQL test")


-- 



*Best regardsMartin Hansen*

Reply via email to