Hi Flavio,

Thank you so much! Thought i had that import but misread it.

The code does not give any errors, but no data is written to the sql
server. Can you see why that is?



Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
pomperma...@okkam.it>:

> SQL server should not be supported from what I know..for this I opened a
> PR[1] that I should rebase.
> If someone is interested in I could do it
>
> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>
> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Martin,
>>
>> usually, this error occurs when people forget to add
>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>> Scala macro that the DataStream API uses for extracting types.
>>
>> Can you try to call `result.toAppendStream[Row]` directly? This should
>> work if you import `org.apache.flink.table.api.scala._`.
>>
>> Maybe this example helps:
>>
>>
>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>
>> Regards,
>> Timo
>>
>>
>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>> > Hi,
>> >
>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>> have
>> > difficulties.
>> >
>> > I get the following error could not find implicit value for evidence
>> > parameter of type
>> >
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>> > [error]                                           ^
>> >
>> > Any help is appreciated
>> >
>> > I am not sure whether my approach is correct or not but my code is
>> > as follows:
>> >
>> > import java.util.Properties
>> >
>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
>> JDBCOutputFormat}
>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>> > import org.apache.flink.streaming.api.scala._
>> > import org.apache.flink.api.scala._
>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>> SimpleStringSchema}
>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>> TableEnvironment, Types}
>> > import org.apache.flink.types.Row
>> >
>> >    val properties =new Properties()
>> >    properties.setProperty("bootstrap.servers",b_servers)
>> >    properties.setProperty("zookeeper.connect",zk)
>> >    properties.setProperty("group.id <http://group.id>",
>> "very_small_test")
>> >    properties.setProperty("ssl.endpoint.identification.algorithm ", "")
>> >    properties.setProperty("security.protocol", "SSL")
>> >
>> >
>> >    val kafkaSource: FlinkKafkaConsumerBase[String] =new
>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
>> properties).setStartFromTimestamp(0)
>> >
>> >    val settings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >    val env = StreamExecutionEnvironment.getExecutionEnvironment
>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>> >
>> > val schema =new Schema()
>> >      .field("fullVisitorId",Types.STRING)
>> >      .field("eventTime",Types.STRING)
>> >      .field("eventID",Types.STRING)
>> >      .field("eventType",Types.STRING)
>> >      .field("page",Types.MAP( Types.STRING, Types.STRING))
>> >      .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))
>> >
>> >
>> >    tableEnv.connect(new Kafka()
>> >        .version("universal")
>> >        .topic("very_small_test")
>> >        .properties(properties)
>> >        .startFromEarliest()
>> >       )
>> >      .withFormat(
>> >      new Json()
>> >        .failOnMissingField(false)
>> >        .deriveSchema()
>> >    )
>> >      .withSchema(schema)
>> >      .inAppendMode()
>> >      .registerTableSource("sql_source")
>> >
>> >
>> > val sqlStatement ="SELECT * from sql_source where
>> CustomDimensions['pagePath'] like '%BT%'"
>> >
>> > val result =tableEnv.sqlQuery(sqlStatement)
>> >
>> >    val dsRow =tableEnv.toAppendStream[Row](result)
>> >
>> >
>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>> >      .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>> >      .setDBUrl("AWS url")
>> > .setUsername(username)
>> >      .setPassword(password)
>> >      .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime,
>> eventID) VALUES
>> > (?, ?, ?)")
>> >      .setBatchInterval(100)
>> >      .finish()
>> >
>> >    dsRow.writeUsingOutputFormat(jdbcOutput)
>> >
>> > tableEnv.execute("SQL test")
>> >
>> >
>> > --
>> >
>> > *Best regards
>> >
>> > Martin Hansen*
>> >
>>
>

-- 

Best regards

Martin Hansen

Reply via email to