gaoyan1998 opened a new issue, #336: URL: https://github.com/apache/doris-flink-connector/issues/336
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues.

### Version

1.2.7

### What's Wrong?

```
2024-03-11 08:59:49,243 INFO org.apache.flink.runtime.taskmanager.Task [] - doris_dws_log[3]: Writer (1/1)#0 (42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) switched from CREATED to DEPLOYING.
2024-03-11 08:59:49,244 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task doris_dws_log[3]: Writer (1/1)#0 (42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) [DEPLOYING].
2024-03-11 08:59:49,244 INFO org.apache.flink.runtime.io.network.partition.ResultPartition [] - Sort-merge partition 83a2598d6bedaa33bba189576122d476#0@42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0 initialized.
2024-03-11 08:59:49,584 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend@6e41178f
2024-03-11 08:59:49,584 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as BatchExecutionStateBackend
2024-03-11 08:59:49,584 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application defined checkpoint storage: org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage@338a690e
2024-03-11 08:59:49,584 INFO org.apache.flink.runtime.taskmanager.Task [] - doris_dws_log[3]: Writer (1/1)#0 (42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) switched from DEPLOYING to INITIALIZING.
2024-03-11 08:59:49,850 INFO org.apache.doris.flink.sink.writer.DorisWriter [] - restore checkpointId 0
2024-03-11 08:59:49,850 INFO org.apache.doris.flink.sink.writer.DorisWriter [] - labelPrefix scps_log_database_load2dws
2024-03-11 08:59:49,855 INFO org.apache.doris.flink.sink.writer.DorisWriter [] - Send request to Doris FE 'http://10.0.1.36:8030/api/backends?is_alive=true' with user 'admin'.
2024-03-11 08:59:49,901 INFO org.apache.doris.flink.sink.writer.DorisWriter [] - Backend Info:{"backends":[{"ip":"10.0.1.46","http_port":8040,"is_alive":true},{"ip":"10.0.1.28","http_port":8040,"is_alive":true},{"ip":"10.0.1.22","http_port":8040,"is_alive":true}]}
2024-03-11 08:59:49,988 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - init RecordBuffer capacity 262144, count 3
2024-03-11 08:59:49,989 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - abort for labelSuffix scps_log_database_load2dws_0. start chkId 1.
2024-03-11 08:59:50,079 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
    "TxnId": 53343092,
    "Label": "scps_log_database_load2dws_0_1",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 16,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 6,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 7,
    "CommitAndPublishTimeMs": 0
}
2024-03-11 08:59:50,082 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - abort 53343092 for check label scps_log_database_load2dws_0_1.
2024-03-11 08:59:50,085 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - abort for labelSuffix scps_log_database_load2dws_0 finished
2024-03-11 08:59:50,088 INFO org.apache.flink.runtime.taskmanager.Task [] - doris_dws_log[3]: Writer (1/1)#0 (42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) switched from INITIALIZING to RUNNING.
2024-03-11 08:59:50,090 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
2024-03-11 08:59:50,090 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - stream load started for scps_log_database_load2dws_0_1 on host 10.0.1.46:8040
2024-03-11 08:59:50,091 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - start execute load
2024-03-11 09:01:30,985 INFO org.apache.http.impl.execchain.RetryExec [] - I/O exception (java.net.SocketException) caught when processing request to {}->http://10.0.1.46:8040: Broken pipe (Write failed)
2024-03-11 09:01:40,287 ERROR org.apache.doris.flink.sink.writer.DorisWriter [] - stream load finished unexpectedly, interrupt worker thread!
org.apache.http.client.ClientProtocolException
2024-03-11 09:01:40,290 WARN org.apache.flink.runtime.taskmanager.Task [] - doris_dws_log[3]: Writer (1/1)#0 (42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.doris.flink.sink.writer.RecordStream.write(RecordStream.java:59) ~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
    at org.apache.doris.flink.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:195) ~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
    at org.apache.doris.flink.sink.writer.DorisWriter.write(DorisWriter.java:143) ~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter$SinkWriterV1Adapter.write(SinkV1Adapter.java:136) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.2.jar:1.17.2]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(Unknown Source) ~[?:?]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source) ~[?:?]
    at java.util.concurrent.ArrayBlockingQueue.take(Unknown Source) ~[?:?]
    at org.apache.doris.flink.sink.writer.RecordBuffer.write(RecordBuffer.java:91) ~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
    at org.apache.doris.flink.sink.writer.RecordStream.write(RecordStream.java:57) ~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
    ... 17 more
2024-03-11 09:01:40,294 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for doris_dws_log[3]: Writer (1/1)#0 (42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0).
2024-03-11 09:01:40,300 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task doris_dws_log[3]: Writer (1/1)#0 42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0.
2024-03-11 09:01:40,342 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=0.01, taskHeapMemory=537.600mb (563714445 bytes), taskOffHeapMemory=0 bytes, managedMemory=634.880mb (665719939 bytes), networkMemory=158.720mb (166429984 bytes)}, allocationId: af4ef76e87f887d787b3b34db222e140, jobId: 108303d779c058df66183f711b014211).
2024-03-11 09:01:40,345 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 108303d779c058df66183f711b014211 from job leader monitoring.
2024-03-11 09:01:40,345 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 108303d779c058df66183f711b014211.
2024-03-11 09:02:14,326 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close ResourceManager connection f1cded2e81112d915b164504ca51796a.
2024-03-11 09:02:14,327 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://[email protected]:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2024-03-11 09:02:14,331 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
2024-03-11 09:02:14,337 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://[email protected]:6122/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://[email protected]:6123/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManager because: The ResourceManager does not recognize this TaskExecutor.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2442) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2386) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist-1.17.2.jar:1.17.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
2024-03-11 09:02:14,338 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
```

### What You Expected?

I have tried connector versions 1.4 and 1.5, with the same result.

### How to Reproduce?

_No response_

### Anything Else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
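Reading the log, the `InterruptedException` looks like a secondary symptom: the stream-load HTTP request to `10.0.1.46:8040` fails first (`Broken pipe (Write failed)`, then `ClientProtocolException`), `DorisWriter` logs "stream load finished unexpectedly, interrupt worker thread!", and the Flink task thread, blocked in `ArrayBlockingQueue.take` inside `RecordBuffer.write`, is interrupted. The Java sketch below is a hypothetical, simplified model of that hand-off. It assumes a pool of `count` pre-allocated buffers moved between a write queue and a read queue, a design inferred only from the log lines `init RecordBuffer capacity 262144, count 3` and `read queue size 0, write queue size 3`. `BufferInterruptDemo` and its methods are illustrative names, not the connector's actual code.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical model of a double-queue record buffer.
 * Illustrative names only; this is NOT the flink-doris-connector implementation.
 */
public class BufferInterruptDemo {

    // Empty buffers the writer may fill; starts with `count` pre-allocated buffers.
    private final BlockingQueue<ByteBuffer> writeQueue;
    // Filled buffers waiting to be streamed by the (simulated) HTTP load thread.
    private final BlockingQueue<ByteBuffer> readQueue;

    BufferInterruptDemo(int capacity, int count) {
        writeQueue = new ArrayBlockingQueue<>(count);
        readQueue = new ArrayBlockingQueue<>(count);
        for (int i = 0; i < count; i++) {
            writeQueue.add(ByteBuffer.allocate(capacity));
        }
    }

    // Writer side: blocks in take() until the loader hands back an empty buffer.
    void write(byte[] record) {
        try {
            ByteBuffer buf = writeQueue.take();
            buf.put(record);
            buf.flip();
            readQueue.put(buf);
        } catch (InterruptedException e) {
            // Wrap the interrupt the way the reported trace shows it:
            // RuntimeException caused by InterruptedException.
            throw new RuntimeException(e);
        }
    }

    // Runs the scenario and returns the exception the writer thread ended with.
    static Throwable run() {
        BufferInterruptDemo buffer = new BufferInterruptDemo(16, 3);
        Throwable[] failure = new Throwable[1];

        Thread writer = new Thread(() -> {
            try {
                // Nothing drains readQueue (the loader has "failed"), so after
                // three writes the next write blocks in writeQueue.take().
                for (int i = 0; i < 10; i++) {
                    buffer.write(new byte[] {1, 2, 3});
                }
            } catch (RuntimeException e) {
                failure[0] = e;
            }
        });
        writer.start();

        try {
            // The main thread stands in for the load-monitoring logic: it assumes
            // the stream load has failed and interrupts the blocked writer.
            Thread.sleep(200);
            writer.interrupt();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failure[0];
    }

    public static void main(String[] args) {
        // Prints: java.lang.RuntimeException: java.lang.InterruptedException
        System.out.println(run());
    }
}
```

Running `main` prints the same failure cause as the `switched from RUNNING to FAILED` line. If this model is accurate, the interrupt only reports the earlier failure on the BE connection, so the `Broken pipe` on the request to `10.0.1.46:8040` is the more useful lead.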
