Hi Guowei,

I am running on EMR 5.32.0 with Flink 1.11.2.

In the meantime I ran some tests and commented out part of the new code:

override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
//      val session = value.asInstanceOf[Session]
//      sessionDuration = 17L //session.getSessionDuration
//      sessionSize = 19 //session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    }

However, I still get "Caused by: org.apache.flink.util.SerializedThrowable:
null".
So my assumption is that something is wrong with the "override def open()"
method.
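
One thing that may be worth ruling out: the open() override in the class quoted below does not call super.open(parameters). Whether that explains this exception is only an assumption. The sketch below is plain Scala with no Flink dependency; BaseSink, MeteredSink, ForgetfulSink and the Session case class are hypothetical stand-ins, not Flink classes. It shows how a parent that creates its client in open() only fails later, at flush time, when a subclass overrides open() without delegating, and how a pattern match can read fields from the value without an unchecked cast.

```scala
// Hypothetical stand-ins for illustration only; none of these are Flink classes.
final case class Session(durationMs: Long, totalEvents: Int)

// Models a parent sink that creates its client in open().
class BaseSink[OUT] {
  private var client: StringBuilder = null // stays null until open() runs

  def open(): Unit = {
    client = new StringBuilder
  }

  def invoke(value: OUT): Unit = {
    if (client == null) throw new IllegalStateException("sink was not opened")
    client.append(value.toString)
  }

  // Models the flush done at checkpoint time; dereferences the client.
  def flush(): Int = client.length
}

// Overrides open() but delegates to the parent first.
class MeteredSink[OUT] extends BaseSink[OUT] {
  var written: Long = 0L
  var lastDurationMs: Long = 0L

  override def open(): Unit = {
    super.open() // let the parent create its client
    written = 0L
  }

  override def invoke(value: OUT): Unit = {
    // Read metric fields only when the value really is a Session.
    value match {
      case s: Session => lastDurationMs = s.durationMs
      case _          => ()
    }
    super.invoke(value) // forward the original, unmodified value
    written += 1
  }
}

// Overrides open() without delegating, so the parent's client is never created.
class ForgetfulSink[OUT] extends BaseSink[OUT] {
  override def open(): Unit = ()
}
```

In this model, MeteredSink counts the record and flush() succeeds after open() and invoke(Session(17L, 19)), whereas ForgetfulSink throws a NullPointerException from flush() because the parent's client was never created.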

Thanks!

On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Daniel
> Could you tell me the version of Flink you use? I want to look at the
> corresponding code.
> Best,
> Guowei
>
>
> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> First, thanks for the fast reply.
>> I am new to Flink, so I am probably missing a lot about the flow and the
>> objects being passed.
>>
>> The motivation is to read internal data from the transferred OUT object in
>> order to send metrics. So I do downcast it, but as far as I can see the
>> cast result is not forwarded (super is called with the original value) and
>> is not expected to be used in later steps (it is meant to be a local
>> variable).
>> As I suspect that you are right, can you point me to how I can get the
>> internal data from OUT without changing it or affecting the next steps?
>> Also, when I create the object I specify the OUT type (which is
>> Session):
>>
>> val flinkKinesisProducer = new LogSessionProducer[Session](
>>   new KinesisEventSerializer[Session], producerConfig)
>>
>> "… but of course I might be completely mistaken due to incomplete
>> information."
>> What kind of information can I supply?
>>
>> Thanks a lot!
>>
>> Daniel
>>
>> On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch>
>> wrote:
>>
>> 
>>
>> Hi Daniel,
>>
>> At first glance, one thing catches my eye:
>>
>> In the line 'val session = value.asInstanceOf[Session]' it looks like you
>> are downcasting the event from OUT to Session.
>>
>> In Flink this is a dangerous thing to do: DataStream[OUT] uses a
>> specific serializer[OUT] to transport events from one operator to the next
>> (or at least from one task to the next, if configured this way).
>>
>> These serializers usually understand only one type, OUT in your case.
>> Only in certain circumstances is the Java object (the event) transported
>> directly from one operator to the next.
>>
>> I guess this is what happened: your serializer, which only understands OUT,
>> cannot cope with a Session object …
>>
>> … but of course I might be completely mistaken due to incomplete
>> information.
>>
>> I hope this helps 😊
>>
>> Feel free to get back to me for clarifications (on the mailing list).
>>
>> Cheers
>>
>> Thias
>>
>>
>> *From:* Daniel Vol <vold...@gmail.com>
>> *Sent:* Wednesday, 11 August 2021 14:47
>> *To:* user@flink.apache.org
>> *Subject:* Odd Serialization exception
>>
>>
>>
>> I started to get the following exception:
>>
>>
>>
>> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000),
>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>> Unnamed (1/8)] INFO
>> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not
>> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.SerializedThrowable: null
>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
>>     ... 23 common frames omitted
>>
>>
>>
>> The only change from the version that does not have this issue is the
>> addition of some metrics to the producer (marked in red):
>>
>> class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
>>   extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {
>>
>>   @transient private var sessionsProcessed: Counter = _
>>   @transient private var sessionsWritten: Counter = _
>>   @transient private var sessionWriteFailed: Counter = _
>>
>>   @transient private var sessionDuration: Long = 0
>>   @transient private var sessionSize: Int = 0
>>
>>   override def open(parameters: Configuration): Unit = {
>>     val metrics = getRuntimeContext.getMetricGroup
>>     sessionsProcessed = metrics.counter("sessionsProcessed")
>>     sessionsWritten = metrics.counter("sessionsWritten")
>>     sessionWriteFailed = metrics.counter("sessionWriteFailed")
>>     metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
>>     metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
>>   }
>>
>>   @throws[Exception]
>>   override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>>     try {
>>       val session = value.asInstanceOf[Session]
>>       sessionDuration = session.getSessionDuration
>>       sessionSize = session.getSessionTotalEvents
>>       super.invoke(value, context)
>>       sessionsWritten.inc()
>>     }
>>     catch {
>>       case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
>>         logger.error("failed session ended = " + value, e)
>>         sessionWriteFailed.inc()
>>       case _: Throwable => sessionWriteFailed.inc()
>>     } finally sessionsProcessed.inc()
>>   }
>> }
>>
>> Is anyone familiar with this who can point out what may cause this
>> exception and how I should solve it?
>>
>> Thanks!
>>
>> This message is intended only for the named recipient and may contain
>> confidential or privileged information. As the confidentiality of email
>> communication cannot be guaranteed, we do not accept any responsibility for
>> the confidentiality and the intactness of this message. If you have
>> received it in error, please advise the sender by return e-mail and delete
>> this message and any attachments. Any unauthorised use or dissemination of
>> this information is strictly prohibited.
>>
>>