Hi,
I think you might be right. Try calling super.open(...) at the start of the
open() override in your LogSessionProducer.
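
To illustrate why the missing super.open(...) call could matter, here is a minimal sketch that does not use Flink at all. BaseProducer, FakeClient, BrokenProducer, FixedProducer and OpenLifecycleDemo are hypothetical stand-ins, on the assumption that the parent sink creates its client inside open() and touches it again when state is snapshotted.

```scala
// Hypothetical stand-in for the client a sink creates in open(); not a Flink class.
class FakeClient {
  private var pending = 0
  def send(record: String): Unit = { pending += 1 }
  def outstanding: Int = pending
  def flush(): Unit = { pending = 0 }
}

// Hypothetical stand-in for a sink base class that initializes itself in open().
class BaseProducer {
  // Stays null until open() has run.
  protected var client: FakeClient = null

  def open(): Unit = { client = new FakeClient }
  def invoke(record: String): Unit = client.send(record)
  // Called at checkpoint time; throws NullPointerException if open() never ran.
  def snapshotState(): Unit = if (client.outstanding > 0) client.flush()
}

// Overrides open() for its own setup but never calls the parent.
class BrokenProducer extends BaseProducer {
  var written = 0
  override def open(): Unit = { written = 0 }
}

// Same override, but the parent is initialized first.
class FixedProducer extends BaseProducer {
  var written = 0
  override def open(): Unit = {
    super.open()
    written = 0
  }
  override def invoke(record: String): Unit = {
    super.invoke(record)
    written += 1
  }
}

object OpenLifecycleDemo {
  // Runs open() and then snapshotState(); reports whether the snapshot succeeded.
  def snapshotSucceeds(p: BaseProducer): Boolean = {
    p.open()
    try { p.snapshotState(); true }
    catch { case _: NullPointerException => false }
  }
}
```

In this sketch the broken variant only fails once snapshotState() runs, which would be consistent with an exception that shows up at checkpoint time rather than in open(). Whether this is the actual cause in your job is a guess until you try it.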
Best,
Guowei


On Thu, Aug 12, 2021 at 2:01 PM Daniel Vol <vold...@gmail.com> wrote:

> Hi Guowei,
>
> I am running on EMR 5.32.0 with Flink 1.11.2
>
> In the meantime I ran some tests and commented out part of the new code:
>
> override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>     try {
> //      val session = value.asInstanceOf[Session]
> //      sessionDuration = 17L //session.getSessionDuration
> //      sessionSize = 19 //session.getSessionTotalEvents
>       super.invoke(value, context)
>       sessionsWritten.inc()
>     }
>
> I still get "Caused by: org.apache.flink.util.SerializedThrowable: null",
> though.
> So my assumption is that something is wrong with the "override def open()"
> method.
>
> Thanks!
>
> On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Daniel
>> Could you tell me the version of Flink you use? I want to look at the
>> corresponding code.
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:
>>
>>> Hi Matthias,
>>>
>>> First, thanks for the fast reply.
>>> I am new to Flink, so I am probably missing a lot in terms of the flow and
>>> the objects passed.
>>>
>>> The motivation is to get internal data from the transferred OUT object
>>> in order to send metrics. So I do downcast it, but from my perspective the
>>> cast value is not forwarded (super is called with the original value) and
>>> is not expected to be used in later steps (it is meant to be a local-scope
>>> variable).
>>> As I suspect that you are right, can you point me to how I can get
>>> internal data from OUT without changing it or affecting the next steps?
>>> Also, when I create the object, I specify the OUT type (which is
>>> Session):
>>>
>>> val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)
>>>
>>> "… but of course I might be completely mistaken due to incomplete
>>> information."
>>> What kind of information can I supply?
>>>
>>> Thanks a lot!
>>>
>>> Daniel
>>>
>>> On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch>
>>> wrote:
>>>
>>> 
>>>
>>> Hi Daniel,
>>>
>>> At first glance there is one thing that catches my eye:
>>>
>>> In the line 'val session = value.asInstanceOf[Session]' it looks like you
>>> are downcasting the event from OUT to Session.
>>>
>>> In Flink this is a dangerous thing to do … DataStream[OUT] uses a
>>> specific serializer[OUT] to transport events from one operator to the next
>>> (or at least from one task to the next, if configured this way).
>>>
>>> These serializers usually understand only one type, OUT in your case.
>>> Only in certain circumstances is the Java object (the event) transported
>>> directly from one operator to the next.
>>>
>>> I guess this is what happened: your serializer, which only understands
>>> OUT, cannot cope with a Session object …
>>>
>>> … but of course I might be completely mistaken due to incomplete
>>> information.
>>>
>>> I hope this helps 😊
>>>
>>> Feel free to get back to me for clarifications (on the mailing list)
>>>
>>> Cheers
>>>
>>> Thias
>>>
>>> *From:* Daniel Vol <vold...@gmail.com>
>>> *Sent:* Wednesday, 11 August 2021 14:47
>>> *To:* user@flink.apache.org
>>> *Subject:* Odd Serialization exception
>>>
>>>
>>>
>>> I started to get the following exception:
>>>
>>>
>>>
>>> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8)] INFO
>>> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not
>>> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
>>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
>>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
>>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
>>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
>>>     ... 23 common frames omitted
>>>
>>>
>>>
>>> The only change from the version that does not have this issue is the
>>> addition of some metrics to the producer (marked in red):
>>>
>>> class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
>>>   extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {
>>>
>>>   @transient private var sessionsProcessed: Counter = _
>>>   @transient private var sessionsWritten: Counter = _
>>>   @transient private var sessionWriteFailed: Counter = _
>>>
>>>   @transient private var sessionDuration: Long = 0
>>>   @transient private var sessionSize: Int = 0
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     val metrics = getRuntimeContext.getMetricGroup
>>>     sessionsProcessed = metrics.counter("sessionsProcessed")
>>>     sessionsWritten = metrics.counter("sessionsWritten")
>>>     sessionWriteFailed = metrics.counter("sessionWriteFailed")
>>>     metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
>>>     metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
>>>   }
>>>
>>>   @throws[Exception]
>>>   override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>>>     try {
>>>       val session = value.asInstanceOf[Session]
>>>       sessionDuration = session.getSessionDuration
>>>       sessionSize = session.getSessionTotalEvents
>>>       super.invoke(value, context)
>>>       sessionsWritten.inc()
>>>     } catch {
>>>       case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
>>>         logger.error("failed session ended = " + value, e)
>>>         sessionWriteFailed.inc()
>>>       case _: Throwable => sessionWriteFailed.inc()
>>>     } finally sessionsProcessed.inc()
>>>   }
>>> }
>>>
>>> Is anyone familiar with this who can point out what may cause this
>>> exception and how I should solve it?
>>>
>>> Thanks!
>>>
