Thanks Stefan for the answers. The serialization is happening during the
creation of snapshot state. I have added a gist with a larger stacktrace(
https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am
not using any serializer, in the custom sink.

We have

src.keyBy(m => (m.topic, m.partition))
    .map(message => updateMessage(message, config))
.addSink(new 
CustomSink(config)).uid(FLINK_JOB_ID).setParallelism(src.parallelism)
.name(FLINK_JOB_ID)

So there should be a 1-1 source and sink mapping, i am assuming.

If possible could you could please give some more pointers to help
troubleshoot

Thanks,
Vipul


On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <s.rich...@data-artisans.com
> wrote:

> Hi,
>
> the crash looks unrelated to Flink code from the dump’s trace. Since it
> happens somewhere in managing a jar file, it might be related to this:
> https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your
> jar gets overwritten while running, e.g. from your IDE?
>
> The serialization exception looks like the custom sink is using the same
> serializer in different threads concurrently. I don’t have the full custom
> code but this would be my guess. Ensure to duplicate serializers whenever
> different threads could work on them, e.g. processing vs checkpointing.
>
> Best,
> Stefan
>
>
>
>
> Am 20.10.2017 um 14:24 schrieb vipul singh <neoea...@gmail.com>:
>
> Hello all,
>
> I am working on a custom sink implementation, but having weird issues with
> checkpointing.
>
> I am using a custom ListState to checkpoint, and it looks like this:
>
> private var checkpointMessages: ListState[Bucket] =_
>
>
> My snapshot function looks like:
>
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>       for((bucketName, bucket) <- bufferedMessages) {
>
>         // cloning to avoid any conncurrent modification issues
>         var new_buffer = new ListBuffer[GenericRecord]()
>
>         bucket.buffer.foreach(f=> new_buffer += f)
>
>         val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>
>         if(shouldUpload(bucketName)) uploadFile (bucketName)
>         else checkpointMessages.add(new_bucket)
>       }}
>
> where class bucket is:
>
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var 
> timestamp: Long) extends Serializable{
>   def this(name: String) = {
>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
>
>
> BufferredMessages signature is
>
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>
>
> The basic idea behind this implementation is I maintain multiple buffers,
> and push messages(org.apache.avro.generic.GenericRecord) during the
> @invoke section of the sink, upon reaching certain thresholds I archive
> these on s3.
>
> I try to run this both locally in intellij and on a cluster:
>
> On Intellij the process runs for a bit( checkpoints 3-4 times) and then
> error out with the exception below:
>
>
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232,
> tid=0x0000000000003903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build
> 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode
> bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979', transport:
> 'socket'
>
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>
> I managed to collect a core dump: https://gist.github.com/neoeahit/
> 38a02955c1de7501561fba2e593d5f6a.
>
> On a cluster I start to set concurrent serialization issues:
> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>
> My initial guess is this is happening due to the size of the ListState?
> but i checked the number of records are around ~10k in the buffer. Due to
> the nature of the application, we have to implement this in a custom sink.
>
> Could someone please help me/ guide me to troubleshoot this further.
>
> --
> Thanking in advance,
> Vipul
>
>
>


-- 
Thanks,
Vipul

Reply via email to