Hi,

judging from the dump's stack trace, the crash does not look related to Flink code. Since it happens somewhere in the handling of a jar file, it might be related to point (2) of https://bugs.openjdk.java.net/browse/JDK-8142508. Could it be that your jar gets overwritten while the job is running, e.g. by your IDE?
The serialization exception looks as if the custom sink is using the same serializer concurrently from different threads. I don’t have the full custom code, but that would be my guess. Make sure to duplicate serializers whenever different threads could work on them, e.g. the processing thread vs. the checkpointing thread.

Best,
Stefan

> On 20.10.2017 at 14:24, vipul singh <neoea...@gmail.com> wrote:
>
> Hello all,
>
> I am working on a custom sink implementation, but I am having weird issues
> with checkpointing.
>
> I am using a custom ListState to checkpoint, and it looks like this:
> private var checkpointMessages: ListState[Bucket] = _
>
> My snapshot function looks like:
>
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>   for ((bucketName, bucket) <- bufferedMessages) {
>
>     // cloning to avoid any concurrent modification issues
>     var new_buffer = new ListBuffer[GenericRecord]()
>
>     bucket.buffer.foreach(f => new_buffer += f)
>
>     val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>
>     if (shouldUpload(bucketName)) uploadFile(bucketName)
>     else checkpointMessages.add(new_bucket)
>   }
> }
>
> where class Bucket is:
>
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable {
>   def this(name: String) = {
>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
>
> The bufferedMessages signature is:
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>
> The basic idea behind this implementation is that I maintain multiple buffers
> and push messages (org.apache.avro.generic.GenericRecord) into them in the
> invoke() method of the sink; upon reaching certain thresholds I archive them
> on S3.
>
> I try to run this both locally in IntelliJ and on a cluster.
>
> In IntelliJ the process runs for a bit (checkpoints 3-4 times) and then
> errors out with the exception below:
>
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> # http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'
>
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>
> I managed to collect a core dump:
> https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a
>
> On a cluster I start to see concurrent serialization issues:
> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>
> My initial guess is that this is happening due to the size of the ListState,
> but I checked, and there are only around 10k records in the buffer. Due to
> the nature of the application, we have to implement this in a custom sink.
>
> Could someone please help me or guide me in troubleshooting this further?
>
> --
> Thanking in advance,
> Vipul
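Regarding the suggestion to duplicate serializers, here is a minimal, Flink-free sketch in Scala of the idea. StatefulStringSerializer and SerializerDuplicationSketch are hypothetical names made up for illustration; the serializer keeps a reusable output buffer, which is exactly the kind of internal state that makes sharing one instance across threads unsafe. Its duplicate() method only mirrors the contract of Flink's TypeSerializer#duplicate() (an independent copy for stateful serializers; stateless ones may return themselves). A "processing" thread and a "checkpointing" thread then each serialize with their own copy.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical serializer with a reusable output buffer. The buffer is shared
// mutable state, so a single instance must not be used by two threads at once.
final class StatefulStringSerializer {
  private val buffer = new ByteArrayOutputStream()

  def serialize(value: String): Array[Byte] = {
    buffer.reset()
    buffer.write(value.getBytes(StandardCharsets.UTF_8))
    buffer.toByteArray
  }

  def deserialize(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  // Same idea as Flink's TypeSerializer#duplicate(): return an independent
  // instance (with its own buffer) that another thread can use safely.
  def duplicate(): StatefulStringSerializer = new StatefulStringSerializer
}

object SerializerDuplicationSketch {
  // Round-trips every record through the given serializer on whichever
  // thread executes the returned Callable.
  private def roundTrip(
      serializer: StatefulStringSerializer,
      records: Seq[String]): Callable[Seq[String]] =
    new Callable[Seq[String]] {
      override def call(): Seq[String] =
        records.map(r => serializer.deserialize(serializer.serialize(r)))
    }

  // Simulates a processing thread and a checkpointing thread serializing
  // concurrently, each with its own duplicate of the original serializer.
  def run(records: Seq[String]): (Seq[String], Seq[String]) = {
    val original = new StatefulStringSerializer
    val processingSerializer = original.duplicate()
    val checkpointSerializer = original.duplicate()

    val pool = Executors.newFixedThreadPool(2)
    try {
      val processing = pool.submit(roundTrip(processingSerializer, records))
      val checkpointing = pool.submit(roundTrip(checkpointSerializer, records))
      (processing.get(10, TimeUnit.SECONDS), checkpointing.get(10, TimeUnit.SECONDS))
    } finally {
      pool.shutdown()
    }
  }
}
```

The design choice is simply never to share a stateful serializer instance: every thread that might serialize gets its own copy, which costs one extra object but removes the race entirely. If both threads used the same instance instead, their reset()/write() calls could interleave on the shared buffer and produce corrupted bytes.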