Is the sql-server jdbc jar in the flink dist lib folder? On Fri, May 22, 2020 at 1:30 PM Martin Frank Hansen <m...@berlingskemedia.dk> wrote:
> Arh ok thanks, no problem. > > My problem is now that nothing is sent, do I need to format it in another > way? Or did I miss something else? > > I tried to include Class.forName( > "com.microsoft.sqlserver.jdbc.SQLServerDriver") but that didn't work. > > Den fre. 22. maj 2020 kl. 11.57 skrev Flavio Pompermaier < > pomperma...@okkam.it>: > >> No sorry, you're right. The JDBCOutputFormat should work..I get confused >> with the Table API >> >> On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen < >> m...@berlingskemedia.dk> wrote: >> >>> Hi again, >>> >>> I am a bit confused as to why the generic jdbc connector would not work >>> with sql-server? >>> >>> Can you explain a bit more? >>> >>> >>> Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen < >>> m...@berlingskemedia.dk>: >>> >>>> Hi Flavio, >>>> >>>> Thanks for your reply. I will try another way then. >>>> >>>> >>>> Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier < >>>> pomperma...@okkam.it>: >>>> >>>>> I expect you to see some exception somewhere, that sql server dialect >>>>> is not supported yet. >>>>> >>>>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen < >>>>> m...@berlingskemedia.dk> wrote: >>>>> >>>>>> Hi Flavio, >>>>>> >>>>>> Thank you so much! Thought i had that import but misread it. >>>>>> >>>>>> The code does not give any errors, but no data is written to the sql >>>>>> server. Can you see why that is? >>>>>> >>>>>> >>>>>> >>>>>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier < >>>>>> pomperma...@okkam.it>: >>>>>> >>>>>>> SQL server should not be supported from what I know..for this I >>>>>>> opened a PR[1] that I should rebase. >>>>>>> If someone is interested in I could do it >>>>>>> >>>>>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 ) >>>>>>> >>>>>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Martin, >>>>>>>> >>>>>>>> usually, this error occurs when people forget to add >>>>>>>> `org.apache.flink.api.scala._` to their imports. It is triggered by >>>>>>>> the >>>>>>>> Scala macro that the DataStream API uses for extracting types. >>>>>>>> >>>>>>>> Can you try to call `result.toAppendStream[Row]` directly? This >>>>>>>> should >>>>>>>> work if you import `org.apache.flink.table.api.scala._`. >>>>>>>> >>>>>>>> Maybe this example helps: >>>>>>>> >>>>>>>> >>>>>>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala >>>>>>>> >>>>>>>> Regards, >>>>>>>> Timo >>>>>>>> >>>>>>>> >>>>>>>> On 22.05.20 08:02, Martin Frank Hansen wrote: >>>>>>>> > Hi, >>>>>>>> > >>>>>>>> > I am trying to write input from Kafka to a SQL server on AWS, but >>>>>>>> I have >>>>>>>> > difficulties. >>>>>>>> > >>>>>>>> > I get the following error could not find implicit value for >>>>>>>> evidence >>>>>>>> > parameter of type >>>>>>>> > >>>>>>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row] >>>>>>>> > [error] val dsRow = tableEnv.toAppendStream[Row](result) >>>>>>>> > [error] ^ >>>>>>>> > >>>>>>>> > Any help is appreciated >>>>>>>> > >>>>>>>> > I am not sure whether my approach is correct or not but my code >>>>>>>> is >>>>>>>> > as follows: >>>>>>>> > >>>>>>>> > import java.util.Properties >>>>>>>> > >>>>>>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema} >>>>>>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, >>>>>>>> JDBCOutputFormat} >>>>>>>> > import >>>>>>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, >>>>>>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema} >>>>>>>> > import org.apache.flink.streaming.api.scala._ >>>>>>>> > import org.apache.flink.api.scala._ >>>>>>>> > import >>>>>>>> org.apache.flink.api.common.serialization.{SimpleStringEncoder, >>>>>>>> SimpleStringSchema} >>>>>>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment >>>>>>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, >>>>>>>> TableEnvironment, Types} >>>>>>>> > import org.apache.flink.types.Row >>>>>>>> > >>>>>>>> > val properties =new Properties() >>>>>>>> > properties.setProperty("bootstrap.servers",b_servers) >>>>>>>> > properties.setProperty("zookeeper.connect",zk) >>>>>>>> > properties.setProperty("group.id <http://group.id>", >>>>>>>> "very_small_test") >>>>>>>> > properties.setProperty("ssl.endpoint.identification.algorithm >>>>>>>> ", "") >>>>>>>> > properties.setProperty("security.protocol", "SSL") >>>>>>>> > >>>>>>>> > >>>>>>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =new >>>>>>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), >>>>>>>> properties).setStartFromTimestamp(0) >>>>>>>> > >>>>>>>> > val settings = >>>>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>> > val tableEnv = StreamTableEnvironment.create(env, settings) >>>>>>>> > >>>>>>>> > val schema =new Schema() >>>>>>>> > .field("fullVisitorId",Types.STRING) >>>>>>>> > .field("eventTime",Types.STRING) >>>>>>>> > .field("eventID",Types.STRING) >>>>>>>> > .field("eventType",Types.STRING) >>>>>>>> > .field("page",Types.MAP( Types.STRING, Types.STRING)) >>>>>>>> > .field("CustomDimensions",Types.MAP( Types.STRING, >>>>>>>> Types.STRING)) >>>>>>>> > >>>>>>>> > >>>>>>>> > tableEnv.connect(new Kafka() >>>>>>>> > .version("universal") >>>>>>>> > .topic("very_small_test") >>>>>>>> > .properties(properties) >>>>>>>> > .startFromEarliest() >>>>>>>> > ) >>>>>>>> > .withFormat( >>>>>>>> > new Json() >>>>>>>> > .failOnMissingField(false) >>>>>>>> > .deriveSchema() >>>>>>>> > ) >>>>>>>> > .withSchema(schema) >>>>>>>> > .inAppendMode() >>>>>>>> > .registerTableSource("sql_source") >>>>>>>> > >>>>>>>> > >>>>>>>> > val sqlStatement ="SELECT * from sql_source where >>>>>>>> CustomDimensions['pagePath'] like '%BT%'" >>>>>>>> > >>>>>>>> > val result =tableEnv.sqlQuery(sqlStatement) >>>>>>>> > >>>>>>>> > val dsRow =tableEnv.toAppendStream[Row](result) >>>>>>>> > >>>>>>>> > >>>>>>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() >>>>>>>> > >>>>>>>> .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver") >>>>>>>> > .setDBUrl("AWS url") >>>>>>>> > .setUsername(username) >>>>>>>> > .setPassword(password) >>>>>>>> > .setQuery("INSERT INTO kafka_data_test (fullVisitorId, >>>>>>>> EventTime, eventID) VALUES >>>>>>>> > (?, ?, ?)") >>>>>>>> > .setBatchInterval(100) >>>>>>>> > .finish() >>>>>>>> > >>>>>>>> > dsRow.writeUsingOutputFormat(jdbcOutput) >>>>>>>> > >>>>>>>> > tableEnv.execute("SQL test") >>>>>>>> > >>>>>>>> > >>>>>>>> > -- >>>>>>>> > >>>>>>>> > *Best regards >>>>>>>> > >>>>>>>> > Martin Hansen* >>>>>>>> > >>>>>>>> >>>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> Best regards >>>>>> >>>>>> Martin Hansen >>>>>> >>>>>> >>>>>> >>>> >>>> -- >>>> Best regards >>>> >>>> Martin Hansen >>>> >>>> >>>> >>> >>> -- >>> Best regards >>> >>> Martin Hansen >>> >>> >>> >> >> -- >> Flavio Pompermaier >> Development Department >> >> OKKAM S.r.l. >> Tel. +(39) 0461 041809 >> > > > -- > Best Regards > > Martin Hansen > > > -- Flavio Pompermaier Development Department OKKAM S.r.l. Tel. +(39) 0461 041809