I am not 100% sure, but maybe the lambda (_, _) => {} captures a reference to the object
TestSink, which is not serializable. Maybe try defining an explicit no-op
JdbcStatementBuilder and passing that instance to JdbcSink.sink().
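Here is a minimal sketch of that suggestion. It assumes Flink 1.12 with flink-connector-jdbc and the PostgreSQL JDBC driver on the classpath. NoOpStatementBuilder and NoOpSinkFactory are hypothetical names. The sketch also swaps the internal JdbcOptions for the public JdbcConnectionOptions builder and sets the driver class explicitly. Both of those are assumptions, not something confirmed in this thread, and it is still unverified that the lambda is the culprit.

```scala
import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical named replacement for the anonymous `(_, _) => {}`.
// JdbcStatementBuilder already extends java.io.Serializable, and a
// top-level class has no enclosing instance, so it captures nothing else.
class NoOpStatementBuilder[T] extends JdbcStatementBuilder[T] {
  // Binds no parameters; a real builder would call ps.setString(...) etc.
  override def accept(ps: PreparedStatement, record: T): Unit = ()
}

// Hypothetical factory mirroring insertAnalyticData from the thread.
object NoOpSinkFactory {
  def sink(insertStatement: String, url: String): SinkFunction[String] =
    JdbcSink.sink[String](
      insertStatement,
      new NoOpStatementBuilder[String],
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build(),
      // Public connection options instead of the internal JdbcOptions.
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl(url)
        .withDriverName("org.postgresql.Driver") // assumed Postgres driver
        .build()
    )
}
```

The resulting SinkFunction can then be passed to messageStream.addSink(...) in the same way as before.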

Cheers,
Till

On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter wrote:

> Ok, this is about as simple as I can get.
>
> package com.maalka.flink.sinks
>
> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
> import com.typesafe.scalalogging.LazyLogging
> import org.apache.flink.api.common.functions.RuntimeContext
> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
> import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
> import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala._
> import com.maalka.flink.typeInformation.Implicits._
>
> import java.util.function.Function
>
> object TestSink extends LazyLogging {
>   // START HERE
>   def process(messageStream: DataStream[MaalkaDataRecord],
>               signableUpdateStream: Option[DataStream[SignableUpdate]],
>               streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
>
>     insertAnalyticData("raw",
>       "insert into analytic_table ... ",
>       messageStream.map(_ => "A"))
>   }
>
>   private def insertAnalyticData(
>                           interval: String,
>                           insertStatement: String,
>                           messageStream: DataStream[String]): Unit = {
>     val connectionString = s"jdbc:postgresql://localhost/db"
>     val sink: SinkFunction[String] = JdbcSink.sink(
>       insertStatement,
>       (_, _) => {},    // I have a feeling that this is the lambda that can't serialize
>       JdbcExecutionOptions.builder()
>         .withBatchIntervalMs(1000)
>         .withBatchSize(1000)
>         .withMaxRetries(10)
>         .build,
>       JdbcOptions.builder()
>         .setDBUrl(connectionString)
>         .setTableName("analytic_table")
>         .build
>     )
>
>     messageStream
>       .addSink(sink)
>   }
> }
>
>
> On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann wrote:
>
>> Hi Clay,
>>
>> could you maybe share the source code of
>> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
>> sink uses a lambda which is not serializable. Maybe its closure holds a
>> reference to some non-serializable object.
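You can test that hypothesis locally by pushing the function through plain Java serialization. That is roughly the check Flink's ClosureCleaner performs before a job is submitted. The sketch assumes Scala 2.12+, where function literals compile to serializable lambdas. Helper is a hypothetical stand-in for any non-serializable class, such as a logger or a connection pool.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper that does NOT implement java.io.Serializable.
final class Helper { val prefix: String = "row: " }

object ClosureCaptureDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Copies the needed value into a local val first, so the lambda
  // only captures a String.
  def safeFormatter(h: Helper): String => String = {
    val p = h.prefix
    s => p + s
  }

  // References `h` inside the lambda, so the Helper instance itself
  // becomes part of the closure and must be serialized with it.
  def unsafeFormatter(h: Helper): String => String =
    s => h.prefix + s

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(safeFormatter(new Helper)))   // true
    println(isJavaSerializable(unsafeFormatter(new Helper))) // false
  }
}
```

The same pattern of copying only plain, serializable values into locals before building the lambda is a common way to keep closures small.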
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter wrote:
>>
>>> Thanks Till, the tickets and links were immensely useful. With them I
>>> was able to make progress and even get things to compile. However, when I
>>> run it, a serialization exception is thrown (see below).
>>>
>>> .addSink(JdbcSink.sink[SignableTableSchema](
>>>>   addIntervalToInsertStatement(insertStatement, interval),
>>>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>>>>     ps.setString(1, rd.data_processing_id)
>>>>     ps.setTimestamp(2, rd.crc)
>>>>     ps.setString(3, rd.command)
>>>>     ps.setString(4, rd.result)
>>>>     ps.setOptionalString(5, rd.message)
>>>>     ps.setString(6, rd.arguments)
>>>>     ps.setOptionalString(7, rd.validatorUUID)
>>>>   },
>>>>   getJdbcExecutionOptions,
>>>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>>>> ))
>>>>
>>>> Here is where I set the execution options so that the sink writes in batches:
>>>
>>> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>>>   JdbcExecutionOptions.builder()
>>>     .withBatchIntervalMs(1000)
>>>     .withBatchSize(1000)
>>>     .withMaxRetries(10)
>>>     .build
>>> }
>>>
>>>
>>> Any suggestions?
>>>
>>>> [info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
>>>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>>>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>>>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>>>> [info]   at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>>>> [info]   ...
>>>> [info]   Cause: java.io.NotSerializableException: Non-serializable lambda
>>>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
>>>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> [info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> [info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> [info]   at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>>>> [info]   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>>>> [info]   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>>>> [info]   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>>>> [info]   at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>>>>
>>>
>>>
>>> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann wrote:
>>>
>>>> Hi Clay,
>>>>
>>>> I am not a Table API expert but let me try to answer your question:
>>>>
>>>> With FLINK-17748 [1] the community removed registerTableSink in
>>>> favour of the connect API. The connect API has been deprecated [2] because
>>>> it was not well maintained. Now the recommended way of specifying sinks is
>>>> to use Flink's DDL [3]. Unfortunately, I couldn't find a simple example of
>>>> how to use the DDL. Maybe Timo or Jark can point you towards a good guide
>>>> on how to register your JDBC table sink.
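Here is a rough sketch of how the DDL route could look in Flink 1.12. It assumes the blink planner (the 1.12 default) and flink-connector-jdbc are on the classpath. The column list, primary key, and flush settings are hypothetical, loosely modeled on the fields and batch options used elsewhere in this thread. Only the table-name pattern comes from the 1.10 code. JdbcDdlSinkSketch is a hypothetical name.

```scala
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object JdbcDdlSinkSketch {
  // Builds a CREATE TABLE statement for one interval-specific sink table.
  // Columns and option values are illustrative assumptions.
  def sinkDdl(interval: String, url: String): String =
    s"""CREATE TABLE ${interval}_maalka_jdbc_output_table (
       |  data_processing_id STRING,
       |  command STRING,
       |  PRIMARY KEY (data_processing_id) NOT ENFORCED
       |) WITH (
       |  'connector' = 'jdbc',
       |  'url' = '$url',
       |  'table-name' = 'analytic_table',
       |  'sink.buffer-flush.max-rows' = '1000',
       |  'sink.buffer-flush.interval' = '1s',
       |  'sink.max-retries' = '10'
       |)""".stripMargin

  // Registers the table in the current catalog; this takes the place of
  // the removed registerTableSink call.
  def register(tableEnv: StreamTableEnvironment, interval: String, url: String): Unit = {
    tableEnv.executeSql(sinkDdl(interval, url))
    ()
  }
}
```

Once the table is registered, a Table can be written to it with something like table.executeInsert("raw_maalka_jdbc_output_table"). Because a primary key is declared, the JDBC connector runs in upsert mode, which is the closest counterpart to the old JdbcUpsertTableSink. Without a primary key it appends.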
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>>>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter wrote:
>>>>
>>>>> Hey all. Hopefully this is an easy question. I'm porting my JDBC
>>>>> Postgres sink from Flink 1.10 to 1.12.
>>>>>
>>>>> I'm using:
>>>>> * StreamTableEnvironment
>>>>> * JdbcUpsertTableSink
>>>>>
>>>>> What I'm having difficulty with is how to register the sink with the
>>>>> streaming table environment.
>>>>>
>>>>> In 1.10:
>>>>>
>>>>>     tableEnv.registerTableSink(
>>>>>>       s"${interval}_maalka_jdbc_output_table",
>>>>>>       jdbcTableSink)
>>>>>
>>>>>
>>>>> This method doesn't exist in 1.12. What is the equivalent?
>>>>>
>>>>> Thanks!
>>>>> Clay
>>>>>
>>>>>
