Great to hear that it is now working and thanks for letting the community know :-)
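
For anyone finding this thread later, here is a minimal sketch of how to check up front whether an object survives Java serialization. The stack trace quoted below shows the failure coming from java.io.ObjectOutputStream. The sketch deliberately avoids Flink: RecordWriter and SerializationCheck are hypothetical names standing in for a serializable callback interface such as JdbcStatementBuilder, and the check is plain java.io rather than anything Flink-specific.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a serializable callback interface such as
// JdbcStatementBuilder; it is not part of Flink.
trait RecordWriter[T] extends Serializable {
  def write(target: StringBuilder, record: T): Unit
}

object SerializationCheck {
  // Explicit anonymous class held in a standalone object, mirroring the
  // approach taken in the working revision quoted below.
  val writer: RecordWriter[String] = new RecordWriter[String] {
    override def write(target: StringBuilder, record: String): Unit =
      target.append(record)
  }

  // Returns true if Java serialization accepts the object, false if the
  // object (or something it references) is not serializable.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(s"explicit writer serializable: ${isSerializable(writer)}")
    println(s"plain Object serializable: ${isSerializable(new Object)}")
  }
}
```

Running a check like this in a unit test against the real statement builder should surface the problem before the job graph is built, though that is a suggestion and not something tried in this thread.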

On Wed, Feb 17, 2021 at 2:48 PM Clay Teeter <[email protected]> wrote:

> Yep, that was it! thanks! And to complete the thread, this is the working
> revision.
>
> package com.maalka.flink.sinks
>
> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
> import com.typesafe.scalalogging.LazyLogging
> import org.apache.flink.api.common.functions.RuntimeContext
> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
> import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
> import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala._
> import com.maalka.flink.typeInformation.Implicits._
>
> import java.sql.PreparedStatement
> import java.util.function.Function
>
> object TestSink extends LazyLogging {
>   // START HERE
>   def process(messageStream: DataStream[MaalkaDataRecord],
>               signableUpdateStream: Option[DataStream[SignableUpdate]],
>               streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
>
>     insertAnalyticData("raw",
>       "insert into analytic_table ... ",
>       messageStream.map(_ => "A"))
>   }
>
>   // it is required that you explicitly create a new JDBCStatementBuilder
>   val statementBuilder: JdbcStatementBuilder[String] =
>     new JdbcStatementBuilder[String] {
>       override def accept(ps: PreparedStatement, t: String): Unit = {
>         ps.setString(1, t)
>       }
>     }
>
>
>   private def insertAnalyticData(
>       interval: String,
>       insertStatement: String,
>       messageStream: DataStream[String]): Unit = {
>     val connectionString = s"jdbc:postgresql://localhost/db"
>     val sink: SinkFunction[String] = JdbcSink.sink(
>       insertStatement,
>       statementBuilder,
>       JdbcExecutionOptions.builder()
>         .withBatchIntervalMs(1000)
>         .withBatchSize(1000)
>         .withMaxRetries(10)
>         .build,
>       JdbcOptions.builder()
>         .setDBUrl(connectionString)
>         .setTableName("analytic_table")
>         .build
>     )
>
>     messageStream
>       .addSink(sink)
>   }
> }
>
>
> On Wed, Feb 17, 2021 at 2:24 PM Till Rohrmann <[email protected]> wrote:
>
>> I am not 100% sure but maybe (_, _) => {} captures a reference to object
>> TestSink which is not serializable. Maybe try to simply define a no
>> op JdbcStatementBuilder and pass such an instance to JdbcSink.sink().
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter <[email protected]> wrote:
>>
>>> Ok, this is about as simple as I can get.
>>>
>>> package com.maalka.flink.sinks
>>>
>>> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
>>> import com.typesafe.scalalogging.LazyLogging
>>> import org.apache.flink.api.common.functions.RuntimeContext
>>> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
>>> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
>>> import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
>>> import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
>>> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>> import org.apache.flink.streaming.api.scala._
>>> import com.maalka.flink.typeInformation.Implicits._
>>>
>>> import java.util.function.Function
>>>
>>> object TestSink extends LazyLogging {
>>>   // START HERE
>>>   def process(messageStream: DataStream[MaalkaDataRecord],
>>>               signableUpdateStream: Option[DataStream[SignableUpdate]],
>>>               streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
>>>
>>>     insertAnalyticData("raw",
>>>       "insert into analytic_table ... ",
>>>       messageStream.map(_ => "A"))
>>>   }
>>>
>>>   private def insertAnalyticData(
>>>       interval: String,
>>>       insertStatement: String,
>>>       messageStream: DataStream[String]): Unit = {
>>>     val connectionString = s"jdbc:postgresql://localhost/db"
>>>     val sink: SinkFunction[String] = JdbcSink.sink(
>>>       insertStatement,
>>>       (_, _) => {}, // I have a feeling that this is the lambda that can't serialize
>>>       JdbcExecutionOptions.builder()
>>>         .withBatchIntervalMs(1000)
>>>         .withBatchSize(1000)
>>>         .withMaxRetries(10)
>>>         .build,
>>>       JdbcOptions.builder()
>>>         .setDBUrl(connectionString)
>>>         .setTableName("analytic_table")
>>>         .build
>>>     )
>>>
>>>     messageStream
>>>       .addSink(sink)
>>>   }
>>> }
>>>
>>>
>>> On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann <[email protected]> wrote:
>>>
>>>> Hi Clay,
>>>>
>>>> could you maybe share the source code of
>>>> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
>>>> sink uses a lambda which is not serializable. Maybe it holds a reference
>>>> to some non-Serializable class as part of its closure.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter <[email protected]> wrote:
>>>>
>>>>> Thanks Till, the tickets and links were immensely useful. With that I
>>>>> was able to make progress and even get things to compile. However, when
>>>>> I run things a serialization exception is thrown (see below).
>>>>>
>>>>> .addSink(JdbcSink.sink[SignableTableSchema](
>>>>>>   addIntervalToInsertStatement(insertStatement, interval),
>>>>>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>>>>>>     ps.setString(1, rd.data_processing_id)
>>>>>>     ps.setTimestamp(2, rd.crc)
>>>>>>     ps.setString(3, rd.command)
>>>>>>     ps.setString(4, rd.result)
>>>>>>     ps.setOptionalString(5, rd.message)
>>>>>>     ps.setString(6, rd.arguments)
>>>>>>     ps.setOptionalString(7, rd.validatorUUID)
>>>>>>   },
>>>>>>   getJdbcExecutionOptions,
>>>>>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>>>>>> ))
>>>>>>
>>>>>> Where I set the executionOptions to write in batches.
>>>>>
>>>>> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>>>>>   JdbcExecutionOptions.builder()
>>>>>     .withBatchIntervalMs(1000)
>>>>>     .withBatchSize(1000)
>>>>>     .withMaxRetries(10)
>>>>>     .build
>>>>> }
>>>>>
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>>> [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
>>>>>> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>>>>>> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>>>>>> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>>>>>> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>>>>>> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>>>>>> [info] at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>>>>>> [info] at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>>>>>> [info] at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>>>>>> [info] at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>>>>>> [info] at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>>>>>> [info] ...
>>>>>> [info] Cause: java.io.NotSerializableException: Non-serializable lambda
>>>>>> [info] at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
>>>>>> [info] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> [info] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> [info] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> [info] at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>> [info] at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>>>>>> [info] at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>>>>>> [info] at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>>>>>> [info] at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>>>>>> [info] at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann <[email protected]> wrote:
>>>>>
>>>>>> Hi Clay,
>>>>>>
>>>>>> I am not a Table API expert but let me try to answer your question:
>>>>>>
>>>>>> With FLINK-17748 [1] the community removed the registerTableSink in
>>>>>> favour of the connect API. The connect API has been deprecated [2]
>>>>>> because it was not well maintained. Now the recommended way for
>>>>>> specifying sinks is to use Flink's DDL [3]. Unfortunately, I couldn't
>>>>>> find an easy example on how to use the DDL. Maybe Timo or Jark can
>>>>>> point you towards a good guide on how to register your jdbc table sink.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter <[email protected]> wrote:
>>>>>>
>>>>>>> Hey all. Hopefully this is an easy question. I'm porting my JDBC
>>>>>>> postgres sink from 1.10 to 1.12
>>>>>>>
>>>>>>> I'm using:
>>>>>>> * StreamTableEnvironment
>>>>>>> * JdbcUpsertTableSink
>>>>>>>
>>>>>>> What I'm having difficulty with is how to register the sink with the
>>>>>>> streaming table environment.
>>>>>>>
>>>>>>> In 1.10:
>>>>>>>
>>>>>>> tableEnv.registerTableSink(
>>>>>>>>   s"${interval}_maalka_jdbc_output_table",
>>>>>>>>   jdbcTableSink)
>>>>>>>
>>>>>>>
>>>>>>> This method doesn't exist in 1.12, what is the equivalent?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Clay
>>>>>>>
>>>>>>>
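
On the original question of registering a JDBC sink through DDL, the following is a rough sketch and not a tested migration. It only builds the CREATE TABLE statement as a string. The column names (id, payload) are hypothetical, since the real schema was never posted in this thread, and the option keys assume the 'jdbc' table connector as documented for Flink 1.12. The URL and table name are taken from the code quoted above.

```scala
object JdbcSinkDdl {
  // Builds a CREATE TABLE statement for a JDBC-backed sink table.
  // Column names are placeholders; replace them with the real schema.
  def createTable(interval: String): String =
    s"""CREATE TABLE ${interval}_maalka_jdbc_output_table (
       |  id STRING,
       |  payload STRING,
       |  PRIMARY KEY (id) NOT ENFORCED
       |) WITH (
       |  'connector' = 'jdbc',
       |  'url' = 'jdbc:postgresql://localhost/db',
       |  'table-name' = 'analytic_table'
       |)""".stripMargin

  def main(args: Array[String]): Unit = {
    // With a StreamTableEnvironment in scope, this string would be passed
    // to tableEnv.executeSql(...) instead of being printed.
    println(createTable("raw"))
  }
}
```

As far as I understand the connector documentation, declaring a primary key is what switches the JDBC sink into upsert mode, which would be the closest match to the old JdbcUpsertTableSink. Please verify that against the docs for your exact Flink version.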
