Thank you both! Looks much better and hopefully works!
Daniel.

On Thu, Aug 12, 2021 at 10:35 AM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Good morning Daniel,
>
> … so my guess was not the cause of your problem 😊. Anyway, it seems like
> you always want to use your LogSessionProducer with Session?
>
> In that case you could drop the generics from the class like this:
>
> class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
>   extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {
>   ...
>   override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
>   ...
>
> As to your assumption that the problem could be in your override def open() …
>
> … I don't see you invoke the super.open(…) function, which would leave the
> producer only half initialized.
>
> Thias
>
>
> *From:* Daniel Vol <vold...@gmail.com>
> *Sent:* Thursday, 12 August 2021 08:01
> *To:* Guowei Ma <guowei....@gmail.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Odd Serialization exception
>
> Hi Guowei,
>
> I am running on EMR 5.32.0 with Flink 1.11.2.
>
> In the meantime I did some tests and commented out part of the new code:
>
> override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>   try {
>     // val session = value.asInstanceOf[Session]
>     // sessionDuration = 17L //session.getSessionDuration
>     // sessionSize = 19 //session.getSessionTotalEvents
>     super.invoke(value, context)
>     sessionsWritten.inc()
>   }
>
> Though I still get Caused by: org.apache.flink.util.SerializedThrowable: null
>
> So my assumption is that something is wrong with the "override def open()" method.
>
> Thanks!
>
>
> On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:
>
> Hi, Daniel
>
> Could you tell me the version of Flink you use? I want to look at the
> corresponding code.
>
> Best,
>
> Guowei
>
>
> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:
>
> Hi Matthias,
>
> First, thanks for the fast reply.
> I am new to Flink, so I am probably missing a lot in terms of flow and
> objects passed.
>
> The motivation is to get internal data from the transferred OUT object to
> send metrics. So I do downcast it, but from my perspective the cast value is
> not forwarded (super is called with the original value) or expected to be
> used in later steps (it is meant to be a local-scope variable).
>
> As I suspect that you are right, can you point me to how I can get
> internal data from OUT without changing it or affecting the next steps?
>
> Also, when I create the object, I specify the OUT type (which is Session):
>
> val flinkKinesisProducer = new LogSessionProducer[Session](new
>   KinesisEventSerializer[Session], producerConfig)
>
> "… but of course I might be completely mistaken due to incomplete
> information."
>
> What kind of information can I supply?
>
> Thanks a lot!
>
> Daniel
>
>
> On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> Hi Daniel,
>
> At first glance there is one thing that catches my eye:
>
> In line 'val session = value.asInstanceOf[Session]' it looks like you are
> downcasting the event from OUT to Session.
>
> In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific
> serializer[OUT] to transport events from one operator to the next (or at
> least from one task to the next, if configured this way).
>
> These serializers usually only understand one type, OUT in your case. Only
> in certain circumstances is the Java object (the event) transported
> directly from one operator to the next.
>
> I guess this is what happened: your serializer, which only understands OUT,
> cannot cope with a Session object …
>
> … but of course I might be completely mistaken due to incomplete
> information.
>
> I hope this helps 😊
>
> Feel free to get back to me for clarifications (on the mailing list)
>
> Cheers
>
> Thias
>
>
> *From:* Daniel Vol <vold...@gmail.com>
> *Sent:* Wednesday, 11 August 2021 14:47
> *To:* user@flink.apache.org
> *Subject:* Odd Serialization exception
>
> I started to get the following exception:
>
> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: null
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
>     ... 23 common frames omitted
>
> The only change from the version that does not have this issue is adding
> some metrics to the producer (marked in red):
>
> class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
>   extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {
>
>   @transient private var sessionsProcessed: Counter = _
>   @transient private var sessionsWritten: Counter = _
>   @transient private var sessionWriteFailed: Counter = _
>
>   @transient private var sessionDuration: Long = 0
>   @transient private var sessionSize: Int = 0
>
>   override def open(parameters: Configuration): Unit = {
>     val metrics = getRuntimeContext.getMetricGroup
>     sessionsProcessed = metrics.counter("sessionsProcessed")
>     sessionsWritten = metrics.counter("sessionsWritten")
>     sessionWriteFailed = metrics.counter("sessionWriteFailed")
>     metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
>     metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
>   }
>
>   @throws[Exception]
>   override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>     try {
>       val session = value.asInstanceOf[Session]
>       sessionDuration = session.getSessionDuration
>       sessionSize = session.getSessionTotalEvents
>       super.invoke(value, context)
>       sessionsWritten.inc()
>     }
>     catch {
>       case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
>         logger.error("failed session ended = " + value, e)
>         sessionWriteFailed.inc()
>       case _: Throwable => sessionWriteFailed.inc()
>     } finally sessionsProcessed.inc()
>   }
> }
>
> Is anyone familiar with this and able to point out what may cause this
> exception and how I should solve it?
>
> Thanks!
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
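
For anyone landing on this thread later: the two suggestions from Thias's replies (drop the generic parameter, and call super.open(…) inside the overridden open()) could be combined roughly as below. This is only a sketch and has not been run against a cluster. The `Session` trait is a hypothetical stand-in for the real class from the thread, reduced to the two getters the original code uses. Logging is left out, and the catch-all uses `NonFatal` instead of the original `Throwable`, which is my substitution and not something suggested in the thread.

```scala
import java.util.Properties

import scala.util.control.NonFatal

import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer

// Hypothetical stand-in for the Session type used in the thread.
trait Session extends Serializable {
  def getSessionDuration: Long
  def getSessionTotalEvents: Int
}

// No type parameter: the sink is only ever used with Session, so no cast is needed.
class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
  extends FlinkKinesisProducer[Session](schema, props) {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  @transient private var sessionDuration: Long = 0L
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    // Let FlinkKinesisProducer run its own open() before registering extra metrics.
    super.open(parameters)

    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
    try {
      // value is already a Session; read the fields for the gauges only.
      sessionDuration = value.getSessionDuration
      sessionSize = value.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      // As in the original code, a failed write is counted and then swallowed.
      case NonFatal(_) => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}
```

Construction would then lose the type argument, e.g. new LogSessionProducer(new KinesisEventSerializer[Session], producerConfig), using the serializer and config names from the original post.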