I would expect you to see an exception somewhere saying that the SQL Server
dialect is not supported yet.
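
For reference, the "could not find implicit value for evidence parameter"
error quoted further down can be reproduced without Flink. The sketch below
only illustrates how Scala resolves such evidence parameters. `TypeInfo`,
`Row`, `Implicits` and `Conversions` are made-up stand-ins, not Flink classes,
and Flink derives its real `TypeInformation` through a macro rather than a
hand-written instance, so treat this as an analogy under those assumptions.

```scala
// Hypothetical stand-in for Flink's TypeInformation[T]; NOT the real class.
trait TypeInfo[T] { def describe: String }

// Hypothetical stand-in for org.apache.flink.types.Row.
final case class Row(fields: Vector[Any])

// Implicit instances live in an object and only become visible through a
// wildcard import, similar to what the thread says about the Flink
// `scala._` package imports.
object Implicits {
  implicit val rowTypeInfo: TypeInfo[Row] = new TypeInfo[Row] {
    def describe: String = "Row"
  }
}

object Conversions {
  // The context bound `T: TypeInfo` desugars to an implicit "evidence"
  // parameter of type TypeInfo[T]; the compiler must find one in scope.
  def toAppendStream[T: TypeInfo](rows: Seq[T]): String =
    s"stream of ${implicitly[TypeInfo[T]].describe} with ${rows.size} element(s)"
}

object Demo {
  import Implicits._ // without this import the call below does not compile

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(Vector("visitor-1", "2020-05-22", "event-1")))
    println(Conversions.toAppendStream(rows))
  }
}
```

With the import removed, the Scala 2 compiler rejects the call with the same
kind of "could not find implicit value for evidence parameter" message, which
is why a missing wildcard import is the first thing to check.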

On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
m...@berlingskemedia.dk> wrote:

> Hi Flavio,
>
> Thank you so much! I thought I had that import, but I misread it.
>
> The code does not give any errors, but no data is written to the SQL
> Server. Can you see why that is?
>
>
>
> On Fri, May 22, 2020 at 09:02, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> As far as I know, SQL Server is not supported. I opened a PR [1] for this,
>> which I still need to rebase.
>> If anyone is interested, I can do that.
>>
>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101)
>>
>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Martin,
>>>
>>> usually, this error occurs when people forget to add
>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>>> Scala macro that the DataStream API uses for extracting types.
>>>
>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>> work if you import `org.apache.flink.table.api.scala._`.
>>>
>>> Maybe this example helps:
>>>
>>>
>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>> > Hi,
>>> >
>>> > I am trying to write input from Kafka to a SQL Server on AWS, but I have
>>> > difficulties.
>>> >
>>> > I get the following error:
>>> >
>>> > could not find implicit value for evidence parameter of type
>>> > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>> > [error]                                           ^
>>> >
>>> > Any help is appreciated.
>>> >
>>> > I am not sure whether my approach is correct, but my code is as
>>> > follows:
>>> >
>>> > import java.util.Properties
>>> >
>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>> > import org.apache.flink.streaming.api.scala._
>>> > import org.apache.flink.api.scala._
>>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
>>> > import org.apache.flink.types.Row
>>> >
>>> >    val properties = new Properties()
>>> >    properties.setProperty("bootstrap.servers", b_servers)
>>> >    properties.setProperty("zookeeper.connect", zk)
>>> >    properties.setProperty("group.id", "very_small_test")
>>> >    properties.setProperty("ssl.endpoint.identification.algorithm", "")
>>> >    properties.setProperty("security.protocol", "SSL")
>>> >
>>> >
>>> >    val kafkaSource: FlinkKafkaConsumerBase[String] =
>>> >      new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
>>> >        .setStartFromTimestamp(0)
>>> >
>>> >    val settings =
>>> >      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> >    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> >    val tableEnv = StreamTableEnvironment.create(env, settings)
>>> >
>>> >    val schema = new Schema()
>>> >      .field("fullVisitorId",Types.STRING)
>>> >      .field("eventTime",Types.STRING)
>>> >      .field("eventID",Types.STRING)
>>> >      .field("eventType",Types.STRING)
>>> >      .field("page",Types.MAP( Types.STRING, Types.STRING))
>>> >      .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))
>>> >
>>> >
>>> >    tableEnv.connect(new Kafka()
>>> >        .version("universal")
>>> >        .topic("very_small_test")
>>> >        .properties(properties)
>>> >        .startFromEarliest()
>>> >       )
>>> >      .withFormat(
>>> >      new Json()
>>> >        .failOnMissingField(false)
>>> >        .deriveSchema()
>>> >    )
>>> >      .withSchema(schema)
>>> >      .inAppendMode()
>>> >      .registerTableSource("sql_source")
>>> >
>>> >
>>> >    val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
>>> >
>>> >    val result = tableEnv.sqlQuery(sqlStatement)
>>> >
>>> >    val dsRow = tableEnv.toAppendStream[Row](result)
>>> >
>>> >
>>> >    val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>> >      .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>> >      .setDBUrl("AWS url")
>>> >      .setUsername(username)
>>> >      .setPassword(password)
>>> >      .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
>>> >      .setBatchInterval(100)
>>> >      .finish()
>>> >
>>> >    dsRow.writeUsingOutputFormat(jdbcOutput)
>>> >
>>> > tableEnv.execute("SQL test")
>>> >
>>> >
>>> > --
>>> >
>>> > *Best regards
>>> >
>>> > Martin Hansen*
>>> >
>>>
>>
>
> --
>
> Best regards
>
> Martin Hansen
>
>
>
