Hi Guowei,

I am running on EMR 5.32.0 with Flink 1.11.2.

In the meantime I did some tests and commented out part of the new code:

  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      // val session = value.asInstanceOf[Session]
      // sessionDuration = 17L //session.getSessionDuration
      // sessionSize = 19 //session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    }

However, I still get:

  Caused by: org.apache.flink.util.SerializedThrowable: null

So my assumption is that something is wrong with the "override def open()" method.

Thanks!

On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Daniel
> Could you tell me the version of Flink you use? I want to look at the
> corresponding code.
> Best,
> Guowei
>
>
> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> First, thanks for a fast reply.
>> I am new to Flink, so I probably miss a lot in terms of flow and objects
>> passed.
>>
>> The motivation is to get internal data from the transferred OUT object to
>> send metrics. So I do downcast it, but from my perspective it is not
>> forwarded (super is called with the original value) or expected to be used
>> in later steps (it is expected to be a local-scope variable).
>> As I suspect that you are right, can you point me to how I can get
>> internal data from OUT without changing it or affecting the next steps?
>> Also, when I create the object, I specify the OUT type (which is
>> Session):
>>
>> val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)
>>
>> "… but of course I might be completely mistaken due to incomplete
>> information."
>> What kind of information can I supply?
>>
>> Thanks a lot!
>> >> Daniel >> >> On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch> >> wrote: >> >> >> >> Hi Daniel, >> >> >> >> On the first look there is one thing that catches my eye: >> >> In line ‘val session = value.asInstanceOf[Session]' it looks like you >> are downcasting the event from OUT to Session. >> >> In Flink this is a dangerous thing to do … DataStream[OUT] uses a >> specific serializer[OUT] to transport events from one operator to the next >> (or at least from one task to the next, if configured this way). >> >> These serializers usually only understand one type, OUT in your case. >> Only in certain circumstances the java object (the event) is transported >> directly from one operator to the next. >> >> >> >> I guess this is what happened, you serializer that only understands OUT >> can not cope with a Session object … >> >> >> >> … but of course I might be completely be mistaken due to incomplete >> information. >> >> >> >> I hope this helps 😊 >> >> >> >> Feel free to get back to me for clarifications (on the mailing list) >> >> >> >> Cheers >> >> >> >> Thias >> >> >> >> >> >> >> >> >> >> *From:* Daniel Vol <vold...@gmail.com> >> *Sent:* Mittwoch, 11. August 2021 14:47 >> *To:* user@flink.apache.org >> *Subject:* Odd Serialization exception >> >> >> >> I started to get the following exception: >> >> >> >> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), >> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: >> Unnamed (1/8)] INFO >> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl - Could not >> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), >> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: >> Unnamed (1/8). Failure reason: Checkpoint was declined. 
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
>>     ... 23 common frames omitted
>>
>> The only change from the version that does not have this issue is adding
>> some metrics to the producer (marked in red in the original message):
>>
>> class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
>>   extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {
>>
>>   @transient private var sessionsProcessed: Counter = _
>>   @transient private var sessionsWritten: Counter = _
>>   @transient private var sessionWriteFailed: Counter = _
>>
>>   @transient private var sessionDuration: Long = 0
>>   @transient private var sessionSize: Int = 0
>>
>>   override def open(parameters: Configuration): Unit = {
>>     val metrics = getRuntimeContext.getMetricGroup
>>     sessionsProcessed = metrics.counter("sessionsProcessed")
>>     sessionsWritten = metrics.counter("sessionsWritten")
>>     sessionWriteFailed = metrics.counter("sessionWriteFailed")
>>     metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration",
>>       ScalaGauge[Long](() => sessionDuration))
>>     metrics.gauge[Int, ScalaGauge[Int]]("sessionSize",
>>       ScalaGauge[Int](() => sessionSize))
>>   }
>>
>>   @throws[Exception]
>>   override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>>     try {
>>       val session = value.asInstanceOf[Session]
>>       sessionDuration = session.getSessionDuration
>>       sessionSize = session.getSessionTotalEvents
>>       super.invoke(value, context)
>>       sessionsWritten.inc()
>>     }
>>     catch {
>>       case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
>>         logger.error("failed session ended = " + value, e)
>>         sessionWriteFailed.inc()
>>       case _: Throwable => sessionWriteFailed.inc()
>>     } finally sessionsProcessed.inc()
>>   }
>> }
>>
>> Is anyone familiar with this exception and able to point out what may
>> cause it and how I should solve it?
>>
>> Thanks!
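A hedged reading of the trace, consistent with Daniel's suspicion about open() and with his later test (the failure persists with the cast commented out): the open() override in LogSessionProducer never calls super.open(parameters). Assuming, as in the Flink 1.11 Kinesis connector, that FlinkKinesisProducer.open() is where the underlying KPL producer is created, skipping it leaves that producer unset. The catch-all `case _: Throwable` in invoke() would then hide every per-record failure, and the problem would first surface in flushSync() during snapshotState(), which is where the "Caused by" frames point; an exception with a null message, such as a NullPointerException, would also fit "SerializedThrowable: null". The sketch below models that pattern without Flink; Client, BaseProducer, BrokenProducer and FixedProducer are hypothetical stand-ins, not Flink classes.

```scala
// Flink-free model of the suspected bug. All names are hypothetical:
// BaseProducer plays the role of FlinkKinesisProducer, whose open()
// creates the client that snapshotState() (like flushSync()) later uses.

class Client {
  private var outstanding = 0
  def add(record: String): Unit = { outstanding += 1 }
  def outstandingCount: Int = outstanding
  def flush(): Unit = { outstanding = 0 }
}

class BaseProducer {
  protected var client: Client = null // stays null until open() runs

  def open(): Unit = { client = new Client }

  def invoke(record: String): Unit = client.add(record)

  // Like flushSync(): dereferences the client without a null check
  def snapshotState(): Unit =
    while (client.outstandingCount > 0) client.flush()

  def pending: Int = client.outstandingCount
}

// Overrides open() for metrics but forgets super.open(): client stays null
class BrokenProducer extends BaseProducer {
  var failed = 0
  override def open(): Unit = () // metric registration would go here
  override def invoke(record: String): Unit =
    try super.invoke(record)
    catch { case _: Throwable => failed += 1 } // silently hides the failure
}

// Same override, but delegating to the parent first
class FixedProducer extends BaseProducer {
  var failed = 0
  override def open(): Unit = {
    super.open() // create the client before doing anything else
    // metric registration would go here
  }
  override def invoke(record: String): Unit =
    try super.invoke(record)
    catch { case _: Throwable => failed += 1 }
}

val broken = new BrokenProducer
broken.open()
broken.invoke("session-1") // fails, but only bumps the counter
println(s"broken: failed=${broken.failed}")
val surfaced =
  try { broken.snapshotState(); "no error" }
  catch { case _: NullPointerException => "NullPointerException at snapshot" }
println(surfaced)

val fixed = new FixedProducer
fixed.open()
fixed.invoke("session-1")
println(s"fixed: failed=${fixed.failed}, pending=${fixed.pending}")
fixed.snapshotState()
println(s"after snapshot: pending=${fixed.pending}")
```

If this hypothesis holds, the corresponding change in LogSessionProducer would be to call super.open(parameters) as the first statement of open(), before registering the metrics. Narrowing the final `case _: Throwable` (or at least logging what it catches) would also keep a failure like this from being swallowed until checkpoint time.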
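On Daniel's question of how to read data from OUT without changing it: since the producer is only ever created as LogSessionProducer[Session], one option, sketched here as an assumption rather than as the thread's answer, is to pass the metric extractors in as typed functions, so the compiler checks the type instead of relying on an unchecked asInstanceOf inside invoke(). The model below is Flink-free; MetricsSink and the Session fields are hypothetical, and only the getter names getSessionDuration and getSessionTotalEvents come from the thread.

```scala
// Hypothetical model: the getter names mirror the thread, but the fields
// and values of this Session class are made up for illustration.
final case class Session(id: String, startMs: Long, endMs: Long, events: Int) {
  def getSessionDuration: Long = endMs - startMs
  def getSessionTotalEvents: Int = events
}

// Generic sink that learns how to read metrics from OUT through functions
// supplied at construction time, instead of casting inside invoke().
class MetricsSink[OUT](durationOf: OUT => Long, sizeOf: OUT => Int) {
  var lastDuration: Long = 0L
  var lastSize: Int = 0
  var written = Vector.empty[OUT]

  def invoke(value: OUT): Unit = {
    lastDuration = durationOf(value) // read-only access; value is not modified
    lastSize = sizeOf(value)
    written :+= value                // the original value is passed on unchanged
  }
}

val sink = new MetricsSink[Session](_.getSessionDuration, _.getSessionTotalEvents)
val session = Session("a", startMs = 1000L, endMs = 4000L, events = 7)
sink.invoke(session)
println(s"duration=${sink.lastDuration}, size=${sink.lastSize}")
```

In the real producer this would mean extra constructor parameters on LogSessionProducer (for example a hypothetical durationOf: OUT => Long); they must be serializable like the rest of the sink function, which Scala function literals normally are. A simpler alternative is to drop the type parameter and extend FlinkKinesisProducer[Session] directly.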