Ah, OK. Thanks, no problem.

My problem now is that nothing is sent. Do I need to format the data in
another way, or did I miss something else?
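
One possible explanation, stated as an assumption and not a confirmed
diagnosis: as far as I understand JDBCOutputFormat, rows are buffered and only
sent once the batch interval (100 in my code) is reached or the format is
closed. With a very small test topic and a streaming job that never finishes,
that could mean nothing is ever flushed. The sketch below models that
behaviour in plain Scala. BatchingWriter and BatchingDemo are hypothetical
names used only for illustration and are not part of Flink.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a count-based batching sink; NOT Flink's JDBCOutputFormat.
final class BatchingWriter[A](batchInterval: Int, sink: Seq[A] => Unit) {
  require(batchInterval > 0, "batchInterval must be positive")

  private val buffer = ArrayBuffer.empty[A]

  // Buffer the record; hand the batch to the sink only once it is full.
  def writeRecord(record: A): Unit = {
    buffer += record
    if (buffer.size >= batchInterval) flush()
  }

  // Send whatever is currently buffered, if anything.
  def flush(): Unit =
    if (buffer.nonEmpty) {
      sink(buffer.toList)
      buffer.clear()
    }

  // Closing flushes the remainder; an unbounded streaming job may never get here.
  def close(): Unit = flush()
}

object BatchingDemo {
  def main(args: Array[String]): Unit = {
    val written = ArrayBuffer.empty[String]
    val writer = new BatchingWriter[String](100, batch => written ++= batch)

    // Three records with a batch interval of 100: nothing reaches the sink yet.
    (1 to 3).foreach(i => writer.writeRecord(s"row-$i"))
    println(s"after 3 records: ${written.size} written")

    // Only closing the writer pushes the buffered records out.
    writer.close()
    println(s"after close: ${written.size} written")
  }
}
```

If this is what happens here, lowering the value passed to setBatchInterval
while testing (for example to 1) would be a cheap experiment to confirm it.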

I tried to include
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"), but that
didn't work.

On Fri, May 22, 2020 at 11:57, Flavio Pompermaier <
pomperma...@okkam.it> wrote:

> No, sorry, you're right. The JDBCOutputFormat should work. I was confusing
> it with the Table API.
>
> On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <
> m...@berlingskemedia.dk> wrote:
>
>> Hi again,
>>
>> I am a bit confused as to why the generic JDBC connector would not work
>> with SQL Server.
>>
>> Can you explain a bit more?
>>
>>
>> On Fri, May 22, 2020 at 11:33, Martin Frank Hansen <
>> m...@berlingskemedia.dk> wrote:
>>
>>> Hi Flavio,
>>>
>>> Thanks for your reply. I will try another way then.
>>>
>>>
>>> On Fri, May 22, 2020 at 11:31, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> I would expect you to see an exception somewhere saying that the SQL
>>>> Server dialect is not supported yet.
>>>>
>>>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
>>>> m...@berlingskemedia.dk> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> Thank you so much! I thought I had that import, but I misread it.
>>>>>
>>>>> The code does not give any errors, but no data is written to the SQL
>>>>> Server. Can you see why that is?
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 22, 2020 at 09:02, Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> As far as I know, SQL Server is not supported. I opened a PR [1] for
>>>>>> this, which I still need to rebase.
>>>>>> If someone is interested, I could do it.
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101)
>>>>>>
>>>>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> Usually, this error occurs when people forget to add
>>>>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>>>>>>> Scala macro that the DataStream API uses for extracting types.
>>>>>>>
>>>>>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>>>>>> work if you import `org.apache.flink.table.api.scala._`.
>>>>>>>
>>>>>>> Maybe this example helps:
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>>
>>>>>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I am trying to write input from Kafka to a SQL Server instance on AWS,
>>>>>>> > but I am having difficulties.
>>>>>>> >
>>>>>>> > I get the following error:
>>>>>>> >
>>>>>>> > could not find implicit value for evidence parameter of type
>>>>>>> > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>>>>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>>>>>> > [error]                                           ^
>>>>>>> >
>>>>>>> > Any help is appreciated
>>>>>>> >
>>>>>>> > I am not sure whether my approach is correct or not but my code is
>>>>>>> > as follows:
>>>>>>> >
>>>>>>> > import java.util.Properties
>>>>>>> >
>>>>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>>>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>>>>>>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>>>>>> > import org.apache.flink.streaming.api.scala._
>>>>>>> > import org.apache.flink.api.scala._
>>>>>>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
>>>>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
>>>>>>> > import org.apache.flink.types.Row
>>>>>>> >
>>>>>>> >    val properties = new Properties()
>>>>>>> >    properties.setProperty("bootstrap.servers", b_servers)
>>>>>>> >    properties.setProperty("zookeeper.connect", zk)
>>>>>>> >    properties.setProperty("group.id", "very_small_test")
>>>>>>> >    properties.setProperty("ssl.endpoint.identification.algorithm", "")
>>>>>>> >    properties.setProperty("security.protocol", "SSL")
>>>>>>> >
>>>>>>> >
>>>>>>> >    val kafkaSource: FlinkKafkaConsumerBase[String] =
>>>>>>> >      new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
>>>>>>> >        .setStartFromTimestamp(0)
>>>>>>> >
>>>>>>> >    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>> >    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>> >    val tableEnv = StreamTableEnvironment.create(env, settings)
>>>>>>> >
>>>>>>> >    val schema = new Schema()
>>>>>>> >      .field("fullVisitorId", Types.STRING)
>>>>>>> >      .field("eventTime", Types.STRING)
>>>>>>> >      .field("eventID", Types.STRING)
>>>>>>> >      .field("eventType", Types.STRING)
>>>>>>> >      .field("page", Types.MAP(Types.STRING, Types.STRING))
>>>>>>> >      .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
>>>>>>> >
>>>>>>> >
>>>>>>> >    tableEnv
>>>>>>> >      .connect(
>>>>>>> >        new Kafka()
>>>>>>> >          .version("universal")
>>>>>>> >          .topic("very_small_test")
>>>>>>> >          .properties(properties)
>>>>>>> >          .startFromEarliest()
>>>>>>> >      )
>>>>>>> >      .withFormat(
>>>>>>> >        new Json()
>>>>>>> >          .failOnMissingField(false)
>>>>>>> >          .deriveSchema()
>>>>>>> >      )
>>>>>>> >      .withSchema(schema)
>>>>>>> >      .inAppendMode()
>>>>>>> >      .registerTableSource("sql_source")
>>>>>>> >
>>>>>>> >
>>>>>>> >    val sqlStatement =
>>>>>>> >      "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
>>>>>>> >
>>>>>>> >    val result = tableEnv.sqlQuery(sqlStatement)
>>>>>>> >
>>>>>>> >    val dsRow = tableEnv.toAppendStream[Row](result)
>>>>>>> >
>>>>>>> >
>>>>>>> >    val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>>>>>> >      .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>>>>>> >      .setDBUrl("AWS url")
>>>>>>> >      .setUsername(username)
>>>>>>> >      .setPassword(password)
>>>>>>> >      .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
>>>>>>> >      .setBatchInterval(100)
>>>>>>> >      .finish()
>>>>>>> >
>>>>>>> >    dsRow.writeUsingOutputFormat(jdbcOutput)
>>>>>>> >
>>>>>>> >    tableEnv.execute("SQL test")
>>>>>>> >
>>>>>>> >
>>>>>>> > --
>>>>>>> >
>>>>>>> > *Best regards
>>>>>>> >
>>>>>>> > Martin Hansen*
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Best regards
>>>>>
>>>>> Martin Hansen
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Martin Hansen
>>>
>>>
>>>
>>
>> --
>> Best regards
>>
>> Martin Hansen
>>
>>
>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>


-- 
Best Regards

Martin Hansen
