zoucao created FLINK-34964:
------------------------------

             Summary: ScheduledTask leak in registering the processing timer
                 Key: FLINK-34964
                 URL: https://issues.apache.org/jira/browse/FLINK-34964
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.20.0
            Reporter: zoucao
         Attachments: image-2024-03-29-16-40-11-928.png

I have come across a leak of 'ScheduledTask' objects when registering 
processing-time timers. After further investigation, I have identified two 
causes of the leak.


*1. A registered 'ScheduledTask' is not canceled when its timer is deleted*


In 
`org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`,
when a registered timer is deleted, Flink only removes it from the 
'processingTimeTimersQueue'. However, this timer may be the earliest one due to 
fire, in which case a task for it has already been submitted to the 
ScheduledThreadPoolExecutor, and that task is left in place.

When deleting a registered timer, Flink should check whether it is the next 
timer due to fire and, if so, cancel the current 'ScheduledTask'.
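A minimal sketch of the proposed check, assuming a simplified timer service. `CancelOnDeleteTimerService` and its methods are hypothetical names, and this is not Flink's `InternalTimerServiceImpl`. Timers are kept in a `TreeSet`, only the earliest one has a task on the executor, and `delete` cancels that task (rescheduling for the next timer, if any) when it removes the timer the task was scheduled for. A generation counter additionally lets a task that was canceled too late recognize itself as stale.

```java
import java.util.TreeSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified processing-time timer service (NOT Flink's InternalTimerServiceImpl).
 * It keeps at most one scheduled task, for the earliest timer, and cancels that task when the
 * timer backing it is deleted.
 */
class CancelOnDeleteTimerService {
    private final TreeSet<Long> timers = new TreeSet<>(); // pending timestamps, deduplicated
    private final ScheduledExecutorService executor;
    private ScheduledFuture<?> nextTask;        // task scheduled for nextTaskTime, or null
    private long nextTaskTime = Long.MAX_VALUE; // timestamp the pending task will fire for
    private long generation;                    // bumped on every cancel so stale tasks can tell

    CancelOnDeleteTimerService(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    synchronized void register(long time) {
        // Only a new earliest timer requires moving the pending task.
        if (timers.add(time) && time < nextTaskTime) {
            schedule(time);
        }
    }

    synchronized void delete(long time) {
        // Proposed check: if the deleted timer is the one the pending task fires for,
        // cancel that task and reschedule for the next timer instead of leaving it behind.
        if (timers.remove(time) && time == nextTaskTime) {
            if (timers.isEmpty()) {
                cancelPending();
            } else {
                schedule(timers.first());
            }
        }
    }

    synchronized long scheduledTime() {
        return nextTaskTime;
    }

    private void cancelPending() {
        if (nextTask != null) {
            nextTask.cancel(false); // has no effect if the task already started running
            nextTask = null;
        }
        nextTaskTime = Long.MAX_VALUE;
        generation++; // any task scheduled before this point is now stale
    }

    private void schedule(long time) {
        cancelPending();
        final long myGeneration = generation;
        long delay = Math.max(0L, time - System.currentTimeMillis());
        nextTaskTime = time;
        nextTask = executor.schedule(() -> fire(myGeneration), delay, TimeUnit.MILLISECONDS);
    }

    private synchronized void fire(long taskGeneration) {
        if (taskGeneration != generation) {
            return; // superseded after it had already started running: do nothing
        }
        long time = nextTaskTime;
        nextTask = null;
        nextTaskTime = Long.MAX_VALUE;
        while (!timers.isEmpty() && timers.first() <= time) {
            timers.pollFirst(); // a real service would invoke the user's onTimer() here
        }
        if (!timers.isEmpty()) {
            schedule(timers.first());
        }
    }
}
```

If the executor is a `ScheduledThreadPoolExecutor` with `setRemoveOnCancelPolicy(true)`, the canceled task also leaves the executor's work queue immediately; by default, canceled tasks stay queued until their delay elapses.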

 

*2. Registering a timer earlier than System.currentTimeMillis()*

Consider the following case: System.currentTimeMillis() returns 100, the 
processing-time timer queue contains timers 100, 101 and 102, and the task for 
timer-100 has been submitted to the ScheduledThreadPoolExecutor. At this moment, 
the user registers timer-99. Since 99 is less than 100 (the head of the queue), 
Flink cancels timer-100's task and registers a new task for timer-99. However, 
before timer-100's task was canceled, the thread pool had already run it and 
submitted its trigger to the mailbox, so the cancellation has no effect.
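Why the cancellation has no effect can be shown with the plain JDK executor. In this sketch a `ConcurrentLinkedQueue` stands in for Flink's mailbox (an assumption for illustration, not Flink's `TaskMailbox`), and the hypothetical `LateCancelDemo` task for timer-100 only enqueues a trigger mail. Once the task has run, `Future#cancel(false)` returns `false` and the mail stays queued.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo; the queue below is a stand-in "mailbox", not Flink's TaskMailbox. */
class LateCancelDemo {
    final Queue<String> mailbox = new ConcurrentLinkedQueue<>();
    boolean cancelSucceeded;

    void run() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            // Timer-100 is already due; its task only defers the trigger to the mailbox.
            ScheduledFuture<?> timer100 =
                    pool.schedule(
                            () -> { mailbox.add("trigger timer-100"); },
                            0L,
                            TimeUnit.MILLISECONDS);
            // Wait until the task has run, i.e. the trigger mail is already queued.
            timer100.get();
            // Registering timer-99 now cancels timer-100's task, but it has already completed.
            cancelSucceeded = timer100.cancel(false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

After `run()`, `cancelSucceeded` is `false` and the mailbox still holds "trigger timer-100", which corresponds to the trigger mail that runs even though its task was "canceled".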
The mailbox then contains the following mails:
{code:java}
->  * register timer-99
->    trigger timer-100
->    trigger timer-99
{code}
- When executing 'trigger timer-100', Flink fires timers 99 and 100 (both are 
due), then submits timer-101 to the scheduled thread pool.
- When executing 'trigger timer-99', there is nothing left to fire, yet it also 
submits timer-101 to the scheduled thread pool, because timer-101 is the next 
timer due and the trigger does not know that a task for it already exists.
As a result, two tasks for the same timer (timer-101) are registered in Flink's 
scheduled thread pool.
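The duplication can be replayed deterministically with a single-threaded model of the two code paths involved. `TimerRaceModel` is hypothetical and not Flink code: each task handed to the "thread pool" is identified by its index in `scheduledTasks`, and cancel is modeled as a no-op because the trigger mail is already in the mailbox. The `dropStaleTriggers` flag shows one possible mitigation, not necessarily the fix Flink adopted: a trigger whose task is no longer the one the service tracks is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical single-threaded model of case 2 (not Flink code); mails are replayed by hand. */
class TimerRaceModel {
    final TreeSet<Long> timers = new TreeSet<>();        // pending timer timestamps
    final List<Long> scheduledTasks = new ArrayList<>(); // timestamp of every task ever scheduled
    Integer pendingTask;                                 // index of the task the service tracks
    private final boolean dropStaleTriggers;             // hypothetical mitigation on/off

    TimerRaceModel(boolean dropStaleTriggers) {
        this.dropStaleTriggers = dropStaleTriggers;
    }

    private void schedule(long time) {
        scheduledTasks.add(time);
        pendingTask = scheduledTasks.size() - 1;
    }

    /** Registering a timer: reschedule only if the new timer is the earliest one. */
    void register(long time) {
        Long oldHead = timers.isEmpty() ? null : timers.first();
        if (timers.add(time) && (oldHead == null || time < oldHead)) {
            // The old task would be canceled here, but in case 2 its trigger mail is already
            // in the mailbox, so the model keeps it and lets the caller replay its trigger.
            schedule(time);
        }
    }

    /** Running the trigger mail produced by the task at index taskIndex. */
    void trigger(int taskIndex) {
        if (dropStaleTriggers && (pendingTask == null || pendingTask != taskIndex)) {
            return; // stale trigger of a superseded task: ignore it
        }
        long time = scheduledTasks.get(taskIndex);
        pendingTask = null; // models the behavior described above: forget the tracked task
        while (!timers.isEmpty() && timers.first() <= time) {
            timers.pollFirst(); // the user's onTimer() would run here
        }
        if (!timers.isEmpty()) {
            schedule(timers.first()); // submit the next timer due
        }
    }

    long tasksFor(long time) {
        return scheduledTasks.stream().filter(t -> t == time).count();
    }
}
```

Replaying the mailbox above with `dropStaleTriggers = false` (register 100, 101, 102 and 99, then `trigger(0)` for timer-100 and `trigger(1)` for timer-99) creates two tasks for timer-101, of which only the later one is still tracked and can ever be canceled. With `true`, the stale `trigger(0)` is ignored, timer-99's trigger reschedules timer-100, and after timer-100 fires exactly one task for timer-101 exists.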

In our production job, the number of these leaked ScheduledTasks can reach the 
thousands; see the attached screenshot (image-2024-03-29-16-40-11-928.png).

 

The following test reproduces case 2.
{code:java}
    // Assumes the enclosing test class provides the helpers used below:
    // createAndStartInternalTimerService, createQueueFactory, testKeyGroupRange,
    // TestKeyContext, handleException and deferCallbackToMailbox.
    @Test
    public void testTimerTaskLeak() {
        TaskMailboxImpl mailbox = new TaskMailboxImpl();
        MailboxExecutor mailboxExecutor =
                new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
        SystemProcessingTimeService processingTimeService =
                new SystemProcessingTimeService(ex -> handleException(ex));

        // Timer callbacks are deferred to the mailbox instead of running on the timer thread.
        ProcessingTimeServiceImpl timeService =
                new ProcessingTimeServiceImpl(
                        processingTimeService,
                        callback -> deferCallbackToMailbox(mailboxExecutor, callback));

        TestKeyContext keyContext = new TestKeyContext();

        Queue<String> observedMails = new LinkedBlockingDeque<>();
        long curr = System.currentTimeMillis();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        mock(Triggerable.class),
                        keyContext,
                        timeService,
                        testKeyGroupRange,
                        createQueueFactory());

        // A separate thread enqueues the timer registrations as mails; the test thread runs them.
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.execute(
                () -> {
                    try {
                        keyContext.setCurrentKey(1);
                        mailboxExecutor.execute(
                                () -> timerService.registerProcessingTimeTimer(
                                        "void", curr + 6 * 1000L),
                                "6");
                        Thread.sleep(2L);

                        mailboxExecutor.execute(
                                () -> timerService.registerProcessingTimeTimer(
                                        "void", curr + 7 * 1000L),
                                "7");
                        Thread.sleep(2L);

                        mailboxExecutor.execute(
                                () -> timerService.registerProcessingTimeTimer(
                                        "void", curr + 8 * 1000L),
                                "8");
                        Thread.sleep(2L);

                        // curr + 1 becomes the new head of the queue.
                        mailboxExecutor.execute(
                                () -> timerService.registerProcessingTimeTimer("void", curr + 1),
                                "1");

                        mailboxExecutor.execute(
                                () -> {
                                    // Wait until the task for timer curr + 1 has been submitted
                                    // to the mailbox, then register an even earlier timer.
                                    Thread.sleep(3);
                                    timerService.registerProcessingTimeTimer("void", curr - 5);
                                },
                                "-5");
                        Thread.sleep(5L);

                        mailboxExecutor.execute(
                                () -> timerService.registerProcessingTimeTimer("void", curr + 4),
                                "4");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });

        // Record each mail before it runs: trigger mails are told apart from registrations by
        // their longer description, and their timestamp is parsed from the text after "@ ".
        while (observedMails.size() < 14) {
            if (mailbox.mailQueue().size() > 0) {
                String mail = mailbox.mailQueue().peek().toString();
                if (mail.length() > 5) {
                    observedMails.add(
                            "trigger " + (Long.parseLong(mail.split("@ ")[1]) - curr));
                } else {
                    observedMails.add("register " + mail);
                }
            }
            mailboxExecutor.tryYield();
        }

        System.out.println(observedMails);
        executorService.shutdownNow();
    }
{code}
!image-2024-03-29-16-40-11-928.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
