Hi Flavio,

Thanks for your reply. I will try another way then.
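For reference, what I have in mind is roughly the sketch below: a plain
RichSinkFunction that talks to SQL Server through the Microsoft JDBC driver
directly, so it does not go through the Flink JDBC dialect at all. The URL,
credentials, table and column names are just placeholders for my setup, and
I have not tested it yet.

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row

// Sketch of a hand-rolled SQL Server sink: one JDBC connection per parallel
// sink instance, one INSERT per record (no batching, no retries).
class SqlServerSink(url: String, user: String, password: String)
  extends RichSinkFunction[Row] {

  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
    connection = DriverManager.getConnection(url, user, password)
    statement = connection.prepareStatement(
      "INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) " +
        "VALUES (?, ?, ?)")
  }

  override def invoke(value: Row): Unit = {
    // fields 0..2 are fullVisitorId, eventTime, eventID in my table schema
    statement.setString(1, value.getField(0).asInstanceOf[String])
    statement.setString(2, value.getField(1).asInstanceOf[String])
    statement.setString(3, value.getField(2).asInstanceOf[String])
    statement.executeUpdate()
  }

  override def close(): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

// usage: dsRow.addSink(new SqlServerSink(jdbcUrl, username, password))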


On Fri, 22 May 2020 at 11:31, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I would expect you to see an exception somewhere, since the SQL Server
> dialect is not supported yet.
>
> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
> m...@berlingskemedia.dk> wrote:
>
>> Hi Flavio,
>>
>> Thank you so much! I thought I had that import but misread it.
>>
>> The code does not give any errors, but no data is written to the SQL
>> Server. Can you see why that is?
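>>
>> One thing I can try on my side is printing the stream next to the JDBC
>> sink, just to check whether the query produces any rows at all (dsRow and
>> jdbcOutput are the ones from my code below):
>>
>>     dsRow.print()
>>     dsRow.writeUsingOutputFormat(jdbcOutput)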
>>
>>
>>
>> On Fri, 22 May 2020 at 09:02, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> SQL Server is not supported, as far as I know. For this I opened a
>>> PR [1] that I should rebase; if someone is interested, I could do it.
>>>
>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101)
>>>
>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> Usually, this error occurs when people forget to add
>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>>>> Scala macro that the DataStream API uses for extracting types.
>>>>
>>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>>> work if you import `org.apache.flink.table.api.scala._`.
>>>>
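>>>> In other words, something along these lines (an untested sketch, reusing
>>>> the names from your snippet):
>>>>
>>>>     // brings the TypeInformation macro into scope (evidence for Row)
>>>>     import org.apache.flink.api.scala._
>>>>     // brings the implicit Table -> DataStream conversions into scope
>>>>     import org.apache.flink.table.api.scala._
>>>>     import org.apache.flink.types.Row
>>>>
>>>>     val result = tableEnv.sqlQuery(sqlStatement)
>>>>     val dsRow = result.toAppendStream[Row]
>>>>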
>>>> Maybe this example helps:
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>>> > Hi,
>>>> >
>>>> > I am trying to write input from Kafka to a SQL Server database on AWS,
>>>> > but I am having difficulties.
>>>> >
>>>> > I get the following error:
>>>> >
>>>> > could not find implicit value for evidence parameter of type
>>>> > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>>> > [error]                                           ^
>>>> >
>>>> > Any help is appreciated.
>>>> >
>>>> > I am not sure whether my approach is correct, but my code is as follows:
>>>> >
>>>> > import java.util.Properties
>>>> >
>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>>>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>>>> >   FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>>> > import org.apache.flink.streaming.api.scala._
>>>> > import org.apache.flink.api.scala._
>>>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>>>> >   SimpleStringSchema}
>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>>>> >   TableEnvironment, Types}
>>>> > import org.apache.flink.types.Row
>>>> >
>>>> > val properties = new Properties()
>>>> > properties.setProperty("bootstrap.servers", b_servers)
>>>> > properties.setProperty("zookeeper.connect", zk)
>>>> > properties.setProperty("group.id", "very_small_test")
>>>> > properties.setProperty("ssl.endpoint.identification.algorithm", "")
>>>> > properties.setProperty("security.protocol", "SSL")
>>>> >
>>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =
>>>> >   new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
>>>> >     .setStartFromTimestamp(0)
>>>> >
>>>> > val settings = EnvironmentSettings.newInstance()
>>>> >   .useBlinkPlanner()
>>>> >   .inStreamingMode()
>>>> >   .build()
>>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>>> >
>>>> > val schema = new Schema()
>>>> >   .field("fullVisitorId", Types.STRING)
>>>> >   .field("eventTime", Types.STRING)
>>>> >   .field("eventID", Types.STRING)
>>>> >   .field("eventType", Types.STRING)
>>>> >   .field("page", Types.MAP(Types.STRING, Types.STRING))
>>>> >   .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
>>>> >
>>>> > tableEnv.connect(new Kafka()
>>>> >     .version("universal")
>>>> >     .topic("very_small_test")
>>>> >     .properties(properties)
>>>> >     .startFromEarliest()
>>>> >   )
>>>> >   .withFormat(
>>>> >     new Json()
>>>> >       .failOnMissingField(false)
>>>> >       .deriveSchema()
>>>> >   )
>>>> >   .withSchema(schema)
>>>> >   .inAppendMode()
>>>> >   .registerTableSource("sql_source")
>>>> >
>>>> > val sqlStatement =
>>>> >   "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
>>>> >
>>>> > val result = tableEnv.sqlQuery(sqlStatement)
>>>> >
>>>> > val dsRow = tableEnv.toAppendStream[Row](result)
>>>> >
>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>>> >   .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>>> >   .setDBUrl("AWS url")
>>>> >   .setUsername(username)
>>>> >   .setPassword(password)
>>>> >   .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) " +
>>>> >     "VALUES (?, ?, ?)")
>>>> >   .setBatchInterval(100)
>>>> >   .finish()
>>>> >
>>>> > dsRow.writeUsingOutputFormat(jdbcOutput)
>>>> >
>>>> > tableEnv.execute("SQL test")
>>>> >
>>>> >
>>>> > --
>>>> >
>>>> > Best regards
>>>> >
>>>> > Martin Hansen
>>>> >
>>>>
>>>
>>
>> --
>>
>> Best regards
>>
>> Martin Hansen
>>
>>
>>

--
Best regards

Martin Hansen
