[ 
https://issues.apache.org/jira/browse/FLINK-20228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liqi316 updated FLINK-20228:
----------------------------
    Description: 
I will write the data to es(es version is 5.6.8. and flink use 
flink-connector-elasticsearch6), the task can run good at begin.But,after a 
while, the checkpoint will failed,and there are no any exception in taskmanager 
log .I get the jvm stack info as follow:

java.lang.Thread.State: BLOCKED (on object monitor)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:770)
 - waiting to lock <0x0000000713f7fef8> (a java.lang.Object)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:706)
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:184)
 at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1169)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

"Legacy Source Thread - Source: Custom Source -> Map -> 
SourceConversion(table=[Unregistered_DataStream_2], fields=[C01, C02, C03, C04, 
C05, C06, C07, C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C1
 9, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, C34, 
C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, 
C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, 
 C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72, topic, 
record_timestamp, key, partition, offset, rawmessage, isvalidate, proctime]) -> 
Calc(select=[C01, C02, C03, C04, C05, C06, C07, C08, C09, C10
 , C11, C12, C13, (C14 CONCAT _UTF-16LE' ' CONCAT C15) AS C14, C15, C16, C17, 
C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, 
C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C
 44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, 
C60, C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72]) -> 
SinkConversionToTuple2 -> Filter -> Map -> SourceConversion(
 table=[Unregistered_DataStream_7], fields=[C01, C02, C03, C04, C05, C06, C07, 
C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, C21, C22, C23, 
C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, 
 C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, 
C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64, 
C65, C66, C67, C68, C69, C70, C71, C72]) -> SinkConve
 rsionToRow -> Sink: Elasticsearch (1/6)" #91 prio=5 os_prio=0 
tid=0x00007f3e7d096800 nid=0xe050 waiting on condition [0x00007f3e41393000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x000000071f04f2d8> (a 
java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 at 
org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
 at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
 at 
org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
 at 
org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
 - locked <0x0000000713d8c188> (a org.elasticsearch.action.bulk.BulkProcessor)
 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
 at 
org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
 at 
io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:119)
 at 
io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:25)
 at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:308)
 at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SinkConversion$236.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SourceConversion$234.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SourceConversion$234.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SinkConversion$232.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at StreamExecCalc$229.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SourceConversion$80.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
 - locked <0x0000000713f7fef8> (a java.lang.Object)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
 - locked <0x0000000713f7fef8> (a java.lang.Object)
 at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)

  was:
java.lang.Thread.State: BLOCKED (on object monitor)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:770)
 - waiting to lock <0x0000000713f7fef8> (a java.lang.Object)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:706)
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:184)
 at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1169)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

"Legacy Source Thread - Source: Custom Source -> Map -> 
SourceConversion(table=[Unregistered_DataStream_2], fields=[C01, C02, C03, C04, 
C05, C06, C07, C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C1
9, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, C34, 
C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, 
C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, 
C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72, topic, 
record_timestamp, key, partition, offset, rawmessage, isvalidate, proctime]) -> 
Calc(select=[C01, C02, C03, C04, C05, C06, C07, C08, C09, C10
, C11, C12, C13, (C14 CONCAT _UTF-16LE' ' CONCAT C15) AS C14, C15, C16, C17, 
C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, 
C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C
44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, 
C60, C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72]) -> 
SinkConversionToTuple2 -> Filter -> Map -> SourceConversion(
table=[Unregistered_DataStream_7], fields=[C01, C02, C03, C04, C05, C06, C07, 
C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, C21, C22, C23, 
C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, 
C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, C49, 
C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64, C65, 
C66, C67, C68, C69, C70, C71, C72]) -> SinkConve
rsionToRow -> Sink: Elasticsearch (1/6)" #91 prio=5 os_prio=0 
tid=0x00007f3e7d096800 nid=0xe050 waiting on condition [0x00007f3e41393000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x000000071f04f2d8> (a 
java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 at 
org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
 at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
 at 
org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
 at 
org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
 - locked <0x0000000713d8c188> (a org.elasticsearch.action.bulk.BulkProcessor)
 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
 at 
org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
 at 
io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:119)
 at 
io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:25)
 at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:308)
 at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SinkConversion$236.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SourceConversion$234.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SourceConversion$234.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SinkConversion$232.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at StreamExecCalc$229.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at SourceConversion$80.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
 - locked <0x0000000713f7fef8> (a java.lang.Object)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
 - locked <0x0000000713f7fef8> (a java.lang.Object)
 at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)


> ESsink accur thread blocked
> ---------------------------
>
>                 Key: FLINK-20228
>                 URL: https://issues.apache.org/jira/browse/FLINK-20228
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.9.1
>            Reporter: liqi316
>            Priority: Blocker
>             Fix For: 1.9.1
>
>
> I will write the data to es(es version is 5.6.8. and flink use 
> flink-connector-elasticsearch6), the task can run good at begin.But,after a 
> while, the checkpoint will failed,and there are no any exception in 
> taskmanager log .I get the jvm stack info as follow:
> java.lang.Thread.State: BLOCKED (on object monitor)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:770)
>  - waiting to lock <0x0000000713f7fef8> (a java.lang.Object)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:706)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:184)
>  at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1169)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> "Legacy Source Thread - Source: Custom Source -> Map -> 
> SourceConversion(table=[Unregistered_DataStream_2], fields=[C01, C02, C03, 
> C04, C05, C06, C07, C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C1
>  9, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, 
> C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, 
> C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, 
>  C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72, topic, 
> record_timestamp, key, partition, offset, rawmessage, isvalidate, proctime]) 
> -> Calc(select=[C01, C02, C03, C04, C05, C06, C07, C08, C09, C10
>  , C11, C12, C13, (C14 CONCAT _UTF-16LE' ' CONCAT C15) AS C14, C15, C16, C17, 
> C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, 
> C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C
>  44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, 
> C59, C60, C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72]) -> 
> SinkConversionToTuple2 -> Filter -> Map -> SourceConversion(
>  table=[Unregistered_DataStream_7], fields=[C01, C02, C03, C04, C05, C06, 
> C07, C08, C09, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, C21, 
> C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, 
>  C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, 
> C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, 
> C64, C65, C66, C67, C68, C69, C70, C71, C72]) -> SinkConve
>  rsionToRow -> Sink: Elasticsearch (1/6)" #91 prio=5 os_prio=0 
> tid=0x00007f3e7d096800 nid=0xe050 waiting on condition [0x00007f3e41393000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x000000071f04f2d8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
>  at 
> org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
>  at 
> org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
>  at 
> org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
>  - locked <0x0000000713d8c188> (a org.elasticsearch.action.bulk.BulkProcessor)
>  at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
>  at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
>  at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
>  at 
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
>  at 
> io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:119)
>  at 
> io.github.interestinglab.waterdrop.flink.sink.function.ElasticsearchRichSink.process(ElasticsearchRichSink.java:25)
>  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:308)
>  at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SinkConversion$236.processElement(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SourceConversion$234.processElement(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SourceConversion$234.processElement(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SinkConversion$232.processElement(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at StreamExecCalc$229.processElement(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at SourceConversion$80.processElement(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>  at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>  - locked <0x0000000713f7fef8> (a java.lang.Object)
>  at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>  at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>  - locked <0x0000000713f7fef8> (a java.lang.Object)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to