Hi,

Judging from the trace in the dump, the crash looks unrelated to Flink code.
Since it happens somewhere in the handling of a jar file, it might be related
to point (2) of https://bugs.openjdk.java.net/browse/JDK-8142508. Maybe your
jar gets overwritten while the job is running, e.g. by your IDE?

The serialization exception looks like the custom sink is using the same 
serializer in different threads concurrently. I don’t have the full custom code, 
so this is only a guess. Make sure to duplicate serializers whenever different 
threads could work on them, e.g. processing vs. checkpointing.
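
To illustrate what I mean by duplicating, here is a minimal, dependency-free
sketch. BufferingSerializer is a hypothetical stand-in and not a Flink class;
it only mimics a serializer that keeps internal scratch state. In Flink itself
the corresponding call is TypeSerializer#duplicate(). How your sink obtains
its serializers is something I am assuming here, since I have not seen that code.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical stateful serializer (NOT a Flink class). It reuses one
// internal buffer, so a single instance must not be shared across threads.
final class BufferingSerializer {
  private val bytes = new ByteArrayOutputStream()
  private val out = new DataOutputStream(bytes)

  def serialize(value: String): Array[Byte] = {
    bytes.reset()          // shared scratch state: the source of the race
    out.writeUTF(value)
    out.flush()
    bytes.toByteArray
  }

  // Same idea as TypeSerializer#duplicate(): a fresh instance with its own buffer.
  def duplicate(): BufferingSerializer = new BufferingSerializer
}

object DuplicateSerializerSketch {
  def main(args: Array[String]): Unit = {
    val processingSerializer = new BufferingSerializer
    // The checkpointing thread gets its own copy instead of sharing the instance.
    val checkpointSerializer = processingSerializer.duplicate()

    val checkpointThread = new Thread(new Runnable {
      def run(): Unit =
        for (i <- 1 to 1000) checkpointSerializer.serialize(s"checkpoint-$i")
    })
    checkpointThread.start()

    for (i <- 1 to 1000) processingSerializer.serialize(s"record-$i")
    checkpointThread.join()
  }
}
```

If both loops used processingSerializer directly, the two threads would reset
and write the same buffer at the same time, which is the kind of corruption I
suspect behind your exception.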

Best,
Stefan


 
> On 20.10.2017 at 14:24, vipul singh <neoea...@gmail.com> wrote:
> 
> Hello all,
> 
> I am working on a custom sink implementation, but I am having weird issues 
> with checkpointing.
> 
> I am using a custom ListState to checkpoint, and it looks like this:
> private var checkpointMessages: ListState[Bucket] =_
> 
> My snapshot function looks like:
> 
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>   for ((bucketName, bucket) <- bufferedMessages) {
>     // cloning to avoid any concurrent modification issues
>     val new_buffer = new ListBuffer[GenericRecord]()
>     bucket.buffer.foreach(f => new_buffer += f)
>     val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
> 
>     if (shouldUpload(bucketName)) uploadFile(bucketName)
>     else checkpointMessages.add(new_bucket)
>   }
> }
> where the class Bucket is:
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord],
>              var timestamp: Long) extends Serializable {
>   def this(name: String) = {
>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
> 
> The bufferedMessages signature is:
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
> 
> The basic idea behind this implementation is that I maintain multiple buffers 
> and push messages (org.apache.avro.generic.GenericRecord) into them in the 
> sink's invoke method; upon reaching certain thresholds, I archive them on S3.
> 
> I tried running this both locally in IntelliJ and on a cluster:
> 
> In IntelliJ the process runs for a bit (checkpoints 3-4 times) and then 
> errors out with the exception below:
> 
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'
> 
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
> 
> I managed to collect a core dump: 
> https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a
> 
> On a cluster I start to see concurrent serialization issues: 
> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
> 
> My initial guess was that this is happening due to the size of the ListState, 
> but I checked and the number of records in the buffer is only around 10k. Due 
> to the nature of the application, we have to implement this in a custom sink.
> 
> Could someone please help me or guide me in troubleshooting this further?
> 
> -- 
> Thanks in advance,
> Vipul
