Thanks Stefan. I found the issue in my application. Everything is working
as expected now.
Once again thanks for the help and advice.

On Fri, Oct 20, 2017 at 4:51 AM, vipul singh <neoea...@gmail.com> wrote:

> Thanks for the answers, Stefan. The serialization happens while the
> snapshot state is being created. I have added a gist with a larger stack
> trace (https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850).
> I am not using any serializer in the custom sink.
>
> We have
>
> src.keyBy(m => (m.topic, m.partition))
>   .map(message => updateMessage(message, config))
>   .addSink(new CustomSink(config))
>   .uid(FLINK_JOB_ID)
>   .setParallelism(src.parallelism)
>   .name(FLINK_JOB_ID)
>
> So I am assuming there should be a 1:1 mapping between source and sink.
>
> If possible, could you please give some more pointers to help
> troubleshoot?
>
> Thanks,
> Vipul
>
>
> On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> judging from the trace in the dump, the crash looks unrelated to Flink
>> code. Since it happens somewhere in managing a jar file, it might be
>> related to point (2) of https://bugs.openjdk.java.net/browse/JDK-8142508 .
>> Maybe your jar gets overwritten while running, e.g. from your IDE?
>>
>> The serialization exception looks as if the custom sink is using the same
>> serializer in different threads concurrently. I don’t have the full custom
>> code, but this would be my guess. Make sure to duplicate serializers
>> whenever different threads could work on them, e.g. processing vs.
>> checkpointing.
>>
>> Best,
>> Stefan
>>
>>
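A minimal sketch of the duplication advice above, not necessarily the fix that resolved this thread. ReusingStringSerializer and DuplicatePerThread are hypothetical names used only for illustration; in Flink itself the corresponding call is TypeSerializer.duplicate(). The sketch assumes a serializer that reuses an internal buffer, so each thread is given its own instance:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical stateful serializer: it reuses one internal buffer, so two
// threads calling serialize() on the same instance could corrupt each other.
final class ReusingStringSerializer {
  private val buffer = new ByteArrayOutputStream()

  def serialize(value: String): Array[Byte] = {
    buffer.reset()
    val out = new DataOutputStream(buffer)
    out.writeUTF(value) // 2-byte length prefix followed by the encoded string
    out.flush()
    buffer.toByteArray
  }

  // Returns a fresh instance with its own buffer.
  def duplicate(): ReusingStringSerializer = new ReusingStringSerializer
}

object DuplicatePerThread {
  def run(): (Array[Byte], Array[Byte]) = {
    val processingSerializer = new ReusingStringSerializer
    // The "checkpointing" thread works on a duplicate, never on the shared one.
    val checkpointSerializer = processingSerializer.duplicate()

    var processed: Array[Byte] = null
    var snapshotted: Array[Byte] = null
    val processing = new Thread(new Runnable {
      def run(): Unit = { processed = processingSerializer.serialize("record") }
    })
    val checkpointing = new Thread(new Runnable {
      def run(): Unit = { snapshotted = checkpointSerializer.serialize("snapshot") }
    })
    processing.start(); checkpointing.start()
    processing.join(); checkpointing.join() // join makes the results visible here
    (processed, snapshotted)
  }
}
```

Calling DuplicatePerThread.run() returns the two byte arrays; because no serializer instance is shared, neither thread can overwrite the other's buffer.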
>>
>>
>> On 20.10.2017 at 14:24, vipul singh <neoea...@gmail.com> wrote:
>>
>> Hello all,
>>
>> I am working on a custom sink implementation but am running into odd
>> issues with checkpointing.
>>
>> I am using a custom ListState to checkpoint, and it looks like this:
>>
>> private var checkpointMessages: ListState[Bucket] = _
>>
>>
>> My snapshot function looks like:
>>
>> @throws[IOException]
>> def snapshotState(context: FunctionSnapshotContext): Unit = {
>>   checkpointMessages.clear()
>>   for ((bucketName, bucket) <- bufferedMessages) {
>>     // Copy the buffer to avoid concurrent modification issues
>>     // (the records themselves are still shared, not deep-copied).
>>     val newBuffer = new ListBuffer[GenericRecord]()
>>     bucket.buffer.foreach(record => newBuffer += record)
>>
>>     val newBucket = new Bucket(bucketName, newBuffer, bucket.timestamp)
>>
>>     if (shouldUpload(bucketName)) uploadFile(bucketName)
>>     else checkpointMessages.add(newBucket)
>>   }
>> }
>>
>> where the Bucket class is:
>>
>> @SerialVersionUID(1L)
>> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long)
>>     extends Serializable {
>>   def this(name: String) = {
>>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>>   }
>> }
>>
>>
>> The bufferedMessages declaration is:
>>
>> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>>
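The copy-before-checkpoint idea in snapshotState above can be sketched without Flink as follows. This is only an illustration under assumptions: Record, MutableBucket, BucketSnapshot and SnapshotSketch are hypothetical stand-ins, and Record is an immutable case class used in place of GenericRecord. With mutable Avro records the elements themselves would also need copying (for example with Avro's GenericData.deepCopy), since copying only the list still shares the records.

```scala
import scala.collection.mutable

// Hypothetical immutable stand-in for org.apache.avro.generic.GenericRecord.
final case class Record(payload: String)

// Hypothetical mutable bucket, appended to by the processing path.
final class MutableBucket(val name: String, var timestamp: Long) {
  val buffer: mutable.ListBuffer[Record] = mutable.ListBuffer.empty
}

// Immutable view handed to the checkpoint; later appends cannot change it.
final case class BucketSnapshot(name: String, records: Vector[Record], timestamp: Long)

object SnapshotSketch {
  def snapshot(buckets: mutable.Map[String, MutableBucket]): Vector[BucketSnapshot] =
    buckets.values
      .map(b => BucketSnapshot(b.name, b.buffer.toVector, b.timestamp))
      .toVector
}
```

A snapshot taken this way keeps the records it saw at that moment, even if the processing path keeps appending to the same bucket afterwards.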
>>
>> The basic idea behind this implementation is that I maintain multiple
>> buffers and push messages (org.apache.avro.generic.GenericRecord) into
>> them in the sink's invoke method. Upon reaching certain thresholds, I
>> archive them on S3.
>>
>> I tried running this both locally in IntelliJ and on a cluster.
>>
>> In IntelliJ the process runs for a bit (3-4 checkpoints) and then
>> errors out with the exception below:
>>
>>
>> # A fatal error has been detected by the Java Runtime Environment:
>> #
>> #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
>> #
>> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
>> # Problematic frame:
>> # V  [libjvm.dylib+0x46440c]
>> #
>> # Core dump written. Default location: /cores/core or core.25232
>> #
>> # An error report file with more information is saved as:
>> # hs_err_pid25232.log
>> #
>> # If you would like to submit a bug report, please visit:
>> #   http://bugreport.java.com/bugreport/crash.jsp
>> # The crash happened outside the Java Virtual Machine in native code.
>> # See problematic frame for where to report the bug.
>> #
>> Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'
>>
>> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>>
>> I managed to collect a core dump:
>> https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a
>>
>> On a cluster I start to see concurrent serialization issues:
>> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>>
>> My initial guess was that this is caused by the size of the ListState,
>> but I checked and there are only around 10k records in the buffer. Due to
>> the nature of the application, we have to implement this in a custom sink.
>>
>> Could someone please help or guide me in troubleshooting this further?
>>
>> --
>> Thanks in advance,
>> Vipul
>>
>>
>>
>
>
> --
> Thanks,
> Vipul
>



-- 
Thanks,
Vipul
