No sorry, you're right. The JDBCOutputFormat should work..I get confused
with the Table API

On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <
m...@berlingskemedia.dk> wrote:

> Hi again,
>
> I am a bit confused as to why the generic jdbc connector would not work
> with sql-server?
>
> Can you explain a bit more?
>
>
> Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen <
> m...@berlingskemedia.dk>:
>
>> Hi Flavio,
>>
>> Thanks for your reply. I will try another way then.
>>
>>
>> Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> I expect you to see some exception somewhere, that sql server dialect is
>>> not supported yet.
>>>
>>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
>>> m...@berlingskemedia.dk> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Thank you so much! Thought i had that import but misread it.
>>>>
>>>> The code does not give any errors, but no data is written to the sql
>>>> server. Can you see why that is?
>>>>
>>>>
>>>>
>>>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
>>>> pomperma...@okkam.it>:
>>>>
>>>>> SQL server should not be supported from what I know..for this I
>>>>> opened a PR[1] that I should rebase.
>>>>> If someone is interested in I could do it
>>>>>
>>>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>>>>>
>>>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> usually, this error occurs when people forget to add
>>>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by
>>>>>> the
>>>>>> Scala macro that the DataStream API uses for extracting types.
>>>>>>
>>>>>> Can you try to call `result.toAppendStream[Row]` directly? This
>>>>>> should
>>>>>> work if you import `org.apache.flink.table.api.scala._`.
>>>>>>
>>>>>> Maybe this example helps:
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>>>>>> have
>>>>>> > difficulties.
>>>>>> >
>>>>>> > I get the following error could not find implicit value for
>>>>>> evidence
>>>>>> > parameter of type
>>>>>> >
>>>>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>>>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>>>>> > [error]                                           ^
>>>>>> >
>>>>>> > Any help is appreciated
>>>>>> >
>>>>>> > I am not sure whether my approach is correct or not but my code is
>>>>>> > as follows:
>>>>>> >
>>>>>> > import java.util.Properties
>>>>>> >
>>>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
>>>>>> JDBCOutputFormat}
>>>>>> > import
>>>>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>>>>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>>>>> > import org.apache.flink.streaming.api.scala._
>>>>>> > import org.apache.flink.api.scala._
>>>>>> > import
>>>>>> org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>>>>>> SimpleStringSchema}
>>>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>>>>>> TableEnvironment, Types}
>>>>>> > import org.apache.flink.types.Row
>>>>>> >
>>>>>> >    val properties =new Properties()
>>>>>> >    properties.setProperty("bootstrap.servers",b_servers)
>>>>>> >    properties.setProperty("zookeeper.connect",zk)
>>>>>> >    properties.setProperty("group.id <http://group.id>",
>>>>>> "very_small_test")
>>>>>> >    properties.setProperty("ssl.endpoint.identification.algorithm ",
>>>>>> "")
>>>>>> >    properties.setProperty("security.protocol", "SSL")
>>>>>> >
>>>>>> >
>>>>>> >    val kafkaSource: FlinkKafkaConsumerBase[String] =new
>>>>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
>>>>>> properties).setStartFromTimestamp(0)
>>>>>> >
>>>>>> >    val settings =
>>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>> >    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>>>>> >
>>>>>> > val schema =new Schema()
>>>>>> >      .field("fullVisitorId",Types.STRING)
>>>>>> >      .field("eventTime",Types.STRING)
>>>>>> >      .field("eventID",Types.STRING)
>>>>>> >      .field("eventType",Types.STRING)
>>>>>> >      .field("page",Types.MAP( Types.STRING, Types.STRING))
>>>>>> >      .field("CustomDimensions",Types.MAP( Types.STRING,
>>>>>> Types.STRING))
>>>>>> >
>>>>>> >
>>>>>> >    tableEnv.connect(new Kafka()
>>>>>> >        .version("universal")
>>>>>> >        .topic("very_small_test")
>>>>>> >        .properties(properties)
>>>>>> >        .startFromEarliest()
>>>>>> >       )
>>>>>> >      .withFormat(
>>>>>> >      new Json()
>>>>>> >        .failOnMissingField(false)
>>>>>> >        .deriveSchema()
>>>>>> >    )
>>>>>> >      .withSchema(schema)
>>>>>> >      .inAppendMode()
>>>>>> >      .registerTableSource("sql_source")
>>>>>> >
>>>>>> >
>>>>>> > val sqlStatement ="SELECT * from sql_source where
>>>>>> CustomDimensions['pagePath'] like '%BT%'"
>>>>>> >
>>>>>> > val result =tableEnv.sqlQuery(sqlStatement)
>>>>>> >
>>>>>> >    val dsRow =tableEnv.toAppendStream[Row](result)
>>>>>> >
>>>>>> >
>>>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>>>>> >      .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>>>>> >      .setDBUrl("AWS url")
>>>>>> > .setUsername(username)
>>>>>> >      .setPassword(password)
>>>>>> >      .setQuery("INSERT INTO kafka_data_test (fullVisitorId,
>>>>>> EventTime, eventID) VALUES
>>>>>> > (?, ?, ?)")
>>>>>> >      .setBatchInterval(100)
>>>>>> >      .finish()
>>>>>> >
>>>>>> >    dsRow.writeUsingOutputFormat(jdbcOutput)
>>>>>> >
>>>>>> > tableEnv.execute("SQL test")
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> >
>>>>>> > *Best regards
>>>>>> >
>>>>>> > Martin Hansen*
>>>>>> >
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Best regards
>>>>
>>>> Martin Hansen
>>>>
>>>>
>>>>
>>
>> --
>> Best regards
>>
>> Martin Hansen
>>
>>
>>
>
> --
> Best regards
>
> Martin Hansen
>
>
>

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809

Reply via email to