As far as I know, SQL Server is not currently supported. I opened a
PR [1] for this, which I still need to rebase.
If someone is interested, I can do that.

[1] https://github.com/apache/flink/pull/12038 (FLINK-14101)

On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote:

> Hi Martin,
>
> usually, this error occurs when people forget to add
> `org.apache.flink.api.scala._` to their imports. It is triggered by the
> Scala macro that the DataStream API uses for extracting types.
>
> Can you try to call `result.toAppendStream[Row]` directly? This should
> work if you import `org.apache.flink.table.api.scala._`.
>
> Maybe this example helps:
>
>
> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>
> Regards,
> Timo
>
>
> On 22.05.20 08:02, Martin Frank Hansen wrote:
> > Hi,
> >
> > I am trying to write input from Kafka to a SQL server on AWS, but I have
> > difficulties.
> >
> > I get the following error:
> >
> > could not find implicit value for evidence parameter of type
> > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
> > [error]                                           ^
> >
> > Any help is appreciated
> >
> > I am not sure whether my approach is correct or not but my code is
> > as follows:
> >
> > import java.util.Properties
> >
> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
> > import org.apache.flink.streaming.api.scala._
> > import org.apache.flink.api.scala._
> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
> > import org.apache.flink.table.api.scala.StreamTableEnvironment
> > import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
> > import org.apache.flink.types.Row
> >
> >    val properties = new Properties()
> >    properties.setProperty("bootstrap.servers", b_servers)
> >    properties.setProperty("zookeeper.connect", zk)
> >    properties.setProperty("group.id", "very_small_test")
> >    properties.setProperty("ssl.endpoint.identification.algorithm ", "")
> >    properties.setProperty("security.protocol", "SSL")
> >
> >
> >    val kafkaSource: FlinkKafkaConsumerBase[String] =
> >      new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
> >        .setStartFromTimestamp(0)
> >
> >    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >    val env = StreamExecutionEnvironment.getExecutionEnvironment
> >    val tableEnv = StreamTableEnvironment.create(env, settings)
> >
> >    val schema = new Schema()
> >      .field("fullVisitorId", Types.STRING)
> >      .field("eventTime", Types.STRING)
> >      .field("eventID", Types.STRING)
> >      .field("eventType", Types.STRING)
> >      .field("page", Types.MAP(Types.STRING, Types.STRING))
> >      .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
> >
> >
> >    tableEnv.connect(
> >        new Kafka()
> >          .version("universal")
> >          .topic("very_small_test")
> >          .properties(properties)
> >          .startFromEarliest()
> >      )
> >      .withFormat(
> >        new Json()
> >          .failOnMissingField(false)
> >          .deriveSchema()
> >      )
> >      .withSchema(schema)
> >      .inAppendMode()
> >      .registerTableSource("sql_source")
> >
> >
> >    val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
> >
> >    val result = tableEnv.sqlQuery(sqlStatement)
> >
> >    val dsRow = tableEnv.toAppendStream[Row](result)
> >
> >
> >    val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
> >      .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
> >      .setDBUrl("AWS url")
> >      .setUsername(username)
> >      .setPassword(password)
> >      .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
> >      .setBatchInterval(100)
> >      .finish()
> >
> >    dsRow.writeUsingOutputFormat(jdbcOutput)
> >
> >    tableEnv.execute("SQL test")
> >
> >
> > --
> >
> > *Best regards
> >
> > Martin Hansen*
> >
>
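
The "could not find implicit value for evidence parameter" error quoted in this thread is ordinary Scala implicit resolution, and it can be reproduced without Flink. The sketch below is only an illustration under stated assumptions: `TypeInfo`, `ScalaApiImplicits`, and `EvidenceDemo` are hypothetical stand-ins (not Flink classes) for `TypeInformation` and for the implicits that a wildcard import such as `org.apache.flink.api.scala._` brings into scope.

```scala
// Hypothetical stand-in for Flink's TypeInformation[T]; NOT the real class.
trait TypeInfo[T] { def describe: String }

// Plays the role of a package object such as `org.apache.flink.api.scala`:
// its implicit instances are only visible after `import ScalaApiImplicits._`.
object ScalaApiImplicits {
  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { def describe: String = "TypeInfo[String]" }
}

object EvidenceDemo {
  // `T: TypeInfo` is a context bound. The compiler turns it into an extra
  // implicit "evidence" parameter of type TypeInfo[T].
  def toAppendStream[T: TypeInfo](value: T): String =
    s"$value as ${implicitly[TypeInfo[T]].describe}"

  def main(args: Array[String]): Unit = {
    // Removing this import makes the call below fail to compile with
    // "could not find implicit value for evidence parameter of type TypeInfo[String]".
    import ScalaApiImplicits._
    println(toAppendStream("row")) // prints "row as TypeInfo[String]"
  }
}
```

In this toy version, the evidence can also be passed explicitly, e.g. `EvidenceDemo.toAppendStream("row")(ScalaApiImplicits.stringInfo)`, which shows that the import only changes what the compiler can find on its own.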
