Thanks, Stefan, for the answers. The serialization error happens during the creation of the snapshot state. I have added a gist with a larger stack trace (https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am not using any serializer in the custom sink.
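A note on Stefan's serializer suggestion quoted below: even if the sink never creates a serializer itself, Flink still serializes the contents of `checkpointMessages` at snapshot time with a serializer derived from the state's type information. Flink serializers may be stateful, which is why `TypeSerializer` has a `duplicate()` method. The following is a plain-Scala model of the "one duplicate per thread" pattern, not Flink code: `ReusingSerializer`, `DuplicatePerThread` and the record strings are hypothetical, and the two pool threads merely stand in for the processing and checkpointing threads.

```scala
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical stand-in for a stateful serializer: it reuses one internal
// buffer between calls, so a single instance must not be shared by threads.
final class ReusingSerializer {
  private val scratch = new java.lang.StringBuilder

  def serialize(value: String): Array[Byte] = {
    scratch.setLength(0) // mutable state reused across calls (not thread-safe)
    scratch.append(value.length).append(':').append(value)
    scratch.toString.getBytes(StandardCharsets.UTF_8)
  }

  // Modeled on the contract of Flink's TypeSerializer.duplicate():
  // the copy shares no mutable state with the original.
  def duplicate(): ReusingSerializer = new ReusingSerializer
}

object DuplicatePerThread {
  // Serializes all values on the calling worker thread with the given instance.
  private def task(ser: ReusingSerializer, values: Seq[String]): Callable[Seq[String]] =
    new Callable[Seq[String]] {
      override def call(): Seq[String] =
        values.map(v => new String(ser.serialize(v), StandardCharsets.UTF_8))
    }

  def run(): (Seq[String], Seq[String]) = {
    val original = new ReusingSerializer
    val processing = (1 to 1000).map(i => s"record-$i")
    val checkpoint = (1 to 1000).map(i => s"snapshot-$i")
    val pool = Executors.newFixedThreadPool(2)
    try {
      // Each thread gets its own duplicate instead of sharing `original`.
      val f1 = pool.submit(task(original.duplicate(), processing))
      val f2 = pool.submit(task(original.duplicate(), checkpoint))
      (f1.get(), f2.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    val (processed, snapshotted) = run()
    println(processed.head) // 8:record-1
    println(snapshotted.head) // 10:snapshot-1
  }
}
```

Handing both threads the same `original` instance instead would let one thread reset `scratch` while the other is still writing to it, which is the kind of interference Stefan describes.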
We have:

src.keyBy(m => (m.topic, m.partition))
  .map(message => updateMessage(message, config))
  .addSink(new CustomSink(config))
  .uid(FLINK_JOB_ID)
  .setParallelism(src.parallelism)
  .name(FLINK_JOB_ID)

So I am assuming there should be a 1-1 mapping between source and sink. If possible, could you please give me some more pointers to help troubleshoot this?

Thanks,
Vipul

On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi,
>
> judging from the dump's trace, the crash looks unrelated to Flink code. Since it happens somewhere in managing a jar file, it might be related to this: https://bugs.openjdk.java.net/browse/JDK-8142508, point (2). Maybe your jar gets overwritten while running, e.g. from your IDE?
>
> The serialization exception looks like the custom sink is using the same serializer in different threads concurrently. I don't have the full custom code, but this would be my guess. Make sure to duplicate serializers whenever different threads could work on them, e.g. processing vs. checkpointing.
>
> Best,
> Stefan
>
> On 20.10.2017 at 14:24, vipul singh <neoea...@gmail.com> wrote:
>
> Hello all,
>
> I am working on a custom sink implementation, but I am having weird issues with checkpointing.
>
> I am using a custom ListState to checkpoint, and it looks like this:
>
> private var checkpointMessages: ListState[Bucket] = _
>
> My snapshot function looks like this:
>
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>   for ((bucketName, bucket) <- bufferedMessages) {
>     // cloning to avoid any concurrent modification issues
>     val new_buffer = new ListBuffer[GenericRecord]()
>     bucket.buffer.foreach(f => new_buffer += f)
>     val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>
>     if (shouldUpload(bucketName)) uploadFile(bucketName)
>     else checkpointMessages.add(new_bucket)
>   }
> }
>
> where class Bucket is:
>
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable {
>   def this(name: String) = {
>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
>
> The bufferedMessages signature is:
>
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>
> The basic idea behind this implementation is that I maintain multiple buffers and push messages (org.apache.avro.generic.GenericRecord) into them in the invoke method of the sink. Upon reaching certain thresholds, I archive these on S3.
>
> I tried running this both locally in IntelliJ and on a cluster.
>
> In IntelliJ, the process runs for a bit (checkpoints 3-4 times) and then errors out with the exception below:
>
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> # http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'
>
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>
> I managed to collect a core dump: https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a
>
> On the cluster, I start to see concurrent serialization issues:
> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>
> My initial guess was that this is caused by the size of the ListState, but I checked, and there are only around 10k records in the buffer. Due to the nature of the application, we have to implement this in a custom sink.
>
> Could someone please help guide me in troubleshooting this further?
>
> --
> Thanking in advance,
> Vipul

--
Thanks,
Vipul
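One detail in the quoted `snapshotState`: the "cloning" step copies the `ListBuffer`, but every element is still the same `GenericRecord` object held by the live buffer, so the snapshot is a shallow copy. This thread does not establish whether that contributes to the exception, and it is a separate concern from the shared serializer Stefan describes. The plain-Scala sketch below models the sink's threshold-based buffering and contrasts a shallow and a deep snapshot. `Record` is a hypothetical mutable stand-in for Avro's `GenericRecord`, and `BucketBuffer`, `threshold`, `archive` and `SnapshotCopyDemo` are hypothetical names, not the sink's actual code.

```scala
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

// Hypothetical mutable stand-in for org.apache.avro.generic.GenericRecord.
final class Record(var value: String) {
  def deepCopy(): Record = new Record(value)
}

// Simplified version of the quoted Bucket class, holding Records instead.
final class Bucket(val name: String, val buffer: ListBuffer[Record], val timestamp: Long)

// Hypothetical model of the sink's buffering: records go into per-bucket
// buffers, and a bucket is handed to `archive` once it reaches `threshold`.
final class BucketBuffer(threshold: Int, archive: Bucket => Unit) {
  private val buckets = mutable.Map[String, Bucket]()

  def add(bucketName: String, record: Record, now: Long): Unit = {
    val bucket = buckets.getOrElseUpdate(bucketName, new Bucket(bucketName, ListBuffer.empty[Record], now))
    bucket.buffer += record
    if (bucket.buffer.size >= threshold) {
      archive(bucket)
      buckets.remove(bucketName)
    }
  }

  // Shallow copy, like the quoted foreach(f => new_buffer += f):
  // a new list, but the same Record objects.
  def shallowSnapshot(): List[Bucket] =
    buckets.values.map { b =>
      val copy = ListBuffer.empty[Record]
      copy ++= b.buffer
      new Bucket(b.name, copy, b.timestamp)
    }.toList

  // Deep copy: the snapshot shares no mutable objects with the live buffers.
  def deepSnapshot(): List[Bucket] =
    buckets.values.map { b =>
      new Bucket(b.name, b.buffer.map(_.deepCopy()), b.timestamp)
    }.toList
}

object SnapshotCopyDemo {
  def run(): (String, String, Int) = {
    val archived = ListBuffer.empty[String]
    val buffer = new BucketBuffer(threshold = 3, archive = b => archived += b.name)

    val first = new Record("v1")
    buffer.add("a", first, now = 0L)
    buffer.add("a", new Record("v2"), now = 0L)
    buffer.add("b", new Record("x"), now = 0L)
    buffer.add("b", new Record("y"), now = 0L)
    buffer.add("b", new Record("z"), now = 0L) // "b" reaches the threshold and is archived

    val shallow = buffer.shallowSnapshot()
    val deep = buffer.deepSnapshot()

    // Simulate the live record being modified after the snapshot was taken.
    first.value = "changed after snapshot"

    (shallow.head.buffer.head.value, deep.head.buffer.head.value, archived.size)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The shallow snapshot observes the later mutation, while the deep one keeps `"v1"`. With real Avro records, `GenericData.get().deepCopy(record.getSchema, record)` is one way to produce such a copy.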