Good morning Daniel,

… so my guess was not the cause of your problem 😊. Anyway, it seems you always want to use your LogSessionProducer with Session? In that case you could drop the generics from the class like this:
    class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
        extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {
      ...
      override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
      ...

As to your assumption that the problem could be in your override def open() … I don't see you invoking the super.open(…) function, which would leave the producer only half initialized.

Thias

From: Daniel Vol <vold...@gmail.com>
Sent: Thursday, 12 August 2021 08:01
To: Guowei Ma <guowei....@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Odd Serialization exception

Hi Guowei,

I am running on EMR 5.32.0 with Flink 1.11.2.

In the meantime I did some tests and commented out part of the new code:

    override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
      try {
        // val session = value.asInstanceOf[Session]
        // sessionDuration = 17L //session.getSessionDuration
        // sessionSize = 19 //session.getSessionTotalEvents
        super.invoke(value, context)
        sessionsWritten.inc()
      }

Though I still get:

    Caused by: org.apache.flink.util.SerializedThrowable: null

So my assumption is that something is wrong with the "override def open()" method.

Thanks!

On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:

Hi, Daniel
Could you tell me the version of Flink you use? I want to look at the corresponding code.
Best,
Guowei

On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:

Hi Matthias,

First, thanks for the fast reply. I am new to Flink, so I probably miss a lot in terms of flow and objects passed. The motivation is to get internal data from the transferred OUT object to send metrics.
So I do downcast it, but from my perspective it is not forwarded (super is called with the original value) or expected to be used in later steps (it is meant to be a locally scoped variable).
As I suspect that you are right, can you point me to how I can get internal data from OUT without changing it or affecting the next steps?
Also, when I create the object I specify the OUT type (which is Session):

    val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)

"… but of course I might be completely mistaken due to incomplete information."
What kind of information can I supply?

Thanks a lot!
Daniel

On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

Hi Daniel,

At first glance there is one thing that catches my eye:
In the line 'val session = value.asInstanceOf[Session]' it looks like you are downcasting the event from OUT to Session.
In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific serializer[OUT] to transport events from one operator to the next (or at least from one task to the next, if configured this way).
These serializers usually only understand one type, OUT in your case.
Only in certain circumstances is the Java object (the event) transported directly from one operator to the next.

I guess this is what happened: your serializer, which only understands OUT, cannot cope with a Session object …
… but of course I might be completely mistaken due to incomplete information.

I hope this helps 😊
Feel free to get back to me for clarifications (on the mailing list).

Cheers

Thias

From: Daniel Vol <vold...@gmail.com>
Sent: Wednesday, 11 August 2021 14:47
To: user@flink.apache.org
Subject: Odd Serialization exception

I started to get the following exception:

2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted

The only change from the version that does not have this issue is adding some metrics to the Producer (marked in red):

    class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
        extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

      @transient private var sessionsProcessed: Counter = _
      @transient private var sessionsWritten: Counter = _
      @transient private var sessionWriteFailed: Counter = _
      @transient private var sessionDuration: Long = 0
      @transient private var sessionSize: Int = 0

      override def open(parameters: Configuration): Unit = {
        val metrics = getRuntimeContext.getMetricGroup
        sessionsProcessed = metrics.counter("sessionsProcessed")
        sessionsWritten = metrics.counter("sessionsWritten")
        sessionWriteFailed = metrics.counter("sessionWriteFailed")
        metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
        metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
      }

      @throws[Exception]
      override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
        try {
          val session = value.asInstanceOf[Session]
          sessionDuration = session.getSessionDuration
          sessionSize = session.getSessionTotalEvents
          super.invoke(value, context)
          sessionsWritten.inc()
        } catch {
          case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
            logger.error("failed session ended = " + value, e)
            sessionWriteFailed.inc()
          case _: Throwable =>
            sessionWriteFailed.inc()
        } finally sessionsProcessed.inc()
      }
    }

Is anyone familiar with this? Can you point out what may cause this exception and how I should solve it?

Thanks!

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
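Putting Thias's two suggestions from this thread together (dropping the type parameter and calling super.open(…) before registering metrics), a corrected producer might look roughly like the sketch below. It assumes the Flink 1.11 Kinesis connector API used in the thread (where SinkFunction.Context still takes a type parameter) and scala-logging's LazyLogging, as in the original code. The Session trait is a hypothetical stand-in for the real Session class, whose source is not shown, and the plain Gauge anonymous classes replace ScalaGauge only to keep the imports minimal. It has not been run against a real Kinesis stream.

```scala
import java.util.Properties

import scala.util.control.NonFatal

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.{Counter, Gauge}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer

// Hypothetical stand-in for the Session class used in the thread (not shown);
// replace it with an import of the real class.
trait Session {
  def getSessionDuration: Long
  def getSessionTotalEvents: Int
}

class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
    extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _
  // Written by the task thread and read by the metric reporter thread.
  @transient @volatile private var sessionDuration: Long = 0L
  @transient @volatile private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    // Let FlinkKinesisProducer initialise itself first (it creates its internal
    // Kinesis producer in open); skipping this leaves the parent half initialised.
    super.open(parameters)

    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, Gauge[Long]]("sessionDuration", new Gauge[Long] {
      override def getValue: Long = sessionDuration
    })
    metrics.gauge[Int, Gauge[Int]]("sessionSize", new Gauge[Int] {
      override def getValue: Int = sessionSize
    })
  }

  // The element is already typed as Session, so no asInstanceOf is needed.
  override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
    try {
      sessionDuration = value.getSessionDuration
      sessionSize = value.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      case e: IllegalArgumentException
          if e.getMessage != null && e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      case NonFatal(e) =>
        // Count the failure, then rethrow so it is not silently swallowed.
        sessionWriteFailed.inc()
        throw e
    } finally sessionsProcessed.inc()
  }
}
```

Construction then drops the type argument: `new LogSessionProducer(new KinesisEventSerializer[Session], producerConfig)`. Rethrowing non-matching errors is a deliberate departure from the original `case _: Throwable` branch: with the catch-all, any exception from super.invoke(…) (for example because the parent was never opened) is only counted, which plausibly explains why the first visible symptom was the opaque "SerializedThrowable: null" from flushSync at checkpoint time. If dropping such records is really intended, remove the `throw e`.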