Hi, Daniel
Could you tell me the version of Flink you use? I want to look at the
corresponding code.
Best,
Guowei


On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:

> Hi Matthias,
>
> First, thanks for a fast reply.
> I am new to Flink, so probably I miss a lot in terms of flow and objects
> passed.
>
> The motivation is to get internal data from the transferred OUT Object to
> send metrics. So I do downscale it but as per my perspective it is not
> forwarded (super called with original value) or expected to be used in
> later steps (this expected to be a local scope variable)
> As I am suspect that you are right - can you point me to how can I get
> internal data from OUT without changing it or affecting next steps.
> As well - when I create the object - I specify OUT type (which is Session):
>
> val flinkKinesisProducer = new LogSessionProducer[*Session*](new 
> KinesisEventSerializer[Session], producerConfig)
>
> "… but of course I might be completely be mistaken due to incomplete
> information."
> What kind of information can I supply?
>
> Thanks a lot!
>
> Daniel
>
> On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch>
> wrote:
>
> 
>
> Hi Daniel,
>
>
>
> On the first look there is one thing that catches my eye:
>
> In line ‘val session = value.asInstanceOf[Session]' it looks like you are
> downcasting the event from OUT to Session.
>
> In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific
> serializer[OUT] to transport events from one operator to the next (or at
> least from one task to the next, if configured this way).
>
> These serializers usually only understand one type, OUT in your case. Only
> in certain circumstances the java object (the event) is transported
> directly from one operator to the next.
>
>
>
> I guess this is what happened, you serializer that only understands OUT
> can not cope with a Session object …
>
>
>
> … but of course I might be completely be mistaken due to incomplete
> information.
>
>
>
> I hope this helps 😊
>
>
>
> Feel free to get back to me for clarifications (on the mailing list)
>
>
>
> Cheers
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Daniel Vol <vold...@gmail.com>
> *Sent:* Mittwoch, 11. August 2021 14:47
> *To:* user@flink.apache.org
> *Subject:* Odd Serialization exception
>
>
>
> I started to get the following exception:
>
>
>
> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8)] INFO
> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not
> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8). Failure reason: Checkpoint was declined.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8). Failure reason: Checkpoint was declined.
>     at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
>     at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
>     at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
>     at org.apache.flink.streaming.runtime.io
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>     at org.apache.flink.streaming.runtime.io
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>     at org.apache.flink.streaming.runtime.io
> .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>     at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>     at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>     at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: null
>     at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
>     at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
>     at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
>     at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
>     at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
>     ... 23 common frames omitted
>
>
>
> The only change from version that has no this issue is adding some metrics
> to Producer (marked in red):
>
> class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: 
> Properties)
>
>   extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {
>
>
>
>   @transient private var sessionsProcessed: Counter = _
>
>   @transient private var sessionsWritten: Counter = _
>
>   @transient private var sessionWriteFailed: Counter = _
>
>
>
>   @transient private var sessionDuration: Long = 0
>
>   @transient private var sessionSize: Int = 0
>
>
>
>     override def open(parameters: Configuration): Unit = {
>
>     val metrics = getRuntimeContext.getMetricGroup
>
>     sessionsProcessed = metrics.counter("sessionsProcessed")
>
>     sessionsWritten = metrics.counter("sessionsWritten")
>
>     sessionWriteFailed = metrics.counter("sessionWriteFailed")
>
>     metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", 
> ScalaGauge[Long](() => sessionDuration))
>
>     metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => 
> sessionSize))
>
>   }
>
>
>
>   @throws[Exception]
>
>   override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>
>     try {
>
>       val session = value.asInstanceOf[Session]
>
>       sessionDuration = session.getSessionDuration
>
>       sessionSize = session.getSessionTotalEvents
>
>       super.invoke(value, context)
>
>       sessionsWritten.inc()
>
>     }
>
>     catch {
>
>       case e: IllegalArgumentException if e.getMessage.contains("Data must be 
> less than or equal to") =>
>
>         logger.error("failed session ended = " + value, e)
>
>         sessionWriteFailed.inc()
>
>       case _ : Throwbale => sessionWriteFailed.inc()
>
>     } finally sessionsProcessed.inc()
>
>   }
>
> }
>
> Anyone is familiar and can point out what may cause this exception and how 
> should I solve it?
>
> Thanks!
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
>

Reply via email to