featzhang created FLINK-39170:
---------------------------------
Summary: [Table/SQL] Add async batch lookup join optimization for temporal table joins
Key: FLINK-39170
URL: https://issues.apache.org/jira/browse/FLINK-39170
Project: Flink
Issue Type: New Feature
Components: Table SQL / API, Table SQL / Planner
Reporter: featzhang
h2. Overview
This feature introduces an async batch lookup join optimization for temporal
table joins in Flink's Table API & SQL. The optimization batches multiple
lookup requests together to reduce network overhead and improve throughput,
which is particularly beneficial for high-throughput jobs with frequent
dimension table lookups.
h2. Motivation
Currently, Flink's temporal table joins issue a separate lookup request for
each input record, which can lead to:
* *High network overhead*: Each lookup request requires a separate network
round-trip
* *Suboptimal throughput*: Network latency becomes a bottleneck in
high-throughput scenarios
* *Resource inefficiency*: Many small requests make poor use of network
bandwidth
h2. Proposed Solution
Implement an async batch lookup join that:
* *Batches lookup requests*: Groups multiple lookup requests together before
sending them to the external system
* *Configurable batch size*: Allows users to tune batch size based on their
specific use case
* *Timeout-based flushing*: Bounds the added latency by flushing incomplete
batches after a configurable interval
* *Backward compatibility*: Maintains existing behavior when batch mode is
disabled (default)
h2. Use Cases
* *Real-time analytics*: High-throughput streaming jobs that need to enrich
data with dimension tables
* *Event processing*: Applications processing millions of events per second
with frequent lookups
* *Data warehousing*: ETL pipelines that need to join streaming data with
slowly changing dimensions
h2. Expected Benefits
* *Improved throughput*: 2-5x improvement in lookup throughput for high-volume
scenarios
* *Reduced network overhead*: Fewer network round-trips through batching
* *Better resource utilization*: More efficient use of network bandwidth and
external system resources
* *Configurable performance*: Users can tune batch size and flush interval
based on their latency/throughput requirements
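
To make the round-trip benefit concrete, here is a back-of-the-envelope calculation. The record count is an illustrative assumption, not a measurement, and {{RoundTripEstimate}} is a hypothetical helper: when each request carries up to {{batch.size}} keys, the number of requests drops by roughly that factor.

{code:java}
// Back-of-the-envelope estimate; RoundTripEstimate is a hypothetical helper
// and the record count below is illustrative, not a measurement.
class RoundTripEstimate {

    // Requests needed for `records` lookups when each request carries at most
    // `batchSize` keys (ceiling division).
    static long roundTrips(long records, int batchSize) {
        return (records + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long records = 1_000_000L;
        // One request per record (current behavior): 1000000 requests
        System.out.println("batch.size = 1:   " + roundTrips(records, 1));
        // Default batch size from this proposal: 10000 requests
        System.out.println("batch.size = 100: " + roundTrips(records, 100));
    }
}
{code}

The cost is latency: if a batch never fills, its first record waits up to {{flush.millis}} (2000 ms by default) before its lookup is even sent, so batch size and flush interval should be tuned together.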
h2. Configuration Options
h3. table.optimizer.dim-lookup-join.batch.enabled
* *Type*: Boolean
* *Default*: false
* *Description*: Whether to enable the batch lookup join optimization for dimension tables
h3. table.optimizer.dim-lookup-join.batch.size
* *Type*: Integer
* *Default*: 100
* *Description*: The batch size of the dimension table lookup join, i.e. the
maximum number of lookup requests grouped into a single batch.
h3. table.optimizer.dim-lookup-join.batch.flush.millis
* *Type*: Long
* *Default*: 2000
* *Description*: The flush interval of the dimension table lookup join in batch
mode, in milliseconds: the maximum time an incomplete batch is held before it is sent.
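
A minimal sketch of how the three options and their defaults could be read from string key/value pairs. {{BatchLookupOptions}} is a hypothetical class, and the positivity checks are assumptions rather than part of this proposal; inside Flink the options would more likely be declared with {{ConfigOptions}}.

{code:java}
import java.util.Map;

// Hypothetical holder for the proposed options. Keys and defaults come from
// this proposal; the class and its validation rules are assumptions.
final class BatchLookupOptions {
    static final String ENABLED = "table.optimizer.dim-lookup-join.batch.enabled";
    static final String SIZE = "table.optimizer.dim-lookup-join.batch.size";
    static final String FLUSH_MILLIS = "table.optimizer.dim-lookup-join.batch.flush.millis";

    final boolean enabled;
    final int batchSize;
    final long flushMillis;

    private BatchLookupOptions(boolean enabled, int batchSize, long flushMillis) {
        this.enabled = enabled;
        this.batchSize = batchSize;
        this.flushMillis = flushMillis;
    }

    // Reads the options, falling back to the documented defaults
    // (false, 100, 2000 ms) and rejecting a non-positive size or interval.
    static BatchLookupOptions from(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED, "false"));
        int size = Integer.parseInt(conf.getOrDefault(SIZE, "100"));
        long flush = Long.parseLong(conf.getOrDefault(FLUSH_MILLIS, "2000"));
        if (size <= 0) {
            throw new IllegalArgumentException(SIZE + " must be positive, got " + size);
        }
        if (flush <= 0) {
            throw new IllegalArgumentException(FLUSH_MILLIS + " must be positive, got " + flush);
        }
        return new BatchLookupOptions(enabled, size, flush);
    }

    public static void main(String[] args) {
        BatchLookupOptions opts = from(Map.of(ENABLED, "true", SIZE, "500"));
        // enabled=true, batchSize=500, flushMillis=2000 (default)
        System.out.println("enabled=" + opts.enabled + ", batchSize=" + opts.batchSize
                + ", flushMillis=" + opts.flushMillis);
    }
}
{code}

Once the options exist, a SQL client user would enable the feature with a statement such as {{SET 'table.optimizer.dim-lookup-join.batch.enabled' = 'true';}}.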
h2. Implementation Details
h3. Core Components
# *AsyncBatchLookupJoinRunner*: Main runner for batched async lookup joins
without a post-lookup calculation (Calc)
# *AsyncBatchLookupJoinWithCalcRunner*: Runner for batch async lookup joins
with post-lookup calculations
# *BatchLookupFunctionWrapper*: Adapter that wraps existing single-key lookup
functions so they can be used in batch mode
# *BatchResultFutureWrapper*: Wrapper for handling batch result futures
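
One way an adapter like *BatchLookupFunctionWrapper* could work, sketched with hypothetical types: a single-key async lookup is modelled as {{Function<K, CompletableFuture<Collection<R>>>}}, and the batch contract {{BatchAsyncLookup}} (also hypothetical) returns one result collection per key, in key order. These are not Flink's actual lookup function interfaces.

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical batch lookup contract: one result collection per key, in key order.
interface BatchAsyncLookup<K, R> {
    CompletableFuture<List<Collection<R>>> lookupBatch(List<K> keys);
}

// Sketch of an adapter that exposes a single-key async lookup as a batch lookup.
final class BatchLookupFunctionWrapper<K, R> implements BatchAsyncLookup<K, R> {
    private final Function<K, CompletableFuture<Collection<R>>> singleLookup;

    BatchLookupFunctionWrapper(Function<K, CompletableFuture<Collection<R>>> singleLookup) {
        this.singleLookup = singleLookup;
    }

    @Override
    public CompletableFuture<List<Collection<R>>> lookupBatch(List<K> keys) {
        // Fan out: one single-key lookup per key, all in flight concurrently.
        List<CompletableFuture<Collection<R>>> futures = new ArrayList<>(keys.size());
        for (K key : keys) {
            futures.add(singleLookup.apply(key));
        }
        // Fan in: once every lookup has finished, collect results in key order.
        // If any lookup fails, the returned future fails too.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<Collection<R>> results = new ArrayList<>(futures.size());
                    for (CompletableFuture<Collection<R>> f : futures) {
                        results.add(f.join()); // already complete at this point
                    }
                    return results;
                });
    }

    public static void main(String[] args) {
        // Toy single-key lookup: even keys have one matching row, odd keys none.
        BatchLookupFunctionWrapper<Integer, String> wrapper = new BatchLookupFunctionWrapper<>(key -> {
            Collection<String> rows = key % 2 == 0 ? List.of("row-" + key) : List.of();
            return CompletableFuture.completedFuture(rows);
        });
        System.out.println(wrapper.lookupBatch(List.of(1, 2, 4)).join()); // [[], [row-2], [row-4]]
    }
}
{code}

This adapter still issues one request per key, so on its own it mainly lets existing connectors run in batch mode; fewer network round-trips require a connector that can answer a multi-key request natively.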
h3. Architecture
{noformat}
Input Stream → Batch Accumulator → Batch Lookup Function → Result Processor → Output Stream
                       ↓
    Timeout Flusher (configurable interval)
{noformat}
h3. Batch Processing Flow
# *Accumulation*: Input records are accumulated into batches up to the
configured batch size
# *Timeout Handling*: If the batch size is not reached within the flush
interval, the incomplete batch is sent anyway
# *Batch Lookup*: The entire batch is sent to the lookup function as a single
request
# *Result Distribution*: Results are distributed back to the corresponding
input records
# *Output*: Joined results are emitted to the output stream
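
The flow above can be sketched as a size- and time-triggered buffer. This is an illustrative sketch under several assumptions: {{BatchAccumulator}} is a hypothetical class, the batch lookup is modelled as a function from a key list to one result collection per key (same order), and the timeout uses a plain {{ScheduledExecutorService}}, whereas a real Flink operator would more likely rely on its processing-time timer service. Steps 1 to 4 are marked in the comments; step 5 (emitting joined rows) is left to whoever consumes the returned futures.

{code:java}
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

// Hypothetical size- and time-triggered batch buffer for lookup keys.
final class BatchAccumulator<K, R> implements AutoCloseable {
    private final int batchSize;
    private final long flushMillis;
    // Batch lookup: list of keys -> one result collection per key, same order.
    private final Function<List<K>, CompletableFuture<List<Collection<R>>>> batchLookup;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    private List<K> keys = new ArrayList<>();
    private List<CompletableFuture<Collection<R>>> pending = new ArrayList<>();
    private ScheduledFuture<?> timeout; // armed when the first key of a batch arrives

    BatchAccumulator(int batchSize, long flushMillis,
                     Function<List<K>, CompletableFuture<List<Collection<R>>>> batchLookup) {
        this.batchSize = batchSize;
        this.flushMillis = flushMillis;
        this.batchLookup = batchLookup;
    }

    // 1. Accumulation: buffer the key; the returned future receives its lookup result.
    synchronized CompletableFuture<Collection<R>> add(K key) {
        CompletableFuture<Collection<R>> result = new CompletableFuture<>();
        keys.add(key);
        pending.add(result);
        if (keys.size() == 1) {
            // 2. Timeout handling: send a partial batch after flushMillis.
            timeout = timer.schedule(this::flush, flushMillis, TimeUnit.MILLISECONDS);
        }
        if (keys.size() >= batchSize) {
            flush(); // batch is full: send it now
        }
        return result;
    }

    synchronized void flush() {
        if (keys.isEmpty()) {
            return; // nothing buffered, e.g. the batch was already sent
        }
        if (timeout != null) {
            // Drop this batch's timer. A timer that already started may flush
            // the next batch early: a smaller batch, but nothing is lost.
            timeout.cancel(false);
            timeout = null;
        }
        List<K> batchKeys = keys;
        List<CompletableFuture<Collection<R>>> batchFutures = pending;
        keys = new ArrayList<>();
        pending = new ArrayList<>();
        // 3. Batch lookup: the whole batch goes out as one request.
        batchLookup.apply(batchKeys).whenComplete((results, error) -> {
            Throwable failure = error;
            if (failure == null && results.size() != batchFutures.size()) {
                failure = new IllegalStateException("expected " + batchFutures.size()
                        + " results, got " + results.size());
            }
            // 4. Result distribution: the i-th result belongs to the i-th buffered record.
            for (int i = 0; i < batchFutures.size(); i++) {
                if (failure != null) {
                    batchFutures.get(i).completeExceptionally(failure);
                } else {
                    batchFutures.get(i).complete(results.get(i));
                }
            }
        });
    }

    // Sends whatever is still buffered and stops the timer thread.
    @Override
    public void close() {
        flush();
        timer.shutdown();
    }

    public static void main(String[] args) {
        // Toy batch lookup that echoes each key back as its single matching row.
        try (BatchAccumulator<Integer, String> acc = new BatchAccumulator<>(2, 100, ks -> {
            List<Collection<String>> rows = new ArrayList<>();
            for (Integer k : ks) rows.add(List.of("row-" + k));
            return CompletableFuture.completedFuture(rows);
        })) {
            acc.add(1);
            CompletableFuture<Collection<String>> second = acc.add(2); // fills the batch
            CompletableFuture<Collection<String>> third = acc.add(3);  // sent by the timeout
            System.out.println(second.join() + " " + third.join());    // [row-2] [row-3]
        }
    }
}
{code}

Keeping results in the same order as the keys makes step 4 a simple index lookup; an implementation that lets the external system return results in arbitrary order would instead need to match them back by key.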
h2. Compatibility
* *Backward Compatible*: Existing temporal table joins continue to work
unchanged
* *Opt-in Feature*: Must be explicitly enabled via configuration
* *API Compatible*: No changes to existing Table API or SQL syntax
h2. Testing Strategy
* *Unit Tests*: Comprehensive tests for all batch processing components
* *Integration Tests*: End-to-end tests with various batch sizes and flush
intervals
* *Performance Tests*: Benchmarks comparing batch vs non-batch performance
* *Configuration Tests*: Validation of all configuration options