Matthias Pohl created FLINK-30354:
-------------------------------------

             Summary: Reducing the number of ThreadPools in LookupFullCache and 
related cache-loading classes
                 Key: FLINK-30354
                 URL: https://issues.apache.org/jira/browse/FLINK-30354
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Runtime
    Affects Versions: 1.17.0
            Reporter: Matthias Pohl


In the course of reviewing FLINK-29405, I came up with a proposal to reduce the 
complexity of the {{LookupFullCache}} implementation and to shrink the number 
of thread pools used from 3 to 2. Here's the proposal I also shared in the 
[FLINK-29405 PR 
comment|https://github.com/apache/flink/pull/20919#pullrequestreview-1208332584]:

The responsibilities as I see them:
* {{LookupFullCache}} is the composite class that combines the {{CacheLoader}} 
and the {{CacheReloadTrigger}} through the {{ReloadTriggerContext}}.
* {{ReloadTriggerContext}} provides an async call to trigger the reload, but 
also some utility methods for providing processing or event time (it's not 
clear to me why that is connected to the reload; based on the TODO comments, 
it looks like a future task).
* {{CacheLoader}} is in charge of loading the data into memory (concurrently, 
if possible).
* {{CacheReloadTrigger}} provides different strategies for triggering new reloads.

About the different executors:
* The {{CacheReloadTrigger}} implementations utilize a 
{{SingleThreadScheduledExecutor}} that triggers {{ReloadTriggerContext::reload}} 
repeatedly. If a reload takes longer than the trigger interval, subsequently 
triggered calls pile up; I'm wondering whether that's what we want (see the 
sketch after this list).
* {{CacheLoader}} utilizes a {{SingleThreadExecutor}} in 
{{CacheLoader#reloadExecutor}} which acts as the "main" thread for reloading 
the data. It invokes {{CacheLoader#updateCache}} while holding 
{{CacheLoader#reloadLock}}. {{(InputFormat)CacheLoader#updateCache}} is 
implemented synchronously; the data itself is loaded concurrently, if possible, 
using a {{FixedThreadPool}}.
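
To illustrate the pile-up concern from the first item, here's a small, self-contained sketch (not the actual Flink classes; all names are hypothetical): a single-thread scheduler fires the trigger periodically and submits each reload to a single-thread reload executor, so when a reload takes longer than the trigger period, the submitted reload tasks queue up.
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReloadPileUpSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService triggerScheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService reloadExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger pendingReloads = new AtomicInteger();

        // Fire the trigger every 100 ms (think of a periodic reload trigger) ...
        triggerScheduler.scheduleAtFixedRate(
                () -> {
                    pendingReloads.incrementAndGet();
                    // ... and hand each reload to the single-threaded reload executor.
                    reloadExecutor.submit(
                            () -> {
                                try {
                                    Thread.sleep(300); // simulated reload, slower than the period
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                } finally {
                                    pendingReloads.decrementAndGet();
                                }
                            });
                },
                0,
                100,
                TimeUnit.MILLISECONDS);

        Thread.sleep(2_000);
        // The reload executor's queue keeps growing because triggers outpace reloads.
        System.out.println("reloads still queued or running: " + pendingReloads.get());
        triggerScheduler.shutdownNow();
        reloadExecutor.shutdownNow();
    }
}
{code}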

My proposal is now to reduce the number of thread pools used: Instead of having 
a {{SingleThreadExecutor}} and a {{FixedThreadPool}} in the {{CacheLoader}} 
implementation, couldn't we come up with a custom {{ThreadPoolExecutor}} with a 
minimum of 1 thread and a maximum of one thread per core (similar to what is 
already there with 
[ThreadUtils#newThreadPool|https://github.com/apache/flink/blob/d067629d4d200f940d0b58759459d7ff5832b292/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java#L36])?
 That would free {{CacheLoader}} from starting and shutting down thread pools: 
ownership of the pool would move from {{CacheLoader}} to {{LookupFullCache}}, 
which could call it {{cacheLoadingThreadPool}} (or similar).

Additionally, the {{ScheduledThreadPool}} currently living in the 
{{CacheReloadTrigger}} implementations could move into {{LookupFullCache}} as 
well, named something like {{cacheLoadSchedulingThreadPool}}. 
{{LookupFullCache}} would then be in charge of managing all cache-loading-related 
threads. It would also manage the current execution through {{CompletableFuture}} 
instances (one for triggering the reload and one for executing it). Triggering a 
reload would then either cancel the current future (if it's not completed yet) or 
ignore the trigger if we want a running reload to finish before starting a new one.
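
As a rough sketch of how {{LookupFullCache}} could own both pools and the current reload future (all names and signatures here are hypothetical, just to make the idea concrete):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner of all cache-loading threads (stand-in for LookupFullCache). */
public class CacheThreadOwnerSketch implements AutoCloseable {

    // Scales between 1 thread and one thread per core. Note: a synchronous (or bounded)
    // queue is needed here, otherwise the pool never grows beyond its core size.
    private final ThreadPoolExecutor cacheLoadingThreadPool =
            new ThreadPoolExecutor(
                    1,
                    Runtime.getRuntime().availableProcessors(),
                    60L,
                    TimeUnit.SECONDS,
                    new SynchronousQueue<>(),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    // Replaces the scheduler currently living inside the CacheReloadTrigger implementations.
    private final ScheduledExecutorService cacheLoadSchedulingThreadPool =
            Executors.newSingleThreadScheduledExecutor();

    private CompletableFuture<Void> currentReload = CompletableFuture.completedFuture(null);

    /** Called from the reload trigger; here we skip the trigger while a reload is running. */
    public synchronized void triggerReload(Runnable reloadTask) {
        if (!currentReload.isDone()) {
            return; // alternatively: cancel the current future and start over
        }
        currentReload = CompletableFuture.runAsync(reloadTask, cacheLoadingThreadPool);
    }

    @Override
    public void close() {
        cacheLoadSchedulingThreadPool.shutdownNow();
        cacheLoadingThreadPool.shutdownNow();
    }
}
{code}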

{{CacheLoader#updateCache}} would become 
{{CacheLoader#updateCacheAsync(ExecutorService)}} returning a 
{{CompletableFuture}} that completes as soon as all subtasks are completed. 
{{CacheLoader#reloadAsync}} would return this {{CompletableFuture}} instead of 
creating its own future. The lifecycle (as already explained in the previous 
paragraph) would be managed by {{LookupFullCache}}. The benefit would be that 
we wouldn't have to deal with interrupts in {{CacheLoader}}.
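
The asynchronous update could then look roughly like this (again just a hypothetical sketch; {{splitLoaders}} stands in for however the concrete {{CacheLoader}} partitions its input):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/** Hypothetical shape of the proposed asynchronous cache update. */
public abstract class AsyncCacheLoaderSketch {

    /** One runnable per input split; placeholder for the concrete loading logic. */
    protected abstract List<Runnable> splitLoaders();

    /** Loads all splits on the given executor; completes once every split is loaded. */
    public CompletableFuture<Void> updateCacheAsync(ExecutorService cacheLoadingThreadPool) {
        List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
        for (Runnable loadSplit : splitLoaders()) {
            splitFutures.add(CompletableFuture.runAsync(loadSplit, cacheLoadingThreadPool));
        }
        // Completes (possibly exceptionally) once all per-split futures are done.
        return CompletableFuture.allOf(splitFutures.toArray(new CompletableFuture[0]));
    }
}
{code}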

I see the following benefits:

* {{ReloadTriggerContext}} becomes obsolete (although one has to clarify what 
the event-time and processing-time functions are for).
* {{CacheLoader#awaitFirstLoad}} becomes obsolete as well; we can verify the 
completion of the cache loading in {{LookupFullCache}} through the 
{{CompletableFuture}} instances.
* {{CacheReloadTrigger}} can focus on the strategy implementation without 
worrying about instantiating threads, which is currently duplicated code in 
{{PeriodicCacheReloadTrigger}} and {{TimedCacheReloadTrigger}}.

I might be missing something here, and I'm curious what you think. I probably 
got carried away a bit by your proposal introducing async calls. I totally 
understand if you argue that this is way out of scope for this issue and we 
actually want to focus on fixing the test instability; in that case, I would 
do another round of review on your current proposal. But I'm happy to help if 
you think my proposal is reasonable. Or we could create a follow-up Jira issue 
to tackle it.


