Thanks Stefan. I found the issue in my application. Everything is working as expected now. Once again, thanks for the help and advice.
On Fri, Oct 20, 2017 at 4:51 AM, vipul singh <neoea...@gmail.com> wrote:

> Thanks Stefan for the answers. The serialization is happening during the
> creation of the snapshot state. I have added a gist with a larger stacktrace
> (https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am
> not using any serializer in the custom sink.
>
> We have
>
> src.keyBy(m => (m.topic, m.partition))
>   .map(message => updateMessage(message, config))
>   .addSink(new CustomSink(config))
>   .uid(FLINK_JOB_ID)
>   .setParallelism(src.parallelism)
>   .name(FLINK_JOB_ID)
>
> So there should be a 1-1 source-to-sink mapping, I am assuming.
>
> If possible, could you please give some more pointers to help troubleshoot?
>
> Thanks,
> Vipul
>
>
> On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> the crash looks unrelated to Flink code from the dump’s trace. Since it
>> happens somewhere in managing a jar file, it might be related to this:
>> https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your
>> jar gets overwritten while running, e.g. from your IDE?
>>
>> The serialization exception looks like the custom sink is using the same
>> serializer in different threads concurrently. I don’t have the full custom
>> code, but this would be my guess. Ensure to duplicate serializers whenever
>> different threads could work on them, e.g. processing vs. checkpointing.
>>
>> Best,
>> Stefan
>>
>>
>> Am 20.10.2017 um 14:24 schrieb vipul singh <neoea...@gmail.com>:
>>
>> Hello all,
>>
>> I am working on a custom sink implementation, but am having weird issues
>> with checkpointing.
>>
>> I am using a custom ListState to checkpoint, and it looks like this:
>>
>> private var checkpointMessages: ListState[Bucket] = _
>>
>> My snapshot function looks like:
>>
>> @throws[IOException]
>> def snapshotState(context: FunctionSnapshotContext): Unit = {
>>   checkpointMessages.clear()
>>   for ((bucketName, bucket) <- bufferedMessages) {
>>     // cloning to avoid any concurrent modification issues
>>     var new_buffer = new ListBuffer[GenericRecord]()
>>     bucket.buffer.foreach(f => new_buffer += f)
>>     val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>>     if (shouldUpload(bucketName)) uploadFile(bucketName)
>>     else checkpointMessages.add(new_bucket)
>>   }
>> }
>>
>> where the Bucket class is:
>>
>> @SerialVersionUID(1L)
>> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord],
>>     var timestamp: Long) extends Serializable {
>>   def this(name: String) = {
>>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>>   }
>> }
>>
>> The bufferedMessages signature is:
>>
>> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>>
>> The basic idea behind this implementation is that I maintain multiple buffers
>> and push messages (org.apache.avro.generic.GenericRecord) into them during the
>> invoke() method of the sink; upon reaching certain thresholds I archive
>> these on S3.
>>
>> I try to run this both locally in IntelliJ and on a cluster.
>>
>> In IntelliJ the process runs for a bit (checkpoints 3-4 times) and then
>> errors out with the exception below:
>>
>> # A fatal error has been detected by the Java Runtime Environment:
>> #
>> # SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
>> #
>> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
>> # Problematic frame:
>> # V [libjvm.dylib+0x46440c]
>> #
>> # Core dump written. Default location: /cores/core or core.25232
>> #
>> # An error report file with more information is saved as:
>> # hs_err_pid25232.log
>> #
>> # If you would like to submit a bug report, please visit:
>> # http://bugreport.java.com/bugreport/crash.jsp
>> # The crash happened outside the Java Virtual Machine in native code.
>> # See problematic frame for where to report the bug.
>> #
>> Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'
>>
>> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>>
>> I managed to collect a core dump:
>> https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a
>>
>> On a cluster I start to see concurrent serialization issues:
>> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>>
>> My initial guess was that this is happening due to the size of the ListState,
>> but I checked and there are only around ~10k records in the buffer. Due to
>> the nature of the application, we have to implement this in a custom sink.
>>
>> Could someone please help me / guide me to troubleshoot this further?
>>
>> --
>> Thanks in advance,
>> Vipul
>>
>>
>
>
> --
> Thanks,
> Vipul

--
Thanks,
Vipul
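
A minimal sketch of the serializer-duplication advice Stefan gives above, not code from the thread: a RichSinkFunction that keeps its own TypeSerializer for the Avro records and hands the checkpointing path a duplicate() of it instead of sharing one instance across threads. The class and field names here are made up for illustration; TypeSerializer#duplicate() and the other Flink calls are real API.

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class SerializerPerThreadSink
    extends RichSinkFunction[GenericRecord]
    with CheckpointedFunction {

  // Created once in open(); intended for use by the processing thread only.
  @transient private var processingSerializer: TypeSerializer[GenericRecord] = _

  override def open(parameters: Configuration): Unit = {
    processingSerializer = TypeInformation
      .of(classOf[GenericRecord])
      .createSerializer(getRuntimeContext.getExecutionConfig)
  }

  override def invoke(value: GenericRecord): Unit = {
    // ... buffer or write `value`, possibly using processingSerializer ...
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Checkpointing may run on a different thread than invoke(); give it its
    // own serializer instance instead of reusing processingSerializer.
    val checkpointSerializer: TypeSerializer[GenericRecord] =
      processingSerializer.duplicate()
    // ... serialize/copy the buffered records with checkpointSerializer ...
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // ... restore buffered records from state on recovery ...
  }
}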
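The snapshotState() quoted in the thread clones only the ListBuffer, not the GenericRecord objects inside it, so the state can still reference objects the processing path holds. A hedged helper sketch, assuming Avro's GenericData#deepCopy is acceptable for these records; the object and method names are made up for illustration.

import org.apache.avro.generic.{GenericData, GenericRecord}
import scala.collection.mutable.ListBuffer

object SnapshotCopies {

  // Deep-copies one Avro record so the copy placed into Flink state shares
  // nothing with the record the processing thread may still be using.
  def deepCopyRecord(record: GenericRecord): GenericRecord =
    GenericData.get().deepCopy(record.getSchema, record)

  // Copies a whole buffer record-by-record; the result can be handed to
  // ListState#add() in snapshotState() instead of the original buffer.
  def deepCopyBuffer(buffer: ListBuffer[GenericRecord]): ListBuffer[GenericRecord] =
    buffer.map(deepCopyRecord)
}

In the quoted snapshotState(), the shallow copy `new_buffer += f` could then become `new_buffer += SnapshotCopies.deepCopyRecord(f)`.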