Hi again,

I am a bit confused as to why the generic jdbc connector would not work
with sql-server?

Can you explain a bit more?


Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen <
m...@berlingskemedia.dk>:

> Hi Flavio,
>
> Thanks for your reply. I will try another way then.
>
>
> Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> I expect you to see some exception somewhere, that sql server dialect is
>> not supported yet.
>>
>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
>> m...@berlingskemedia.dk> wrote:
>>
>>> Hi Flavio,
>>>
>>> Thank you so much! Thought i had that import but misread it.
>>>
>>> The code does not give any errors, but no data is written to the sql
>>> server. Can you see why that is?
>>>
>>>
>>>
>>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
>>> pomperma...@okkam.it>:
>>>
>>>> SQL server should not be supported from what I know..for this I
>>>> opened a PR[1] that I should rebase.
>>>> If someone is interested in I could do it
>>>>
>>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>>>>
>>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> usually, this error occurs when people forget to add
>>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by
>>>>> the
>>>>> Scala macro that the DataStream API uses for extracting types.
>>>>>
>>>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>>>> work if you import `org.apache.flink.table.api.scala._`.
>>>>>
>>>>> Maybe this example helps:
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>>>>> have
>>>>> > difficulties.
>>>>> >
>>>>> > I get the following error could not find implicit value for evidence
>>>>> > parameter of type
>>>>> >
>>>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>>>> > [error]                                           ^
>>>>> >
>>>>> > Any help is appreciated
>>>>> >
>>>>> > I am not sure whether my approach is correct or not but my code is
>>>>> > as follows:
>>>>> >
>>>>> > import java.util.Properties
>>>>> >
>>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
>>>>> JDBCOutputFormat}
>>>>> > import
>>>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>>>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>>>> > import org.apache.flink.streaming.api.scala._
>>>>> > import org.apache.flink.api.scala._
>>>>> > import
>>>>> org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>>>>> SimpleStringSchema}
>>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>>>>> TableEnvironment, Types}
>>>>> > import org.apache.flink.types.Row
>>>>> >
>>>>> >    val properties =new Properties()
>>>>> >    properties.setProperty("bootstrap.servers",b_servers)
>>>>> >    properties.setProperty("zookeeper.connect",zk)
>>>>> >    properties.setProperty("group.id <http://group.id>",
>>>>> "very_small_test")
>>>>> >    properties.setProperty("ssl.endpoint.identification.algorithm ",
>>>>> "")
>>>>> >    properties.setProperty("security.protocol", "SSL")
>>>>> >
>>>>> >
>>>>> >    val kafkaSource: FlinkKafkaConsumerBase[String] =new
>>>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
>>>>> properties).setStartFromTimestamp(0)
>>>>> >
>>>>> >    val settings =
>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>> >    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>>>> >
>>>>> > val schema =new Schema()
>>>>> >      .field("fullVisitorId",Types.STRING)
>>>>> >      .field("eventTime",Types.STRING)
>>>>> >      .field("eventID",Types.STRING)
>>>>> >      .field("eventType",Types.STRING)
>>>>> >      .field("page",Types.MAP( Types.STRING, Types.STRING))
>>>>> >      .field("CustomDimensions",Types.MAP( Types.STRING,
>>>>> Types.STRING))
>>>>> >
>>>>> >
>>>>> >    tableEnv.connect(new Kafka()
>>>>> >        .version("universal")
>>>>> >        .topic("very_small_test")
>>>>> >        .properties(properties)
>>>>> >        .startFromEarliest()
>>>>> >       )
>>>>> >      .withFormat(
>>>>> >      new Json()
>>>>> >        .failOnMissingField(false)
>>>>> >        .deriveSchema()
>>>>> >    )
>>>>> >      .withSchema(schema)
>>>>> >      .inAppendMode()
>>>>> >      .registerTableSource("sql_source")
>>>>> >
>>>>> >
>>>>> > val sqlStatement ="SELECT * from sql_source where
>>>>> CustomDimensions['pagePath'] like '%BT%'"
>>>>> >
>>>>> > val result =tableEnv.sqlQuery(sqlStatement)
>>>>> >
>>>>> >    val dsRow =tableEnv.toAppendStream[Row](result)
>>>>> >
>>>>> >
>>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>>>> >      .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>>>> >      .setDBUrl("AWS url")
>>>>> > .setUsername(username)
>>>>> >      .setPassword(password)
>>>>> >      .setQuery("INSERT INTO kafka_data_test (fullVisitorId,
>>>>> EventTime, eventID) VALUES
>>>>> > (?, ?, ?)")
>>>>> >      .setBatchInterval(100)
>>>>> >      .finish()
>>>>> >
>>>>> >    dsRow.writeUsingOutputFormat(jdbcOutput)
>>>>> >
>>>>> > tableEnv.execute("SQL test")
>>>>> >
>>>>> >
>>>>> > --
>>>>> >
>>>>> > *Best regards
>>>>> >
>>>>> > Martin Hansen*
>>>>> >
>>>>>
>>>>
>>>
>>> --
>>>
>>> Best regards
>>>
>>> Martin Hansen
>>>
>>>
>>>
>
> --
> Best regards
>
> Martin Hansen
>
>
>

--
Best regards

Martin Hansen

Reply via email to