featzhang created FLINK-39170:
---------------------------------

             Summary: [Table/SQL] Add async batch lookup join optimization for temporal table joins
                 Key: FLINK-39170
                 URL: https://issues.apache.org/jira/browse/FLINK-39170
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API, Table SQL / Planner
            Reporter: featzhang


h2. Overview
This feature introduces an async batch lookup join optimization for temporal table joins in Flink's Table API & SQL. The optimization batches multiple lookup requests together to reduce network overhead and improve throughput, which is particularly beneficial in high-throughput scenarios with frequent dimension table lookups.

h2. Motivation
Currently, Flink's temporal table joins issue a separate lookup request for each input record, which can lead to:
* *High network overhead*: Each lookup request requires its own network round-trip
* *Suboptimal throughput*: Network latency becomes a bottleneck in high-throughput scenarios
* *Resource inefficiency*: Frequent small requests do not use network bandwidth effectively

h2. Proposed Solution
Implement an async batch lookup join that:
* *Batches lookup requests*: Groups multiple lookup requests together before sending them to the external system
* *Configurable batch size*: Allows users to tune the batch size for their specific use case
* *Timeout-based flushing*: Bounds the added latency by flushing incomplete batches after a configurable interval
* *Backward compatibility*: Keeps the existing behavior when batch mode is disabled (the default)
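A minimal sketch of the size-or-timeout batching rule described above, in plain Java. The class BatchAccumulator and its methods are hypothetical and not part of the proposal. Time is passed in explicitly so the logic is deterministic; an actual operator would presumably call onTimer from a processing-time timer.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: buffers elements and flushes when the batch is full
 * or when the oldest buffered element has waited at least flushIntervalMillis.
 */
final class BatchAccumulator<T> {
    private final int maxBatchSize;
    private final long flushIntervalMillis;
    private final Consumer<List<T>> flusher;
    private List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1L;

    BatchAccumulator(int maxBatchSize, long flushIntervalMillis, Consumer<List<T>> flusher) {
        if (maxBatchSize <= 0 || flushIntervalMillis <= 0) {
            throw new IllegalArgumentException("batch size and flush interval must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.flusher = flusher;
    }

    /** Adds one element; flushes immediately once the batch reaches maxBatchSize. */
    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis; // the wait is measured from the oldest element
        }
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Called periodically; flushes a partial batch whose oldest element has waited long enough. */
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstElementTime >= flushIntervalMillis) {
            flush();
        }
    }

    private void flush() {
        List<T> batch = buffer;
        buffer = new ArrayList<>(); // hand the filled list over and start a fresh one
        firstElementTime = -1L;
        flusher.accept(batch);
    }
}

class BatchAccumulatorDemo {
    public static void main(String[] args) {
        List<List<String>> flushed = new ArrayList<>();
        BatchAccumulator<String> acc = new BatchAccumulator<>(3, 2000L, flushed::add);
        acc.add("k1", 0);
        acc.add("k2", 10);
        acc.add("k3", 20);   // size trigger: flushes [k1, k2, k3]
        acc.add("k4", 100);
        acc.onTimer(1000);   // k4 has waited 900 ms: no flush
        acc.onTimer(2100);   // k4 has waited 2000 ms: flushes [k4]
        System.out.println(flushed); // prints [[k1, k2, k3], [k4]]
    }
}
{code}

Flushing on whichever trigger fires first keeps batches full under high load, while at low load the extra wait is bounded by roughly the flush interval plus the timer granularity.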

h2. Use Cases
* *Real-time analytics*: High-throughput streaming jobs that need to enrich data with dimension tables
* *Event processing*: Applications processing millions of events per second with frequent lookups
* *Data warehousing*: ETL pipelines that need to join streaming data with slowly changing dimensions

h2. Expected Benefits
* *Improved throughput*: An estimated 2-5x improvement in lookup throughput for high-volume scenarios, to be confirmed by the performance tests
* *Reduced network overhead*: Fewer network round-trips through batching
* *Better resource utilization*: More efficient use of network bandwidth and external system resources
* *Configurable performance*: Users can tune the batch size and flush interval to their latency/throughput requirements
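As a back-of-envelope illustration of the round-trip reduction, assuming one request per batch, N lookups with batch size B need ceil(N / B) requests instead of N. The helper below is hypothetical. The count is a lower bound, because timeout flushes produce smaller batches, and it says nothing about the throughput gain, which also depends on how the external system handles larger requests.

{code:java}
/** Hypothetical helper: minimum request count when lookups are sent in batches. */
final class RoundTripEstimate {
    /** ceil(lookups / batchSize), assuming one request per batch. */
    static long batchedRequests(long lookups, int batchSize) {
        if (lookups < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("lookups must be >= 0 and batchSize > 0");
        }
        return (lookups + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long lookups = 10_000;
        // prints "unbatched requests: 10000"
        System.out.println("unbatched requests: " + lookups);
        // prints "batched requests (size 100): 100"
        System.out.println("batched requests (size 100): " + batchedRequests(lookups, 100));
    }
}
{code}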

h2. Configuration Options

h3. table.optimizer.dim-lookup-join.batch.enabled
* *Type*: Boolean
* *Default*: false
* *Description*: Whether to enable the dimension table batch lookup join optimization

h3. table.optimizer.dim-lookup-join.batch.size
* *Type*: Integer
* *Default*: 100
* *Description*: The batch size for dimension table lookup joins, i.e. the maximum number of lookup requests grouped into one batch.

h3. table.optimizer.dim-lookup-join.batch.flush.millis
* *Type*: Long
* *Default*: 2000
* *Description*: The flush interval, in milliseconds, for dimension table lookup joins in batch mode. Controls the maximum time a partial batch waits before it is flushed.
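A sketch of how the three options and their defaults could be read and validated, using plain Java rather than Flink's configuration classes (in Flink, such options are normally declared as ConfigOption constants via ConfigOptions.key(...)). The class BatchLookupOptions and the positivity checks are assumptions; the proposal does not state validation rules.

{code:java}
import java.util.Map;

/** Hypothetical holder for the proposed options, using the defaults listed above. */
final class BatchLookupOptions {
    static final String ENABLED = "table.optimizer.dim-lookup-join.batch.enabled";
    static final String SIZE = "table.optimizer.dim-lookup-join.batch.size";
    static final String FLUSH_MILLIS = "table.optimizer.dim-lookup-join.batch.flush.millis";

    final boolean enabled;
    final int batchSize;
    final long flushMillis;

    private BatchLookupOptions(boolean enabled, int batchSize, long flushMillis) {
        this.enabled = enabled;
        this.batchSize = batchSize;
        this.flushMillis = flushMillis;
    }

    /** Reads the options from a key/value map, falling back to the documented defaults. */
    static BatchLookupOptions fromMap(Map<String, String> conf) {
        // Boolean.parseBoolean is lenient: anything other than "true" (any case) is false.
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED, "false"));
        int batchSize = Integer.parseInt(conf.getOrDefault(SIZE, "100"));
        long flushMillis = Long.parseLong(conf.getOrDefault(FLUSH_MILLIS, "2000"));
        // Assumed validation: the proposal does not specify allowed ranges.
        if (batchSize <= 0) {
            throw new IllegalArgumentException(SIZE + " must be positive, got " + batchSize);
        }
        if (flushMillis <= 0) {
            throw new IllegalArgumentException(FLUSH_MILLIS + " must be positive, got " + flushMillis);
        }
        return new BatchLookupOptions(enabled, batchSize, flushMillis);
    }

    public static void main(String[] args) {
        BatchLookupOptions opts = fromMap(Map.of(ENABLED, "true", SIZE, "500"));
        // prints "enabled=true size=500 flush=2000ms"
        System.out.println("enabled=" + opts.enabled + " size=" + opts.batchSize
                + " flush=" + opts.flushMillis + "ms");
    }
}
{code}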

h2. Implementation Details

h3. Core Components
# *AsyncBatchLookupJoinRunner*: Main runner for batch async lookup joins without post-lookup calculations
# *AsyncBatchLookupJoinWithCalcRunner*: Runner for batch async lookup joins with post-lookup calculations
# *BatchLookupFunctionWrapper*: Adapter that wraps single lookup functions for batch processing
# *BatchResultFutureWrapper*: Wrapper for handling batch result futures
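One plausible reading of BatchLookupFunctionWrapper is an adapter that exposes a batch interface on top of an existing per-key async lookup. The sketch below is not the proposed implementation: the class BatchLookupAdapter is hypothetical, and the single lookup is modeled as a Function returning a CompletableFuture. It deduplicates keys within a batch and completes once every per-key lookup has completed.

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical adapter: serves a batch of keys on top of a per-key async lookup. */
final class BatchLookupAdapter<K, V> {
    private final Function<K, CompletableFuture<List<V>>> singleLookup;

    BatchLookupAdapter(Function<K, CompletableFuture<List<V>>> singleLookup) {
        this.singleLookup = singleLookup;
    }

    /**
     * Looks up each distinct key once and completes when all per-key lookups are done.
     * If any per-key lookup fails, the returned future fails as well.
     */
    CompletableFuture<Map<K, List<V>>> lookupBatch(List<K> keys) {
        Map<K, CompletableFuture<List<V>>> pending = new LinkedHashMap<>();
        for (K key : keys) {
            pending.computeIfAbsent(key, singleLookup); // duplicate keys share one request
        }
        return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<K, List<V>> results = new LinkedHashMap<>();
                    pending.forEach((key, future) -> results.put(key, future.join()));
                    return results;
                });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        BatchLookupAdapter<String, String> adapter = new BatchLookupAdapter<>(key -> {
            calls.incrementAndGet();
            return CompletableFuture.completedFuture(List.of(key.toUpperCase()));
        });
        Map<String, List<String>> result = adapter.lookupBatch(List.of("a", "b", "a")).join();
        // prints "{a=[A], b=[B]} after 2 lookups"
        System.out.println(result + " after " + calls.get() + " lookups");
    }
}
{code}

Such an adapter still issues one request per distinct key, so by itself it only saves duplicate lookups within a batch; the round-trip reduction targeted by this proposal would require a lookup source that can serve a whole batch in one request.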

h3. Architecture
{noformat}
Input Stream → Batch Accumulator → Batch Lookup Function → Result Processor → Output Stream
                      ↓
               Timeout Flusher (configurable interval)
{noformat}

h3. Batch Processing Flow
# *Accumulation*: Input records are accumulated into batches of up to the configured batch size
# *Timeout Handling*: If the batch size is not reached within the flush interval, the partial batch is processed anyway
# *Batch Lookup*: The entire batch is sent to the lookup function as a single request
# *Result Distribution*: Results are distributed back to the corresponding input records
# *Output*: Joined results are emitted to the output stream
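The Result Distribution and Output steps can be sketched as a pure function that maps each input record to its lookup results and emits joined rows in input order. ResultDistributor and its parameters are hypothetical; the leftOuter flag models a LEFT JOIN, where an input without matches is emitted once with a null dimension side.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical sketch of fanning batch lookup results back out to the input records. */
final class ResultDistributor {
    static <I, K, V, O> List<O> distribute(
            List<I> inputs,
            Function<I, K> keyOf,
            Map<K, List<V>> lookupResults,
            BiFunction<I, V, O> join,
            boolean leftOuter) {
        List<O> out = new ArrayList<>();
        for (I input : inputs) { // iterate in input order, so output follows input order
            List<V> matches = lookupResults.getOrDefault(keyOf.apply(input), List.of());
            if (matches.isEmpty()) {
                if (leftOuter) {
                    out.add(join.apply(input, null)); // LEFT JOIN: pad the dimension side with null
                }
            } else {
                for (V match : matches) {
                    out.add(join.apply(input, match)); // one output row per matching dimension row
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> orders = List.of("o1:u1", "o2:u2", "o3:u1");
        Map<String, List<String>> users = Map.of("u1", List.of("Alice"));
        List<String> joined = distribute(
                orders, o -> o.split(":")[1], users, (o, u) -> o + "|" + u, true);
        System.out.println(joined); // prints [o1:u1|Alice, o2:u2|null, o3:u1|Alice]
    }
}
{code}

Emitting in input order mirrors an ordered output mode; an unordered mode could instead emit each record as soon as its results are available.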

h2. Compatibility
* *Backward Compatible*: Existing temporal table joins continue to work unchanged
* *Opt-in Feature*: Must be explicitly enabled via configuration
* *API Compatible*: No changes to existing Table API or SQL syntax

h2. Testing Strategy
* *Unit Tests*: Comprehensive tests for all batch processing components
* *Integration Tests*: End-to-end tests with various batch sizes and flush intervals
* *Performance Tests*: Benchmarks comparing batch vs non-batch performance
* *Configuration Tests*: Validation of all configuration options



