Is the SQL Server JDBC driver jar in the lib folder of your Flink distribution?
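
A quick way to see whether the driver class is visible to a JVM at all is a plain check like the sketch below. DriverCheck and driverAvailable are names I made up for illustration. Note that running it locally only tells you about that JVM's classpath, not about the lib folder on the cluster.

```scala
object DriverCheck {
  // Returns true if the named class can be loaded by the current classloader.
  // Only ClassNotFoundException is handled; other linkage errors propagate.
  def driverAvailable(className: String): Boolean =
    try {
      Class.forName(className)
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit = {
    // Driver class name taken from the job code quoted below.
    val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    println(s"$driver loadable: ${driverAvailable(driver)}")
  }
}
```

If this reports false with the same classpath the job uses, the jar is probably not where Flink looks for it.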

On Fri, May 22, 2020 at 1:30 PM Martin Frank Hansen <m...@berlingskemedia.dk>
wrote:

> Ah, OK, thanks. No problem.
>
> My problem now is that nothing is sent. Do I need to format it in another
> way, or did I miss something else?
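
One thing that might be worth ruling out (an assumption on my side, not a diagnosis of your job): as far as I know, JDBCOutputFormat buffers rows and only executes the batch once the number of rows set with setBatchInterval has arrived, or when the format is closed. With setBatchInterval(100), a small test topic, and a streaming job that never finishes, that could look like nothing being written. The toy model below is not Flink code. BatchingWriter and its methods are hypothetical names that only mimic that kind of buffering.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a batching JDBC writer; not Flink's actual class.
final class BatchingWriter(batchInterval: Int) {
  private val pending = ArrayBuffer.empty[String]
  // Records that have actually been "sent to the database".
  val written = ArrayBuffer.empty[String]

  // Buffer the record and flush only when the batch is full.
  def writeRecord(record: String): Unit = {
    pending += record
    if (pending.size >= batchInterval) flush()
  }

  def flush(): Unit = {
    written ++= pending
    pending.clear()
  }

  // Closing flushes whatever is still buffered.
  def close(): Unit = flush()
}

object BatchingDemo {
  def main(args: Array[String]): Unit = {
    val writer = new BatchingWriter(100)
    (1 to 3).foreach(i => writer.writeRecord(s"row-$i"))
    println(s"written before close: ${writer.written.size}")
    writer.close()
    println(s"written after close: ${writer.written.size}")
  }
}
```

If that turns out to be the cause, lowering the interval for the test, e.g. .setBatchInterval(1), should make rows show up.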
>
> I tried to include Class.forName(
> "com.microsoft.sqlserver.jdbc.SQLServerDriver"), but that didn't work.
>
> On Fri, May 22, 2020 at 11:57 AM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> No, sorry, you're right. The JDBCOutputFormat should work. I got confused
>> with the Table API.
>>
>> On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <
>> m...@berlingskemedia.dk> wrote:
>>
>>> Hi again,
>>>
>>> I am a bit confused as to why the generic JDBC connector would not work
>>> with SQL Server.
>>>
>>> Can you explain a bit more?
>>>
>>>
>>> On Fri, May 22, 2020 at 11:33 AM Martin Frank Hansen <
>>> m...@berlingskemedia.dk> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Thanks for your reply. I will try another way then.
>>>>
>>>>
>>>> On Fri, May 22, 2020 at 11:31 AM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> I would expect you to see an exception somewhere saying that the SQL
>>>>> Server dialect is not supported yet.
>>>>>
>>>>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
>>>>> m...@berlingskemedia.dk> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> Thank you so much! I thought I had that import, but I misread it.
>>>>>>
>>>>>> The code does not give any errors, but no data is written to the SQL
>>>>>> Server database. Can you see why that is?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, May 22, 2020 at 9:02 AM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> As far as I know, SQL Server is not supported. I opened a PR [1] for
>>>>>>> this, which I still need to rebase.
>>>>>>> If someone is interested, I could do that.
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101)
>>>>>>>
>>>>>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>> Usually, this error occurs when people forget to add
>>>>>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>>>>>>>> Scala macro that the DataStream API uses for extracting types.
>>>>>>>>
>>>>>>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>>>>>>> work if you import `org.apache.flink.table.api.scala._`.
>>>>>>>>
>>>>>>>> Maybe this example helps:
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>>
>>>>>>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>>>>>>> > Hi,
>>>>>>>> >
>>>>>>>> > I am trying to write input from Kafka to a SQL Server instance on AWS,
>>>>>>>> > but I am having difficulties.
>>>>>>>> >
>>>>>>>> > I get the following error:
>>>>>>>> >
>>>>>>>> > could not find implicit value for evidence parameter of type
>>>>>>>> > org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>>>>>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>>>>>>> > [error]                                           ^
>>>>>>>> >
>>>>>>>> > Any help is appreciated.
>>>>>>>> >
>>>>>>>> > I am not sure whether my approach is correct, but my code is as
>>>>>>>> > follows:
>>>>>>>> >
>>>>>>>> > import java.util.Properties
>>>>>>>> >
>>>>>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>>>>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>>>>>>>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>>>>>>> > import org.apache.flink.streaming.api.scala._
>>>>>>>> > import org.apache.flink.api.scala._
>>>>>>>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
>>>>>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>>>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
>>>>>>>> > import org.apache.flink.types.Row
>>>>>>>> >
>>>>>>>> > val properties = new Properties()
>>>>>>>> > properties.setProperty("bootstrap.servers", b_servers)
>>>>>>>> > properties.setProperty("zookeeper.connect", zk)
>>>>>>>> > properties.setProperty("group.id", "very_small_test")
>>>>>>>> > properties.setProperty("ssl.endpoint.identification.algorithm", "")
>>>>>>>> > properties.setProperty("security.protocol", "SSL")
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =
>>>>>>>> >   new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
>>>>>>>> >     .setStartFromTimestamp(0)
>>>>>>>> >
>>>>>>>> > val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>>>>>>> >
>>>>>>>> > val schema = new Schema()
>>>>>>>> >   .field("fullVisitorId", Types.STRING)
>>>>>>>> >   .field("eventTime", Types.STRING)
>>>>>>>> >   .field("eventID", Types.STRING)
>>>>>>>> >   .field("eventType", Types.STRING)
>>>>>>>> >   .field("page", Types.MAP(Types.STRING, Types.STRING))
>>>>>>>> >   .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > tableEnv.connect(new Kafka()
>>>>>>>> >     .version("universal")
>>>>>>>> >     .topic("very_small_test")
>>>>>>>> >     .properties(properties)
>>>>>>>> >     .startFromEarliest()
>>>>>>>> >   )
>>>>>>>> >   .withFormat(
>>>>>>>> >     new Json()
>>>>>>>> >       .failOnMissingField(false)
>>>>>>>> >       .deriveSchema()
>>>>>>>> >   )
>>>>>>>> >   .withSchema(schema)
>>>>>>>> >   .inAppendMode()
>>>>>>>> >   .registerTableSource("sql_source")
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
>>>>>>>> >
>>>>>>>> > val result = tableEnv.sqlQuery(sqlStatement)
>>>>>>>> >
>>>>>>>> > val dsRow = tableEnv.toAppendStream[Row](result)
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>>>>>>> >   .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>>>>>>> >   .setDBUrl("AWS url")
>>>>>>>> >   .setUsername(username)
>>>>>>>> >   .setPassword(password)
>>>>>>>> >   .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
>>>>>>>> >   .setBatchInterval(100)
>>>>>>>> >   .finish()
>>>>>>>> >
>>>>>>>> > dsRow.writeUsingOutputFormat(jdbcOutput)
>>>>>>>> >
>>>>>>>> > tableEnv.execute("SQL test")
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > --
>>>>>>>> >
>>>>>>>> > *Best regards
>>>>>>>> >
>>>>>>>> > Martin Hansen*
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Best regards
>>>>>>
>>>>>> Martin Hansen
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Martin Hansen
>>>>
>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Martin Hansen
>>>
>>>
>>>
>>
>> --
>> Flavio Pompermaier
>> Development Department
>>
>> OKKAM S.r.l.
>> Tel. +(39) 0461 041809
>>
>
>
> --
> Best Regards
>
> Martin Hansen
>
>
>

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809
