Yep, that was it! Thanks! And to complete the thread, this is the working
revision.

package com.maalka.flink.sinks

import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions,
JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction,
JdbcBatchingOutputFormat}
import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._

import java.sql.PreparedStatement
import java.util.function.Function

object TestSink extends LazyLogging {

  /**
   * Entry point (START HERE): maps every incoming record to the constant
   * string "A" and hands the resulting stream to [[insertAnalyticData]].
   *
   * @param messageStream              records to be written to the sink
   * @param signableUpdateStream       not read by this method
   * @param streamExecutionEnvironment not read by this method
   */
  def process(messageStream: DataStream[MaalkaDataRecord],
              signableUpdateStream: Option[DataStream[SignableUpdate]],
              streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
    val placeholderStream: DataStream[String] = messageStream.map(_ => "A")

    insertAnalyticData(
      interval = "raw",
      insertStatement = "insert into analytic_table ... ",
      messageStream = placeholderStream)
  }

  /**
   * Binds a record to the first parameter of the prepared insert statement.
   *
   * This is deliberately an explicit anonymous `JdbcStatementBuilder`
   * instance rather than a Scala lambda: passing `(_, _) => {}` directly to
   * `JdbcSink.sink` made `addSink` fail with
   * "NotSerializableException: Non-serializable lambda".
   */
  val statementBuilder: JdbcStatementBuilder[String] =
    new JdbcStatementBuilder[String] {
      override def accept(statement: PreparedStatement, value: String): Unit =
        statement.setString(1, value)
    }

  /**
   * Attaches a batching JDBC sink to `messageStream`.
   *
   * @param interval        not read by this method
   * @param insertStatement SQL statement executed for each record
   * @param messageStream   stream whose elements are bound by [[statementBuilder]]
   */
  private def insertAnalyticData(interval: String,
                                 insertStatement: String,
                                 messageStream: DataStream[String]): Unit = {
    val connectionString = "jdbc:postgresql://localhost/db"

    // Batch up to 1000 rows or 1000 ms, retrying a failed batch up to 10 times.
    val executionOptions: JdbcExecutionOptions =
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build()

    val connectionOptions: JdbcOptions =
      JdbcOptions.builder()
        .setDBUrl(connectionString)
        .setTableName("analytic_table")
        .build()

    val jdbcSink: SinkFunction[String] =
      JdbcSink.sink(insertStatement, statementBuilder, executionOptions, connectionOptions)

    messageStream.addSink(jdbcSink)
  }
}




On Wed, Feb 17, 2021 at 2:24 PM Till Rohrmann <[email protected]> wrote:

> I am not 100% sure, but maybe (_, _) => {} captures a reference to object
> TestSink, which is not serializable. Maybe try to simply define a no-op
> JdbcStatementBuilder and pass such an instance to JdbcSink.sink().
>
> Cheers,
> Till
>
> On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter <[email protected]>
> wrote:
>
>> Ok, this is about as simple as I can get.
>>
>> package com.maalka.flink.sinks
>>
>> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
>> import com.typesafe.scalalogging.LazyLogging
>> import org.apache.flink.api.common.functions.RuntimeContext
>> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, 
>> JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
>> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, 
>> JdbcBatchingOutputFormat}
>> import 
>> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
>> import 
>> org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
>> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>> import org.apache.flink.streaming.api.scala._
>> import com.maalka.flink.typeInformation.Implicits._
>>
>> import java.util.function.Function
>>
>> object TestSink extends LazyLogging {
>>   // START HERE
>>   def process(messageStream: DataStream[MaalkaDataRecord],
>>               signableUpdateStream: Option[DataStream[SignableUpdate]],
>>               streamExecutionEnvironment: StreamExecutionEnvironment): Unit 
>> = {
>>
>>     insertAnalyticData("raw",
>>       "insert into analytic_table ... ",
>>       messageStream.map(_ => "A"))
>>   }
>>
>>   private def insertAnalyticData(
>>                           interval: String,
>>                           insertStatement: String,
>>                           messageStream: DataStream[String]): Unit = {
>>     val connectionString = s"jdbc:postgresql://localhost/db"
>>     val sink: SinkFunction[String] = JdbcSink.sink(
>>       insertStatement,
>>       (_, _) => {},    // I have a feeling that this is the lambda that 
>> can't serialize
>>       JdbcExecutionOptions.builder()
>>         .withBatchIntervalMs(1000)
>>         .withBatchSize(1000)
>>         .withMaxRetries(10)
>>         .build,
>>       JdbcOptions.builder()
>>         .setDBUrl(connectionString)
>>         .setTableName("analytic_table")
>>         .build
>>     )
>>
>>     messageStream
>>       .addSink(sink)
>>   }
>> }
>>
>>
>> On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann <[email protected]>
>> wrote:
>>
>>> Hi Clay,
>>>
>>> could you maybe share the source code of
>>> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
>>> sink uses a lambda which is not serializable. Maybe it holds a reference to
>>> some non Serializable class as part of its closure.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter <[email protected]>
>>> wrote:
>>>
>>>> Thanks Till, the tickets and links were immensely useful.  With that I
>>>> was able to make progress and even get things to compile.  However, when I
>>>> run things a serialization exception is thrown (see below).
>>>>
>>>> .addSink(JdbcSink.sink[SignableTableSchema](
>>>>>   addIntervalToInsertStatement(insertStatement, interval),
>>>>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>>>>>     ps.setString(1, rd.data_processing_id)
>>>>>     ps.setTimestamp(2, rd.crc)
>>>>>     ps.setString(3, rd.command)
>>>>>     ps.setString(4, rd.result)
>>>>>     ps.setOptionalString(5, rd.message)
>>>>>     ps.setString(6, rd.arguments)
>>>>>     ps.setOptionalString(7, rd.validatorUUID)
>>>>>   },
>>>>>   getJdbcExecutionOptions,
>>>>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>>>>> ))
>>>>>
>>>>>  Where I set the executionOptions to behave in a batched way.
>>>>
>>>> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>>>>   JdbcExecutionOptions.builder()
>>>>     .withBatchIntervalMs(1000)
>>>>     .withBatchSize(1000)
>>>>     .withMaxRetries(10)
>>>>     .build
>>>> }
>>>>
>>>>
>>>> Any suggestions?
>>>>
>>>> [info]   org.apache.flink.api.common.InvalidProgramException: The
>>>>> implementation of the AbstractJdbcOutputFormat is not serializable. The
>>>>> object probably contains or references non serializable fields.
>>>>> [info]   at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>>>>> [info]   at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>>>>> [info]   at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>>>>> [info]   at
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>>>>> [info]   at
>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>>>>> [info]   at
>>>>> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>>>>> [info]   at
>>>>> org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>>>>> [info]   at
>>>>> com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>>>>> [info]   at
>>>>> com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>>>>> [info]   at
>>>>> com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>>>>> [info]   ...
>>>>> [info]   Cause: java.io.NotSerializableException: Non-serializable
>>>>> lambda
>>>>> [info]   at
>>>>> com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown
>>>>> Source)
>>>>> [info]   at
>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>> Method)
>>>>> [info]   at
>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> [info]   at
>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> [info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> [info]   at
>>>>> java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>>>>> [info]   at
>>>>> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>>>>> [info]   at
>>>>> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>>>>> [info]   at
>>>>> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>>>>> [info]   at
>>>>> java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>>>>>
>>>>
>>>>
>>>> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Clay,
>>>>>
>>>>> I am not a Table API expert but let me try to answer your question:
>>>>>
>>>>> With FLINK-17748 [1] the community removed the registerTableSink in
>>>>> favour of the connect API. The connect API has been deprecated [2] because
>>>>> it was not well maintained. Now the recommended way for specifying sinks 
>>>>> is
>>>>> to use Flink's DDL [3]. Unfortunately, I couldn't find an easy example on
>>>>> how to use the DDL. Maybe Timo or Jark can point you towards a good guide
>>>>> on how to register your jdbc table sink.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hey all.  Hopefully this is an easy question.  I'm porting my JDBC
>>>>>> postgres sink from 1.10 to 1.12
>>>>>>
>>>>>> I'm using:
>>>>>> * StreamTableEnvironment
>>>>>> * JdbcUpsertTableSink
>>>>>>
>>>>>> What I'm having difficulty with is how to register the sink with the
>>>>>> streaming table environment.
>>>>>>
>>>>>> In 1.10:
>>>>>>
>>>>>>     tableEnv.registerTableSink(
>>>>>>>       s"${interval}_maalka_jdbc_output_table",
>>>>>>>       jdbcTableSink)
>>>>>>
>>>>>>
>>>>>> This method doesn't exist in 1.12, what is the equivalent?
>>>>>>
>>>>>> Thanks!
>>>>>> Clay
>>>>>>
>>>>>>

Reply via email to