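Hi,

I think you are right: the overridden open() looks like the likely culprit, since it never calls the parent implementation. Try calling super.open(...) at the start of open() in your LogSessionProducer.

Best,
Guowei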
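A minimal sketch of the super.open(...) suggestion, based on the class from the original message quoted below. As far as I can tell (an assumption, not verified against your job), the parent's open() is where FlinkKinesisProducer sets up the internal state that snapshotState/flushSync later rely on, so skipping it could explain the failure at checkpoint time. LazyLogging and the two gauges are left out here only to keep the sketch short.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer

class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Run the parent's initialization first; the original override skipped this.
    super.open(parameters)

    // Register the custom metrics only after the parent is initialized.
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
  }
}
```

The invoke() override can stay as it is for this experiment. If the checkpoint still fails after this change, the catch-all "case _: Throwable" branch in invoke() would be the next thing to look at, because it counts the failure without logging it.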

On Thu, Aug 12, 2021 at 2:01 PM Daniel Vol <vold...@gmail.com> wrote:

> Hi Guowei,
>
> I am running on EMR 5.32.0 with Flink 1.11.2.
>
> In the meantime I ran some tests and commented out part of the new code:
>
> override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>   try {
>     // val session = value.asInstanceOf[Session]
>     // sessionDuration = 17L //session.getSessionDuration
>     // sessionSize = 19 //session.getSessionTotalEvents
>     super.invoke(value, context)
>     sessionsWritten.inc()
>   }
>
> I still get "Caused by: org.apache.flink.util.SerializedThrowable: null",
> so my assumption is that something is wrong with the "override def open()"
> method.
>
> Thanks!
>
> On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi Daniel,
>> Could you tell me which version of Flink you use? I want to look at the
>> corresponding code.
>> Best,
>> Guowei
>>
>> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:
>>
>>> Hi Matthias,
>>>
>>> First, thanks for the fast reply.
>>> I am new to Flink, so I am probably missing a lot in terms of the flow
>>> and the objects passed.
>>>
>>> The motivation is to get internal data from the transferred OUT object
>>> in order to send metrics. So I do downcast it, but as far as I can see
>>> the cast value is not forwarded (super is called with the original
>>> value) and is not expected to be used in later steps (it is meant to be
>>> a local variable).
>>> As I suspect that you are right: can you point me to how I can get
>>> internal data from OUT without changing it or affecting the next steps?
>>> Also, when I create the object I specify the OUT type (which is
>>> Session):
>>>
>>> val flinkKinesisProducer = new LogSessionProducer[Session](new
>>> KinesisEventSerializer[Session], producerConfig)
>>>
>>> "… but of course I might be completely be mistaken due to incomplete
>>> information."
>>> What kind of information can I supply?
>>>
>>> Thanks a lot!
>>>
>>> Daniel
>>>
>>> On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch>
>>> wrote:
>>>
>>> Hi Daniel,
>>>
>>> At first glance there is one thing that catches my eye:
>>>
>>> In the line 'val session = value.asInstanceOf[Session]' it looks like you
>>> are downcasting the event from OUT to Session.
>>>
>>> In Flink this is a dangerous thing to do … DataStream[OUT] uses a
>>> specific serializer[OUT] to transport events from one operator to the next
>>> (or at least from one task to the next, if configured this way).
>>>
>>> These serializers usually only understand one type, OUT in your case.
>>> Only in certain circumstances is the Java object (the event) transported
>>> directly from one operator to the next.
>>>
>>> I guess this is what happened: your serializer, which only understands
>>> OUT, cannot cope with a Session object …
>>>
>>> … but of course I might be completely mistaken due to incomplete
>>> information.
>>>
>>> I hope this helps 😊
>>>
>>> Feel free to get back to me for clarifications (on the mailing list).
>>>
>>> Cheers
>>>
>>> Thias
>>>
>>> From: Daniel Vol <vold...@gmail.com>
>>> Sent: Wednesday, 11 August 2021 14:47
>>> To: user@flink.apache.org
>>> Subject: Odd Serialization exception
>>>
>>> I started to get the following exception:
>>>
>>> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8)] INFO
>>> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl - Could not
>>> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>>>   at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
>>>   at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
>>>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
>>>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
>>>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
>>>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
>>>   at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>>>   at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>>>   at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>   at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>>   at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
>>>   at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
>>>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
>>>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
>>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>   at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
>>>   ... 23 common frames omitted
>>>
>>> The only change from the version that does not have this issue is the
>>> addition of some metrics to the producer (marked in red):
>>>
>>> class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
>>>   extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {
>>>
>>>   @transient private var sessionsProcessed: Counter = _
>>>   @transient private var sessionsWritten: Counter = _
>>>   @transient private var sessionWriteFailed: Counter = _
>>>
>>>   @transient private var sessionDuration: Long = 0
>>>   @transient private var sessionSize: Int = 0
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     val metrics = getRuntimeContext.getMetricGroup
>>>     sessionsProcessed = metrics.counter("sessionsProcessed")
>>>     sessionsWritten = metrics.counter("sessionsWritten")
>>>     sessionWriteFailed = metrics.counter("sessionWriteFailed")
>>>     metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration",
>>>       ScalaGauge[Long](() => sessionDuration))
>>>     metrics.gauge[Int, ScalaGauge[Int]]("sessionSize",
>>>       ScalaGauge[Int](() => sessionSize))
>>>   }
>>>
>>>   @throws[Exception]
>>>   override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>>>     try {
>>>       val session = value.asInstanceOf[Session]
>>>       sessionDuration = session.getSessionDuration
>>>       sessionSize = session.getSessionTotalEvents
>>>       super.invoke(value, context)
>>>       sessionsWritten.inc()
>>>     }
>>>     catch {
>>>       case e: IllegalArgumentException
>>>           if e.getMessage.contains("Data must be less than or equal to") =>
>>>         logger.error("failed session ended = " + value, e)
>>>         sessionWriteFailed.inc()
>>>       case _: Throwable => sessionWriteFailed.inc()
>>>     } finally sessionsProcessed.inc()
>>>   }
>>> }
>>>
>>> Is anyone familiar with this and able to point out what may cause this
>>> exception and how I should solve it?
>>>
>>> Thanks!