xudong963 commented on code in PR #22698:
URL: https://github.com/apache/datafusion/pull/22698#discussion_r3472642166


##########
datafusion/physical-expr-common/src/adaptive/registry.rs:
##########
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Concurrent registry of per-predicate [`SelectivityStats`] plus the
+//! "skip" flags that let an *optional* predicate be turned into a no-op
+//! mid-stream.
+//!
+//! This is shared plumbing, free of any placement/ordering policy. A consumer
+//! [`register`](AdaptiveStatsRegistry::register)s the predicates it tracks,
+//! calls [`record`](AdaptiveStatsRegistry::record) on the per-batch hot path,
+//! and reads back [`snapshot`](AdaptiveStatsRegistry::snapshot)s when it
+//! periodically re-decides (placement, ordering, drop, …).
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use parking_lot::{Mutex, RwLock};
+
+use super::stats::{FilterId, SelectivityStats};
+
+/// Thread-safe map of [`FilterId`] → online [`SelectivityStats`], with
+/// per-predicate skip flags.
+///
+/// # Locking
+///
+/// The outer [`RwLock`] over the stats map is almost always *read*-locked: 
both
+/// [`record`](Self::record) (hot, per-batch) and the snapshot readers only 
need
+/// shared access to look up an existing entry. The write lock is taken only by
+/// [`register`](Self::register) when a new [`FilterId`] is first seen — a 
brief,
+/// infrequent operation.
+///
+/// Each entry is an independent [`Mutex<SelectivityStats>`], so concurrent
+/// `record` calls on *different* predicates proceed in parallel with zero
+/// contention.
+#[derive(Debug, Default)]
+pub struct AdaptiveStatsRegistry {
+    /// Per-predicate selectivity statistics, each individually 
`Mutex`-guarded.
+    stats: RwLock<HashMap<FilterId, Mutex<SelectivityStats>>>,
+    /// Per-predicate "skip" flags. When set, the consumer treats the predicate
+    /// as a no-op for subsequent batches. Only ever set for predicates whose
+    /// [`SelectivityStats::is_optional`] is `true` — mandatory predicates must
+    /// always execute or queries return wrong rows.
+    skip_flags: RwLock<HashMap<FilterId, Arc<AtomicBool>>>,
+}
+
+impl AdaptiveStatsRegistry {
+    /// Create an empty registry.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Register a predicate so future [`record`](Self::record) calls can find
+    /// it. Idempotent: an already-registered id keeps its accumulated stats 
and
+    /// its existing optional flag.
+    ///
+    /// `is_optional` records whether the predicate may be dropped without
+    /// affecting correctness (see [`SelectivityStats::is_optional`]).
+    pub fn register(&self, id: FilterId, is_optional: bool) {
+        if self.stats.read().contains_key(&id) {
+            return;
+        }
+        let mut stats = self.stats.write();
+        stats
+            .entry(id)
+            .or_insert_with(|| Mutex::new(SelectivityStats::new(is_optional)));
+        self.skip_flags
+            .write()
+            .entry(id)
+            .or_insert_with(|| Arc::new(AtomicBool::new(false)));
+    }
+
+    /// Register many predicates at once (see [`register`](Self::register)).
+    pub fn register_all(&self, entries: impl IntoIterator<Item = (FilterId, 
bool)>) {
+        let mut stats = self.stats.write();
+        let mut flags = self.skip_flags.write();
+        for (id, is_optional) in entries {
+            stats
+                .entry(id)
+                .or_insert_with(|| 
Mutex::new(SelectivityStats::new(is_optional)));
+            flags
+                .entry(id)
+                .or_insert_with(|| Arc::new(AtomicBool::new(false)));
+        }
+    }
+
+    /// Record one batch of observations for `id` (per-batch hot path).
+    ///
+    /// Takes only a shared lock on the map plus the per-predicate mutex, so it
+    /// never contends with `record` calls on other predicates. A no-op if `id`
+    /// was never [`register`](Self::register)ed.
+    pub fn record(
+        &self,
+        id: FilterId,
+        matched: u64,
+        total: u64,
+        eval_nanos: u64,
+        effectiveness_sample: f64,
+    ) {
+        let map = self.stats.read();
+        if let Some(entry) = map.get(&id) {
+            entry
+                .lock()
+                .record(matched, total, eval_nanos, effectiveness_sample);
+        }
+    }
+
+    /// Merge a locally-accumulated delta into `id`'s shared stats (see
+    /// [`SelectivityStats::merge`]). A no-op if `id` was never
+    /// [`register`](Self::register)ed.
+    ///
+    /// Lets a consumer record per-batch observations into a private
+    /// accumulator with no locking and fold them in here periodically.
+    pub fn merge(&self, id: FilterId, delta: &SelectivityStats) {
+        let map = self.stats.read();
+        if let Some(entry) = map.get(&id) {
+            entry.lock().merge(delta);
+        }
+    }
+
+    /// Copy out the current stats for `id`, or `None` if unregistered.
+    ///
+    /// [`SelectivityStats`] is `Copy`, so consumers read every derived metric
+    /// (pass rate, cost-per-row, effectiveness, confidence bounds) off the
+    /// returned value without holding any lock.
+    pub fn snapshot(&self, id: FilterId) -> Option<SelectivityStats> {
+        self.stats.read().get(&id).map(|entry| *entry.lock())
+    }
+
+    /// Clear the accumulated stats for `id`, preserving its optional flag and
+    /// skip flag. Used when a dynamic predicate re-arms under a stable id (see
+    /// [`SelectivityStats::reset`]). A no-op if `id` is unregistered.
+    pub fn reset(&self, id: FilterId) {
+        if let Some(entry) = self.stats.read().get(&id) {
+            entry.lock().reset();
+        }
+    }
+
+    /// The shared skip flag for `id`, registering `id` as optional if it was
+    /// not already present. The returned `Arc` can be cached by an evaluator 
so
+    /// it can cheaply check the flag without touching the registry.
+    pub fn skip_flag(&self, id: FilterId) -> Arc<AtomicBool> {
+        if let Some(flag) = self.skip_flags.read().get(&id) {
+            return Arc::clone(flag);
+        }
+        // First sighting: register as optional and create the flag.
+        self.register(id, true);
+        Arc::clone(
+            self.skip_flags
+                .read()
+                .get(&id)
+                .expect("skip flag inserted by register"),
+        )
+    }
+
+    /// Whether `id`'s skip flag is currently set. `false` if unregistered.
+    pub fn is_skipped(&self, id: FilterId) -> bool {
+        self.skip_flags
+            .read()
+            .get(&id)
+            .is_some_and(|flag| flag.load(Ordering::Relaxed))
+    }
+
+    /// Set or clear `id`'s skip flag. A no-op if `id` is unregistered.
+    pub fn set_skipped(&self, id: FilterId, skipped: bool) {

Review Comment:
   `set_skipped` can mark any registered predicate as skipped, even if 
`SelectivityStats::is_optional()` is false. How about checking `is_optional == 
true` in the method



##########
datafusion/physical-plan/src/adaptive_filter.rs:
##########
@@ -0,0 +1,1841 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Runtime-adaptive evaluation of a conjunctive (`AND`) predicate for
+//! [`FilterExec`](crate::filter::FilterExec).
+//!
+//! This is the *ordering* policy that sits on top of the shared measurement
+//! substrate in
+//! [`datafusion_physical_expr_common::adaptive`]. The substrate
+//! ([`AdaptiveStatsRegistry`]) measures per-conjunct selectivity and cost; 
this
+//! module decides, from those measurements, what order to evaluate the
+//! conjuncts in.
+//!
+//! ## How it evaluates
+//!
+//! The conjuncts are evaluated sequentially, combining their boolean results
+//! with `AND`. The working batch is physically compacted to the surviving rows
+//! only once the accumulated mask becomes selective enough (the same
+//! pre-selection gate [`BinaryExpr`]'s `AND` short-circuit uses) — so a run of
+//! non-selective conjuncts costs only cheap bitwise `AND`s, while a selective
+//! conjunct shrinks the batch the expensive conjuncts after it must decode.
+//! Gating is what makes ordering matter: a conjunct that compacts early saves
+//! every later conjunct work, so the cheap-and-selective ones should run 
first.
+//! Each conjunct is timed and counted on exactly the rows it evaluated, giving
+//! the *marginal* selectivity and cost on the current working population.
+//!
+//! ## How it reorders: the model proposes, a trial disposes
+//!
+//! Every conjunct accrues a per-batch *effectiveness* sample of **rows
+//! discarded per second** (`(total - matched) * 1e9 / eval_nanos`). Maximising
+//! discards-per-second is exactly minimising `cost_per_row / (1 - pass_rate)`,
+//! the classic optimal ordering key for independent conjuncts — so an 
expensive
+//! but very selective predicate (e.g. a `LIKE` that keeps one row) correctly
+//! sorts ahead of a cheap but unselective one.
+//!
+//! Crucially, this ranking is only ever a *proposal*. The classic key is
+//! built on assumptions real execution violates: it prices a conjunct's work
+//! as `rows × cost_per_row` even though vectorised kernels do not scale down
+//! without paying for compaction; and the marginal cost-per-row measured on a
+//! handful of survivor rows says little about cost at the front of the order.
+//! An order that looks optimal on paper can run slower than the one the query
+//! author wrote. So a proposed order is never adopted on the model's word: it
+//! is put through an **A/B trial** — batches alternate between the incumbent
+//! and the candidate arrangement, each timed end-to-end — and the candidate
+//! wins only if its measured ns/row beats the incumbent's with statistical
+//! confidence. Ties favour the incumbent; a rejected candidate is remembered
+//! and not re-tried. Adopted orders run via the compact-once loop (the
+//! arrangement the trial actually measured), not re-fused into a chain with
+//! different cost behaviour.
+//!
+//! Proposals themselves are debounced: the learner only settles once every
+//! adjacent pair of the ranking is either statistically certain or a tie
+//! whose order provably cannot matter, and a proposal that does not promise a
+//! material improvement over the incumbent freezes the incumbent unchanged —
+//! interchangeable conjuncts never trigger an experiment at all.
+//!
+//! To stay correct under distribution drift, a frozen evaluator periodically
+//! *re-thaws*: it re-measures a short window and re-proposes. Each re-thaw
+//! that keeps the incumbent backs the next one off exponentially, so a stable
+//! filter is re-checked geometrically less often and steady-state overhead
+//! decays toward zero; a candidate that wins its trial resets the interval so
+//! real drift is caught quickly.
+//!
+//! All partition streams of a `FilterExec` share their measurements and trial
+//! verdicts, so the learning cost is paid once per query, not once per
+//! stream, and a stream that starts late adopts the validated champion on its
+//! first batch.
+//!
+//! ## Known limitation
+//!
+//! The measured selectivity of a conjunct is *conditional* on the conjuncts
+//! ordered before it (it only sees their survivors), so with strongly
+//! correlated predicates the *proposal* can be a local optimum — the trial
+//! protects against adopting a bad one, but a better unexplored order may be
+//! missed. A proper exploration phase (measuring each conjunct's marginal
+//! selectivity on a common population) is future work.
+//!
+//! [`BinaryExpr`]: datafusion_physical_expr::expressions::BinaryExpr
+
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, 
UInt32Array};
+use arrow::buffer::BooleanBuffer;
+use arrow::compute::kernels::boolean::and;
+use arrow::compute::{filter, filter_record_batch, prep_null_mask_filter};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::instant::Instant;
+use datafusion_expr::Operator;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::utils::split_conjunction;
+use datafusion_physical_expr_common::adaptive::{
+    AdaptiveArbiter, AdaptiveStatsRegistry, SelectivityStats, TrialUpdate,
+};
+use datafusion_physical_expr_common::physical_expr::is_volatile;
+use log::debug;
+
+/// Confidence multiplier for the one-sided interval on effectiveness
+/// (~97.5% one-sided at 2.0).
+const CONFIDENCE_Z: f64 = 2.0;
+/// Minimum per-conjunct samples before the confidence intervals are trusted
+/// enough to make a freeze decision.
+const MIN_SAMPLES_FOR_CI: u64 = 4;
+/// Freeze after this many learning samples even if the order's confidence
+/// intervals still overlap: if the conjuncts cannot be told apart by now, 
their
+/// relative order does not matter, so stop paying to measure it.
+const MAX_LEARNING_SAMPLES: u64 = 64;
+/// Batches the fast frozen path runs before the first re-thaw check.
+const INITIAL_THAW_INTERVAL: u64 = 64;
+/// Each re-thaw that confirms the order is unchanged multiplies the next
+/// interval by this factor (exponential backoff), so a stable filter is
+/// re-checked geometrically less often and steady-state overhead decays to ~0.
+const THAW_BACKOFF: u64 = 4;
+/// Upper bound on the re-thaw interval.
+const MAX_THAW_INTERVAL: u64 = 16_384;
+/// Batches measured during a re-thaw before re-deciding the order.
+const REMEASURE_WINDOW: u64 = 16;
+/// Measured batches at the start of each measuring phase (learning and
+/// re-measure windows) that run back-to-back, so freeze decisions are
+/// reachable quickly even on short streams.
+const MEASURE_WARMUP: u64 = 8;
+/// After the warmup, measure one batch in this many; the rest evaluate the
+/// fused predicate in the current order with no instrumentation. Bounds the
+/// steady measurement overhead even when the order never resolves.
+const MEASURE_STRIDE: u64 = 8;
+/// Fraction of the conjunction's expected total cost below which a difference
+/// is immaterial. Adjacent conjuncts whose swap could not change the expected
+/// cost by more than this are a *tie*: their relative order does not matter,
+/// so an unresolved tie neither delays freezing nor counts as drift when a
+/// re-thaw re-ranks them.
+const TIE_COST_FRACTION: f64 = 0.05;
+/// Physically compact the working batch to the surviving rows only when the
+/// accumulated mask keeps at most this fraction of them. Above this, the cost
+/// of materializing a barely-smaller batch is not repaid, so we keep 
evaluating
+/// against the full working batch and just AND the boolean masks — mirroring
+/// the pre-selection gate in `BinaryExpr`'s `AND` short-circuit.
+const COMPACTION_SELECTIVITY_THRESHOLD: f64 = 0.2;
+/// Paired samples before a trial verdict. The challenger must beat the
+/// incumbent with confidence within this many pairs or it is rejected (ties
+/// favour the incumbent).
+/// How an arrangement evaluates its conjuncts.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum Strategy {
+    /// A single left-deep `AND` of [`BinaryExpr`]s — byte-for-byte what
+    /// `FilterExec` runs when the feature is off. The incumbent starts here so
+    /// that doing nothing costs nothing.
+    Fused,
+    /// The compact-once loop ([`eval_conjuncts`]): masks are `AND`-combined
+    /// and the working batch is physically compacted at most when the
+    /// survivors drop below the pre-selection threshold, instead of
+    /// re-filtering and re-scattering at every `AND` level like the fused
+    /// chain does. Orders adopted by a trial run this way, since this is the
+    /// strategy the trial measured.
+    CompactOnce,
+}
+
+/// The executable arrangement an adaptive `FilterExec` decides between: an
+/// evaluation order plus the strategy that runs it. This is the type the
+/// shared [`AdaptiveArbiter`] coordinates trials over; the arbiter itself is
+/// arrangement-agnostic (a parquet scan would put filter *placement* here
+/// instead).
+#[derive(Debug, Clone, PartialEq)]
+struct Arrangement {
+    /// Evaluation order: indices into the conjunct list.
+    order: Vec<usize>,
+    /// How the order is executed.
+    strategy: Strategy,
+}
+
+/// Lifecycle of the adaptive evaluator.
+#[derive(Debug)]
+enum Phase {
+    /// Measuring batches (warmup, then strided), building confidence in the
+    /// per-conjunct statistics that *propose* a candidate order.
+    Learning,
+    /// Participating in the shared A/B trial (see [`AdaptiveArbiter`]):
+    /// alternating arms on consecutive batches and submitting paired
+    /// end-to-end timings. Only a measured win changes the order — the
+    /// per-conjunct cost model proposes, but never decides.
+    Trial {
+        /// The candidate arrangement, copied out of the shared trial so the
+        /// hot path does not lock to learn what to evaluate.
+        candidate: Arrangement,
+        /// ns/row of the incumbent leg of the current pair, once run.
+        pending_incumbent: Option<f64>,
+        /// Freeze interval to use if the candidate is rejected (carries this
+        /// stream's re-thaw backoff); an adopted candidate resets it.
+        interval_if_rejected: u64,
+    },
+    /// Order settled; the incumbent arrangement is evaluated with no
+    /// measurement until `thaw_at` batches have been processed.
+    Frozen {
+        /// Batch count at which to re-measure.
+        thaw_at: u64,
+        /// Interval that produced `thaw_at`; grows on each confirmation.
+        interval: u64,
+    },
+    /// Briefly measuring again after a thaw, to detect distribution drift.
+    Remeasuring {
+        /// Measured batches left in the re-measurement window.
+        window_left: u64,
+        /// The frozen interval before this thaw (for backoff bookkeeping).
+        interval: u64,
+    },
+}
+
+/// State shared by every partition stream of one `FilterExec`: the
+/// per-conjunct measurement registry plus the arrangement arbiter, so the
+/// streams learn as one and one stream's completed trial serves all of them.
+#[derive(Debug, Default)]
+pub(crate) struct AdaptiveFilterShared {
+    /// Per-conjunct selectivity/cost stats, keyed by conjunct index.
+    stats: AdaptiveStatsRegistry,
+    /// Champion/challenger coordination over [`Arrangement`]s.
+    arbiter: AdaptiveArbiter<Arrangement>,
+}
+
+impl AdaptiveFilterShared {
+    pub(crate) fn new() -> Self {
+        Self::default()
+    }
+}
+
+/// Adaptive evaluator for a single conjunctive predicate.
+///
+/// Owned per partition stream (single-threaded), so all state — order, stats,
+/// and lifecycle [`Phase`] — is held directly and mutated through `&mut self`
+/// with no locking. The stats are a plain `Vec` indexed by conjunct id (ids 
are
+/// dense `0..n`), so the per-batch hot path is a direct index, not a locked 
map
+/// lookup.
+///
+/// What *is* shared is the measurements and the trial verdicts: per-batch
+/// observations accumulate in the stream-local `stats` (lock-free) and are
+/// folded into the [`AdaptiveFilterShared`] registry whenever this stream
+/// makes a decision, so N streams converge ~N× faster; and the first stream
+/// whose trial validates (or rejects) a candidate publishes the verdict so the
+/// others adopt it without re-running the experiment. Drift decisions
+/// (re-thaw windows) deliberately use only the local fresh window — the shared
+/// accumulator is a long-run prior and would dilute a distribution change.
+#[derive(Debug)]
+pub(crate) struct AdaptiveConjunction {
+    /// The conjuncts. Stats/order indices refer to positions in this `Vec`.
+    conjuncts: Vec<Arc<dyn PhysicalExpr>>,
+    /// Per-conjunct observations accumulated locally since the last flush to
+    /// [`shared`](Self::shared), indexed by conjunct id.
+    stats: Vec<SelectivityStats>,
+    /// Measurements and verdicts shared by every partition stream of the same
+    /// `FilterExec`.
+    shared: Arc<AdaptiveFilterShared>,
+    /// Champion epoch this stream has caught up to (see
+    /// [`AdaptiveArbiter::epoch`]).
+    epoch_seen: u64,
+    /// Incumbent evaluation order: indices into 
[`conjuncts`](Self::conjuncts).
+    order: Vec<usize>,
+    /// Incumbent evaluation strategy. Starts as [`Strategy::Fused`] over the
+    /// written order (identical to the feature being off); becomes
+    /// [`Strategy::CompactOnce`] when a trial adopts a new arrangement.
+    strategy: Strategy,
+    /// The conjuncts fused into a left-deep `AND` in [`order`](Self::order),
+    /// used by [`Strategy::Fused`] and by unmeasured learning batches.
+    /// Rebuilt when the order changes.
+    fused: Arc<dyn PhysicalExpr>,
+    /// Total batches processed; drives the re-thaw schedule.
+    batches: u64,
+    /// Measured batches in the current measuring phase; drives the
+    /// warmup-then-stride measurement schedule.
+    measured: u64,
+    /// Current lifecycle phase.
+    phase: Phase,
+}
+
+impl AdaptiveConjunction {
+    /// Build an adaptive evaluator for `predicate`, or `None` if adaptive
+    /// reordering does not apply:
+    ///
+    /// - `enabled` is false (the config flag is off);
+    /// - the predicate has fewer than two `AND` conjuncts (nothing to 
reorder);
+    /// - any conjunct is volatile (reordering could change results).
+    ///
+    /// `shared` is the state common to all partition streams of the owning
+    /// `FilterExec`; the conjuncts are registered in it under their indices.
+    pub(crate) fn try_new(
+        predicate: &Arc<dyn PhysicalExpr>,
+        enabled: bool,
+        shared: Arc<AdaptiveFilterShared>,
+    ) -> Option<Self> {
+        if !enabled {
+            return None;
+        }
+        let conjuncts: Vec<Arc<dyn PhysicalExpr>> = 
split_conjunction(predicate)
+            .into_iter()
+            .map(Arc::clone)
+            .collect();
+        if conjuncts.len() < 2 {
+            return None;
+        }
+        if conjuncts.iter().any(is_volatile) {
+            return None;
+        }
+
+        shared
+            .stats
+            .register_all((0..conjuncts.len()).map(|id| (id, false)));
+        let stats = vec![SelectivityStats::new(false); conjuncts.len()];
+        let order: Vec<usize> = (0..conjuncts.len()).collect();
+        let fused = fuse(&conjuncts, &order);
+
+        Some(Self {
+            conjuncts,
+            stats,
+            shared,
+            epoch_seen: 0,
+            order,
+            strategy: Strategy::Fused,
+            fused,
+            batches: 0,
+            measured: 0,
+            phase: Phase::Learning,
+        })
+    }
+
+    /// Evaluate the conjunction against `batch`, returning the boolean mask
+    /// (over the batch's original rows) of rows that passed every conjunct.
+    ///
+    /// While [`Learning`](Phase::Learning) or 
[`Remeasuring`](Phase::Remeasuring),
+    /// a warmup-then-strided subset of batches is evaluated and measured
+    /// per-conjunct (see [`evaluate_measured`](Self::evaluate_measured));
+    /// [`Trial`](Phase::Trial) batches alternate between the incumbent and the
+    /// candidate arrangement, timed end-to-end; all other batches — including
+    /// the whole [`Frozen`](Phase::Frozen) phase — evaluate the incumbent with
+    /// no instrumentation.
+    pub(crate) fn evaluate(&mut self, batch: &RecordBatch) -> Result<ArrayRef> 
{
+        self.batches += 1;
+
+        // Adopt a champion another stream's trial has validated since we last
+        // looked: a relaxed atomic load per batch, a lock only on change.
+        if self.shared.arbiter.epoch() != self.epoch_seen {
+            self.adopt_shared_champion();
+        }
+
+        // Frozen fast path; when the interval elapses, drop into a fresh
+        // measurement window to check whether the data has drifted.
+        if let Phase::Frozen { thaw_at, interval } = self.phase {
+            if self.batches < thaw_at {
+                return self.evaluate_incumbent(batch);
+            }
+            self.stats.iter_mut().for_each(SelectivityStats::reset);
+            self.measured = 0;
+            self.phase = Phase::Remeasuring {
+                window_left: REMEASURE_WINDOW,
+                interval,
+            };
+        }
+
+        if matches!(self.phase, Phase::Trial { .. }) {
+            return self.evaluate_trial(batch);
+        }
+
+        // Learning or re-measuring: measure a warmup of consecutive batches,
+        // then only one batch per stride, so an order that stays unresolved
+        // costs a bounded fraction of the stream rather than all of it.
+        if self.measured >= MEASURE_WARMUP && 
!self.batches.is_multiple_of(MEASURE_STRIDE)
+        {
+            return self.evaluate_incumbent(batch);
+        }
+
+        self.measured += 1;
+        let result = self.evaluate_measured(batch)?;
+        self.update_phase();
+        Ok(result)
+    }
+
+    /// Evaluate the incumbent arrangement with no instrumentation.
+    fn evaluate_incumbent(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        match self.strategy {
+            Strategy::Fused => 
self.fused.evaluate(batch)?.into_array(batch.num_rows()),
+            Strategy::CompactOnce => {
+                eval_conjuncts(&self.conjuncts, &self.order, batch, None)
+            }
+        }
+    }
+
+    /// Install the trial-validated champion published by another stream and
+    /// freeze on it: the experiment has already been run, re-running it here
+    /// would only repeat the cost.
+    fn adopt_shared_champion(&mut self) {
+        let (champion, epoch) = self.shared.arbiter.champion();
+        self.epoch_seen = epoch;
+        if let Some(champion) = champion {
+            self.set_order(champion.order);
+            self.strategy = champion.strategy;
+            self.freeze(INITIAL_THAW_INTERVAL);
+        }
+    }
+
+    /// Contribute one batch to the shared A/B trial. Each stream runs the
+    /// incumbent and the candidate on consecutive batches, timing both, and
+    /// submits the pair's `ln(candidate / incumbent)` ns/row ratio; once
+    /// enough pairs accumulate (across all streams) the trial concludes. The
+    /// candidate is adopted only if the mean log-ratio is below zero with
+    /// confidence; ties favour the incumbent.
+    fn evaluate_trial(&mut self, batch: &RecordBatch) -> Result<ArrayRef> {
+        let Phase::Trial {
+            candidate,
+            pending_incumbent,
+            interval_if_rejected,
+        } = &mut self.phase
+        else {
+            unreachable!("caller checked the phase");
+        };
+        let interval_if_rejected = *interval_if_rejected;
+        let rows = batch.num_rows();
+
+        // Incumbent leg first; no locking on either leg.
+        let Some(incumbent_ns) = *pending_incumbent else {
+            let timer = Instant::now();
+            let result = match self.strategy {
+                Strategy::Fused => {
+                    self.fused.evaluate(batch)?.into_array(batch.num_rows())
+                }
+                Strategy::CompactOnce => {
+                    eval_conjuncts(&self.conjuncts, &self.order, batch, None)
+                }
+            }?;
+            let nanos = timer.elapsed().as_nanos() as u64;
+            if rows > 0
+                && nanos > 0
+                && let Phase::Trial {
+                    pending_incumbent, ..
+                } = &mut self.phase
+            {
+                *pending_incumbent = Some(nanos as f64 / rows as f64);
+            }
+            return Ok(result);
+        };
+
+        // Candidate leg; completes the pair.
+        let candidate = std::mem::replace(
+            candidate,
+            Arrangement {
+                order: Vec::new(),
+                strategy: Strategy::CompactOnce,
+            },
+        );
+        let timer = Instant::now();
+        let result = eval_conjuncts(&self.conjuncts, &candidate.order, batch, 
None)?;
+        let nanos = timer.elapsed().as_nanos() as u64;
+        let sample = (rows > 0 && nanos > 0)
+            .then(|| (nanos as f64 / rows as f64 / incumbent_ns).ln())
+            .filter(|s| s.is_finite());
+
+        let Some(sample) = sample else {
+            // Not a usable pair; start the next one.
+            self.phase = Phase::Trial {
+                candidate,
+                pending_incumbent: None,
+                interval_if_rejected,
+            };
+            return Ok(result);
+        };
+        match self
+            .shared
+            .arbiter
+            .submit_pair(&candidate, rows as u64, nanos, sample)
+        {
+            TrialUpdate::Running => {
+                self.phase = Phase::Trial {
+                    candidate,
+                    pending_incumbent: None,
+                    interval_if_rejected,
+                };
+            }
+            TrialUpdate::Adopted(champion) => {
+                debug!("adaptive filter trial adopted {champion:?}");
+                self.epoch_seen = self.shared.arbiter.epoch();
+                self.set_order(champion.order);
+                self.strategy = champion.strategy;
+                self.freeze(INITIAL_THAW_INTERVAL);
+            }
+            // Rejected here, or concluded on another stream (an adoption
+            // there is handled by the epoch check): the incumbent stands.
+            TrialUpdate::Rejected(rejected) => {
+                debug!("adaptive filter trial rejected {rejected:?}");
+                self.freeze(interval_if_rejected);
+            }
+            TrialUpdate::Superseded => self.freeze(interval_if_rejected),
+        }
+        Ok(result)
+    }
+
+    /// Evaluate the conjuncts in the incumbent order via the compact-once
+    /// loop, recording each conjunct's marginal selectivity and cost into the
+    /// stream-local stats.
+    fn evaluate_measured(&mut self, batch: &RecordBatch) -> Result<ArrayRef> {
+        eval_conjuncts(&self.conjuncts, &self.order, batch, Some(&mut 
self.stats))
+    }
+
+    /// Advance the lifecycle phase after measuring a batch:
+    /// - [`Learning`](Phase::Learning) folds the local observations into the
+    ///   shared registry and proposes a candidate once the *shared* stats
+    ///   settle the ranking (see [`settled_order`]), so all partition streams
+    ///   learn as one;
+    /// - [`Remeasuring`](Phase::Remeasuring) proposes from the *local* fresh
+    ///   window only when its window ends (the shared accumulator is a
+    ///   long-run prior that would dilute a distribution change), then folds
+    ///   the window into the shared registry.
+    ///
+    /// Either way the proposal goes through [`decide`](Self::decide): a
+    /// proposal that does not promise a material improvement freezes the
+    /// incumbent unchanged, and one that does is handed to an A/B
+    /// [`Trial`](Phase::Trial) rather than adopted on the model's word.
+    fn update_phase(&mut self) {
+        match std::mem::replace(&mut self.phase, Phase::Learning) {
+            Phase::Learning => {
+                self.flush_to_shared();
+                let stats = self.shared_snapshot();
+                if let Some(order) = settled_order(&stats) {
+                    self.decide(order, &stats, INITIAL_THAW_INTERVAL);
+                }
+                // else: stay in `Learning` (already restored by the replace).
+            }
+            Phase::Remeasuring {
+                window_left,
+                interval,
+            } => {
+                let window_left = window_left - 1;
+                if window_left == 0 {
+                    let candidate = rank_by_effectiveness(&self.stats);
+                    let stats = std::mem::take(&mut self.stats);
+                    let backoff =
+                        
interval.saturating_mul(THAW_BACKOFF).min(MAX_THAW_INTERVAL);
+                    self.decide(candidate, &stats, backoff);
+                    self.stats = stats;
+                    self.flush_to_shared();
+                } else {
+                    self.phase = Phase::Remeasuring {
+                        window_left,
+                        interval,
+                    };
+                }
+            }
+            // Not reachable: frozen and trial batches never take the measured
+            // path. Restore the phase.
+            other => self.phase = other,
+        }
+    }
+
+    /// Act on a proposed `candidate` order, judged against `stats`:
+    ///
+    /// - the incumbent's order (or an immaterial reshuffle of it, per
+    ///   [`TIE_COST_FRACTION`]) freezes the incumbent for
+    ///   `interval_if_unchanged` batches — ties never trigger an experiment;
+    /// - a candidate the shared verdicts already rejected does the same — the
+    ///   experiment has been run and lost;
+    /// - otherwise the candidate enters an A/B [`Trial`](Phase::Trial). The
+    ///   per-conjunct stats only ever *propose*; adoption requires the trial's
+    ///   measured end-to-end win.
+    fn decide(
+        &mut self,
+        candidate: Vec<usize>,
+        stats: &[SelectivityStats],
+        interval_if_unchanged: u64,
+    ) {
+        let materially_better = expected_cost_per_row(stats, &candidate)
+            < (1.0 - TIE_COST_FRACTION) * expected_cost_per_row(stats, 
&self.order);
+        if candidate == self.order || !materially_better {
+            self.freeze(interval_if_unchanged);
+            return;
+        }
+        // A candidate whose modelled cost is not materially better than the
+        // last rejected candidate's is the same experiment re-proposed —
+        // with tied conjuncts the ranking reshuffles them freely, and an
+        // exact-match memory would re-run the lost trial forever.
+        if let Some(rejected) = self.shared.arbiter.rejected()
+            && expected_cost_per_row(stats, &candidate)
+                >= (1.0 - TIE_COST_FRACTION)
+                    * expected_cost_per_row(stats, &rejected.order)
+        {
+            self.freeze(interval_if_unchanged);
+            return;
+        }
+        // Start the shared trial, or join the one already in progress (its
+        // candidate stands in for ours: proposals are made from the same
+        // shared statistics, and a verdict on any reordering beats racing
+        // experiments against each other).
+        let candidate = self.shared.arbiter.begin_trial(Arrangement {
+            order: candidate,
+            strategy: Strategy::CompactOnce,
+        });
+        self.phase = Phase::Trial {
+            candidate,
+            pending_incumbent: None,
+            interval_if_rejected: interval_if_unchanged,
+        };
+    }
+
+    /// Fold the locally-accumulated observations into the shared registry and
+    /// clear them.
+    fn flush_to_shared(&mut self) {
+        for (id, stats) in self.stats.iter_mut().enumerate() {
+            if stats.sample_count() > 0 || stats.pass_rate().is_some() {
+                self.shared.stats.merge(id, stats);
+                stats.reset();
+            }
+        }
+    }
+
+    /// Copy out the shared stats for every conjunct.
+    fn shared_snapshot(&self) -> Vec<SelectivityStats> {
+        (0..self.conjuncts.len())
+            .map(|id| self.shared.stats.snapshot(id).unwrap_or_default())
+            .collect()
+    }
+
+    /// Install `order` as the incumbent order, rebuilding the fused predicate
+    /// if it changed.
+    fn set_order(&mut self, order: Vec<usize>) {
+        if order != self.order {
+            self.order = order;
+            self.fused = fuse(&self.conjuncts, &self.order);
+        }
+    }
+
+    /// Freeze the incumbent, due to re-measure after `interval` batches.
+    fn freeze(&mut self, interval: u64) {
+        self.phase = Phase::Frozen {
+            thaw_at: self.batches + interval,
+            interval,
+        };
+    }
+}
+
+/// Evaluate `conjuncts` in `order` against `batch` via the compact-once loop,
+/// returning the boolean mask (over the batch's original rows) of rows that
+/// passed every conjunct. With `stats`, each conjunct is additionally timed
+/// and counted on exactly the rows it evaluated (its *marginal* selectivity
+/// and cost on the current working population).
+///
+/// The working batch is physically compacted to the surviving rows only once
+/// the accumulated mask becomes selective enough (see
+/// [`COMPACTION_SELECTIVITY_THRESHOLD`]); until then masks are combined with a
+/// cheap bitwise `AND`, so a run of non-selective conjuncts pays no
+/// materialization cost. Unlike the fused `BinaryExpr` chain, survivors stay
+/// compacted across the remaining conjuncts instead of being re-filtered and
+/// re-scattered at every `AND` level.
+///
+/// The bookkeeping is all deferred so the common shapes cost what the fused
+/// chain would: an all-true mask is dropped without an `AND` merge, the
+/// row-index array mapping survivors back to original rows is only
+/// materialized at the first compaction, and the final scatter only happens
+/// if a compaction occurred (otherwise the accumulated mask already covers
+/// the original rows).
+fn eval_conjuncts(
+    conjuncts: &[Arc<dyn PhysicalExpr>],
+    order: &[usize],
+    batch: &RecordBatch,
+    mut stats: Option<&mut [SelectivityStats]>,
+) -> Result<ArrayRef> {
+    let num_rows = batch.num_rows();
+    if num_rows == 0 {
+        return Ok(Arc::new(BooleanArray::from(Vec::<bool>::new())));
+    }
+
+    // `working` is the batch conjuncts are evaluated against. `acc` is the
+    // accumulated (`AND`-combined, null-free) result over `working`'s rows
+    // since the last compaction; `None` means all of them are still live.
+    // `live` maps `working`'s rows back to original row indices; `None`
+    // until a compaction first drops rows.
+    let mut working = batch.clone();
+    let mut acc: Option<BooleanArray> = None;
+    let mut live: Option<ArrayRef> = None;
+
+    for &id in order {
+        let rows_in = working.num_rows();
+
+        let timer = stats.is_some().then(Instant::now);
+        let array = conjuncts[id].evaluate(&working)?.into_array(rows_in)?;
+        let mask = as_boolean_array(&array)?;
+        // `matched` counts non-null trues (SQL filter semantics).
+        let matched = mask.true_count() as u64;
+
+        if let (Some(stats), Some(timer)) = (stats.as_deref_mut(), timer) {
+            let eval_nanos = timer.elapsed().as_nanos() as u64;
+            let discarded = rows_in as u64 - matched;
+            let sample = if eval_nanos > 0 {
+                discarded as f64 * 1e9 / eval_nanos as f64
+            } else {
+                0.0
+            };
+            stats[id].record(matched, rows_in as u64, eval_nanos, sample);
+        }
+
+        // An all-true mask leaves the accumulated result untouched.
+        if matched == rows_in as u64 && mask.null_count() == 0 {
+            continue;
+        }
+
+        // Fold this conjunct into the accumulated mask (null -> false).
+        let mask = if mask.null_count() > 0 {
+            prep_null_mask_filter(mask)
+        } else {
+            mask.clone()
+        };
+        let folded = match &acc {
+            None => mask,
+            Some(prev) => and(prev, &mask)?,
+        };
+
+        let alive = folded.true_count();
+        if alive == 0 {
+            // Nothing survives; the result is all-false over the original
+            // rows no matter what the remaining conjuncts say.
+            return Ok(Arc::new(BooleanArray::new(
+                BooleanBuffer::new_unset(num_rows),
+                None,
+            )));
+        }
+        // Compact only when the survivors are a small fraction of the
+        // working batch — otherwise the copy is not worth it.
+        if (alive as f64) <= COMPACTION_SELECTIVITY_THRESHOLD * rows_in as f64 
{
+            working = filter_record_batch(&working, &folded)?;
+            let indices = live.take().unwrap_or_else(|| {
+                Arc::new(UInt32Array::from_iter_values(0..num_rows as u32))
+            });
+            live = Some(filter(&indices, &folded)?);
+            acc = None;
+        } else {
+            acc = Some(folded);
+        }
+    }
+
+    match live {
+        // Never compacted: `acc` (or all-true) already covers the
+        // original rows.
+        None => Ok(match acc {
+            Some(acc) => Arc::new(acc),
+            None => 
Arc::new(BooleanArray::new(BooleanBuffer::new_set(num_rows), None)),
+        }),
+        // Compacted at least once: scatter the surviving original indices
+        // (`live`, narrowed by any residual `acc`) into a full-length mask.
+        Some(indices) => {
+            let indices = match acc {
+                Some(acc) => filter(&indices, &acc)?,
+                None => indices,
+            };
+            let indices = indices
+                .as_any()
+                .downcast_ref::<UInt32Array>()
+                .expect("u32 live");

Review Comment:
   can we avoid the expect by 
   ```rust
   let Some(indices) = indices.as_any().downcast_ref::<UInt32Array>() else {
       return internal_err!("expected UInt32Array for adaptive filter live row 
indices");
   };
   ```
   ?



##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -98,6 +99,10 @@ pub struct FilterExec {
     batch_size: usize,
     /// Number of rows to fetch
     fetch: Option<usize>,
+    /// Measurements and trial verdicts shared by all partition streams,
+    /// used by adaptive conjunct reordering (see [`AdaptiveConjunction`]) so
+    /// the streams learn as one. Fresh per plan node; never affects the plan.
+    adaptive_stats: Arc<AdaptiveFilterShared>,

Review Comment:
   it is stored on the `FilterExec` plan node and is reused by every future 
`execute` call on that plan. That leaks adaptive learning across independent 
executions.



##########
datafusion/physical-plan/src/adaptive_filter.rs:
##########
@@ -0,0 +1,1841 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Runtime-adaptive evaluation of a conjunctive (`AND`) predicate for
+//! [`FilterExec`](crate::filter::FilterExec).
+//!
+//! This is the *ordering* policy that sits on top of the shared measurement
+//! substrate in
+//! [`datafusion_physical_expr_common::adaptive`]. The substrate
+//! ([`AdaptiveStatsRegistry`]) measures per-conjunct selectivity and cost; 
this
+//! module decides, from those measurements, what order to evaluate the
+//! conjuncts in.
+//!
+//! ## How it evaluates
+//!
+//! The conjuncts are evaluated sequentially, combining their boolean results
+//! with `AND`. The working batch is physically compacted to the surviving rows
+//! only once the accumulated mask becomes selective enough (the same
+//! pre-selection gate [`BinaryExpr`]'s `AND` short-circuit uses) — so a run of
+//! non-selective conjuncts costs only cheap bitwise `AND`s, while a selective
+//! conjunct shrinks the batch the expensive conjuncts after it must decode.
+//! Gating is what makes ordering matter: a conjunct that compacts early saves
+//! every later conjunct work, so the cheap-and-selective ones should run 
first.
+//! Each conjunct is timed and counted on exactly the rows it evaluated, giving
+//! the *marginal* selectivity and cost on the current working population.
+//!
+//! ## How it reorders: the model proposes, a trial disposes
+//!
+//! Every conjunct accrues a per-batch *effectiveness* sample of **rows
+//! discarded per second** (`(total - matched) * 1e9 / eval_nanos`). Maximising
+//! discards-per-second is exactly minimising `cost_per_row / (1 - pass_rate)`,
+//! the classic optimal ordering key for independent conjuncts — so an 
expensive
+//! but very selective predicate (e.g. a `LIKE` that keeps one row) correctly
+//! sorts ahead of a cheap but unselective one.
+//!
+//! Crucially, this ranking is only ever a *proposal*. The classic key is
+//! built on assumptions real execution violates: it prices a conjunct's work
+//! as `rows × cost_per_row` even though vectorised kernels do not scale down
+//! without paying for compaction; and the marginal cost-per-row measured on a
+//! handful of survivor rows says little about cost at the front of the order.
+//! An order that looks optimal on paper can run slower than the one the query
+//! author wrote. So a proposed order is never adopted on the model's word: it
+//! is put through an **A/B trial** — batches alternate between the incumbent
+//! and the candidate arrangement, each timed end-to-end — and the candidate
+//! wins only if its measured ns/row beats the incumbent's with statistical
+//! confidence. Ties favour the incumbent; a rejected candidate is remembered
+//! and not re-tried. Adopted orders run via the compact-once loop (the
+//! arrangement the trial actually measured), not re-fused into a chain with
+//! different cost behaviour.
+//!
+//! Proposals themselves are debounced: the learner only settles once every
+//! adjacent pair of the ranking is either statistically certain or a tie
+//! whose order provably cannot matter, and a proposal that does not promise a
+//! material improvement over the incumbent freezes the incumbent unchanged —
+//! interchangeable conjuncts never trigger an experiment at all.
+//!
+//! To stay correct under distribution drift, a frozen evaluator periodically
+//! *re-thaws*: it re-measures a short window and re-proposes. Each re-thaw
+//! that keeps the incumbent backs the next one off exponentially, so a stable
+//! filter is re-checked geometrically less often and steady-state overhead
+//! decays toward zero; a candidate that wins its trial resets the interval so
+//! real drift is caught quickly.
+//!
+//! All partition streams of a `FilterExec` share their measurements and trial
+//! verdicts, so the learning cost is paid once per query, not once per
+//! stream, and a stream that starts late adopts the validated champion on its
+//! first batch.
+//!
+//! ## Known limitation
+//!
+//! The measured selectivity of a conjunct is *conditional* on the conjuncts
+//! ordered before it (it only sees their survivors), so with strongly
+//! correlated predicates the *proposal* can be a local optimum — the trial
+//! protects against adopting a bad one, but a better unexplored order may be
+//! missed. A proper exploration phase (measuring each conjunct's marginal
+//! selectivity on a common population) is future work.
+//!
+//! [`BinaryExpr`]: datafusion_physical_expr::expressions::BinaryExpr
+
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, 
UInt32Array};
+use arrow::buffer::BooleanBuffer;
+use arrow::compute::kernels::boolean::and;
+use arrow::compute::{filter, filter_record_batch, prep_null_mask_filter};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::instant::Instant;
+use datafusion_expr::Operator;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::utils::split_conjunction;
+use datafusion_physical_expr_common::adaptive::{
+    AdaptiveArbiter, AdaptiveStatsRegistry, SelectivityStats, TrialUpdate,
+};
+use datafusion_physical_expr_common::physical_expr::is_volatile;
+use log::debug;
+
+/// Confidence multiplier for the one-sided interval on effectiveness
+/// (~97.5% one-sided at 2.0).
+const CONFIDENCE_Z: f64 = 2.0;
+/// Minimum per-conjunct samples before the confidence intervals are trusted
+/// enough to make a freeze decision.
+const MIN_SAMPLES_FOR_CI: u64 = 4;
+/// Freeze after this many learning samples even if the order's confidence
+/// intervals still overlap: if the conjuncts cannot be told apart by now, 
their
+/// relative order does not matter, so stop paying to measure it.
+const MAX_LEARNING_SAMPLES: u64 = 64;
+/// Batches the fast frozen path runs before the first re-thaw check.
+const INITIAL_THAW_INTERVAL: u64 = 64;
+/// Each re-thaw that confirms the order is unchanged multiplies the next
+/// interval by this factor (exponential backoff), so a stable filter is
+/// re-checked geometrically less often and steady-state overhead decays to ~0.
+const THAW_BACKOFF: u64 = 4;
+/// Upper bound on the re-thaw interval.
+const MAX_THAW_INTERVAL: u64 = 16_384;
+/// Batches measured during a re-thaw before re-deciding the order.
+const REMEASURE_WINDOW: u64 = 16;
+/// Measured batches at the start of each measuring phase (learning and
+/// re-measure windows) that run back-to-back, so freeze decisions are
+/// reachable quickly even on short streams.
+const MEASURE_WARMUP: u64 = 8;
+/// After the warmup, measure one batch in this many; the rest evaluate the
+/// fused predicate in the current order with no instrumentation. Bounds the
+/// steady measurement overhead even when the order never resolves.
+const MEASURE_STRIDE: u64 = 8;
+/// Fraction of the conjunction's expected total cost below which a difference
+/// is immaterial. Adjacent conjuncts whose swap could not change the expected
+/// cost by more than this are a *tie*: their relative order does not matter,
+/// so an unresolved tie neither delays freezing nor counts as drift when a
+/// re-thaw re-ranks them.
+const TIE_COST_FRACTION: f64 = 0.05;
+/// Physically compact the working batch to the surviving rows only when the
+/// accumulated mask keeps at most this fraction of them. Above this, the cost
+/// of materializing a barely-smaller batch is not repaid, so we keep 
evaluating
+/// against the full working batch and just AND the boolean masks — mirroring
+/// the pre-selection gate in `BinaryExpr`'s `AND` short-circuit.
+const COMPACTION_SELECTIVITY_THRESHOLD: f64 = 0.2;
+/// Paired samples before a trial verdict. The challenger must beat the
+/// incumbent with confidence within this many pairs or it is rejected (ties
+/// favour the incumbent).
+/// How an arrangement evaluates its conjuncts.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum Strategy {
+    /// A single left-deep `AND` of [`BinaryExpr`]s — byte-for-byte what
+    /// `FilterExec` runs when the feature is off. The incumbent starts here so
+    /// that doing nothing costs nothing.
+    Fused,
+    /// The compact-once loop ([`eval_conjuncts`]): masks are `AND`-combined
+    /// and the working batch is physically compacted at most when the
+    /// survivors drop below the pre-selection threshold, instead of
+    /// re-filtering and re-scattering at every `AND` level like the fused
+    /// chain does. Orders adopted by a trial run this way, since this is the
+    /// strategy the trial measured.
+    CompactOnce,
+}
+
+/// The executable arrangement an adaptive `FilterExec` decides between: an
+/// evaluation order plus the strategy that runs it. This is the type the
+/// shared [`AdaptiveArbiter`] coordinates trials over; the arbiter itself is
+/// arrangement-agnostic (a parquet scan would put filter *placement* here
+/// instead).
+#[derive(Debug, Clone, PartialEq)]
+struct Arrangement {
+    /// Evaluation order: indices into the conjunct list.
+    order: Vec<usize>,
+    /// How the order is executed.
+    strategy: Strategy,
+}
+
+/// Lifecycle of the adaptive evaluator.
+#[derive(Debug)]
+enum Phase {
+    /// Measuring batches (warmup, then strided), building confidence in the
+    /// per-conjunct statistics that *propose* a candidate order.
+    Learning,
+    /// Participating in the shared A/B trial (see [`AdaptiveArbiter`]):
+    /// alternating arms on consecutive batches and submitting paired
+    /// end-to-end timings. Only a measured win changes the order — the
+    /// per-conjunct cost model proposes, but never decides.
+    Trial {
+        /// The candidate arrangement, copied out of the shared trial so the
+        /// hot path does not lock to learn what to evaluate.
+        candidate: Arrangement,
+        /// ns/row of the incumbent leg of the current pair, once run.
+        pending_incumbent: Option<f64>,
+        /// Freeze interval to use if the candidate is rejected (carries this
+        /// stream's re-thaw backoff); an adopted candidate resets it.
+        interval_if_rejected: u64,
+    },
+    /// Order settled; the incumbent arrangement is evaluated with no
+    /// measurement until `thaw_at` batches have been processed.
+    Frozen {
+        /// Batch count at which to re-measure.
+        thaw_at: u64,
+        /// Interval that produced `thaw_at`; grows on each confirmation.
+        interval: u64,
+    },
+    /// Briefly measuring again after a thaw, to detect distribution drift.
+    Remeasuring {
+        /// Measured batches left in the re-measurement window.
+        window_left: u64,
+        /// The frozen interval before this thaw (for backoff bookkeeping).
+        interval: u64,
+    },
+}
+
+/// State shared by every partition stream of one `FilterExec`: the
+/// per-conjunct measurement registry plus the arrangement arbiter, so the
+/// streams learn as one and one stream's completed trial serves all of them.
+#[derive(Debug, Default)]
+pub(crate) struct AdaptiveFilterShared {
+    /// Per-conjunct selectivity/cost stats, keyed by conjunct index.
+    stats: AdaptiveStatsRegistry,
+    /// Champion/challenger coordination over [`Arrangement`]s.
+    arbiter: AdaptiveArbiter<Arrangement>,
+}
+
+impl AdaptiveFilterShared {
+    pub(crate) fn new() -> Self {
+        Self::default()
+    }
+}
+
+/// Adaptive evaluator for a single conjunctive predicate.
+///
+/// Owned per partition stream (single-threaded), so all state — order, stats,
+/// and lifecycle [`Phase`] — is held directly and mutated through `&mut self`
+/// with no locking. The stats are a plain `Vec` indexed by conjunct id (ids 
are
+/// dense `0..n`), so the per-batch hot path is a direct index, not a locked 
map
+/// lookup.
+///
+/// What *is* shared is the measurements and the trial verdicts: per-batch
+/// observations accumulate in the stream-local `stats` (lock-free) and are
+/// folded into the [`AdaptiveFilterShared`] registry whenever this stream
+/// makes a decision, so N streams converge ~N× faster; and the first stream
+/// whose trial validates (or rejects) a candidate publishes the verdict so the
+/// others adopt it without re-running the experiment. Drift decisions
+/// (re-thaw windows) deliberately use only the local fresh window — the shared
+/// accumulator is a long-run prior and would dilute a distribution change.
+#[derive(Debug)]
+pub(crate) struct AdaptiveConjunction {
+    /// The conjuncts. Stats/order indices refer to positions in this `Vec`.
+    conjuncts: Vec<Arc<dyn PhysicalExpr>>,
+    /// Per-conjunct observations accumulated locally since the last flush to
+    /// [`shared`](Self::shared), indexed by conjunct id.
+    stats: Vec<SelectivityStats>,
+    /// Measurements and verdicts shared by every partition stream of the same
+    /// `FilterExec`.
+    shared: Arc<AdaptiveFilterShared>,
+    /// Champion epoch this stream has caught up to (see
+    /// [`AdaptiveArbiter::epoch`]).
+    epoch_seen: u64,
+    /// Incumbent evaluation order: indices into 
[`conjuncts`](Self::conjuncts).
+    order: Vec<usize>,
+    /// Incumbent evaluation strategy. Starts as [`Strategy::Fused`] over the
+    /// written order (identical to the feature being off); becomes
+    /// [`Strategy::CompactOnce`] when a trial adopts a new arrangement.
+    strategy: Strategy,
+    /// The conjuncts fused into a left-deep `AND` in [`order`](Self::order),
+    /// used by [`Strategy::Fused`] and by unmeasured learning batches.
+    /// Rebuilt when the order changes.
+    fused: Arc<dyn PhysicalExpr>,
+    /// Total batches processed; drives the re-thaw schedule.
+    batches: u64,
+    /// Measured batches in the current measuring phase; drives the
+    /// warmup-then-stride measurement schedule.
+    measured: u64,
+    /// Current lifecycle phase.
+    phase: Phase,
+}
+
+impl AdaptiveConjunction {
+    /// Build an adaptive evaluator for `predicate`, or `None` if adaptive
+    /// reordering does not apply:
+    ///
+    /// - `enabled` is false (the config flag is off);
+    /// - the predicate has fewer than two `AND` conjuncts (nothing to 
reorder);
+    /// - any conjunct is volatile (reordering could change results).
+    ///
+    /// `shared` is the state common to all partition streams of the owning
+    /// `FilterExec`; the conjuncts are registered in it under their indices.
+    pub(crate) fn try_new(
+        predicate: &Arc<dyn PhysicalExpr>,
+        enabled: bool,
+        shared: Arc<AdaptiveFilterShared>,
+    ) -> Option<Self> {
+        if !enabled {
+            return None;
+        }
+        let conjuncts: Vec<Arc<dyn PhysicalExpr>> = 
split_conjunction(predicate)
+            .into_iter()
+            .map(Arc::clone)
+            .collect();
+        if conjuncts.len() < 2 {
+            return None;
+        }
+        if conjuncts.iter().any(is_volatile) {
+            return None;
+        }
+
+        shared
+            .stats
+            .register_all((0..conjuncts.len()).map(|id| (id, false)));
+        let stats = vec![SelectivityStats::new(false); conjuncts.len()];
+        let order: Vec<usize> = (0..conjuncts.len()).collect();
+        let fused = fuse(&conjuncts, &order);
+
+        Some(Self {
+            conjuncts,
+            stats,
+            shared,
+            epoch_seen: 0,
+            order,
+            strategy: Strategy::Fused,
+            fused,
+            batches: 0,
+            measured: 0,
+            phase: Phase::Learning,
+        })
+    }
+
+    /// Evaluate the conjunction against `batch`, returning the boolean mask
+    /// (over the batch's original rows) of rows that passed every conjunct.
+    ///
+    /// While [`Learning`](Phase::Learning) or 
[`Remeasuring`](Phase::Remeasuring),
+    /// a warmup-then-strided subset of batches is evaluated and measured
+    /// per-conjunct (see [`evaluate_measured`](Self::evaluate_measured));
+    /// [`Trial`](Phase::Trial) batches alternate between the incumbent and the
+    /// candidate arrangement, timed end-to-end; all other batches — including
+    /// the whole [`Frozen`](Phase::Frozen) phase — evaluate the incumbent with
+    /// no instrumentation.
+    pub(crate) fn evaluate(&mut self, batch: &RecordBatch) -> Result<ArrayRef> 
{
+        self.batches += 1;
+
+        // Adopt a champion another stream's trial has validated since we last
+        // looked: a relaxed atomic load per batch, a lock only on change.
+        if self.shared.arbiter.epoch() != self.epoch_seen {
+            self.adopt_shared_champion();
+        }
+
+        // Frozen fast path; when the interval elapses, drop into a fresh
+        // measurement window to check whether the data has drifted.
+        if let Phase::Frozen { thaw_at, interval } = self.phase {
+            if self.batches < thaw_at {
+                return self.evaluate_incumbent(batch);
+            }
+            self.stats.iter_mut().for_each(SelectivityStats::reset);
+            self.measured = 0;
+            self.phase = Phase::Remeasuring {
+                window_left: REMEASURE_WINDOW,
+                interval,
+            };
+        }
+
+        if matches!(self.phase, Phase::Trial { .. }) {
+            return self.evaluate_trial(batch);
+        }
+
+        // Learning or re-measuring: measure a warmup of consecutive batches,
+        // then only one batch per stride, so an order that stays unresolved
+        // costs a bounded fraction of the stream rather than all of it.
+        if self.measured >= MEASURE_WARMUP && 
!self.batches.is_multiple_of(MEASURE_STRIDE)
+        {
+            return self.evaluate_incumbent(batch);
+        }
+
+        self.measured += 1;
+        let result = self.evaluate_measured(batch)?;
+        self.update_phase();
+        Ok(result)
+    }
+
+    /// Evaluate the incumbent arrangement with no instrumentation.
+    fn evaluate_incumbent(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        match self.strategy {
+            Strategy::Fused => 
self.fused.evaluate(batch)?.into_array(batch.num_rows()),
+            Strategy::CompactOnce => {
+                eval_conjuncts(&self.conjuncts, &self.order, batch, None)
+            }
+        }
+    }
+
+    /// Install the trial-validated champion published by another stream and
+    /// freeze on it: the experiment has already been run, re-running it here
+    /// would only repeat the cost.
+    fn adopt_shared_champion(&mut self) {
+        let (champion, epoch) = self.shared.arbiter.champion();
+        self.epoch_seen = epoch;
+        if let Some(champion) = champion {
+            self.set_order(champion.order);
+            self.strategy = champion.strategy;
+            self.freeze(INITIAL_THAW_INTERVAL);
+        }
+    }
+
+    /// Contribute one batch to the shared A/B trial. Each stream runs the
+    /// incumbent and the candidate on consecutive batches, timing both, and
+    /// submits the pair's `ln(candidate / incumbent)` ns/row ratio; once
+    /// enough pairs accumulate (across all streams) the trial concludes. The
+    /// candidate is adopted only if the mean log-ratio is below zero with
+    /// confidence; ties favour the incumbent.
+    fn evaluate_trial(&mut self, batch: &RecordBatch) -> Result<ArrayRef> {
+        let Phase::Trial {
+            candidate,
+            pending_incumbent,
+            interval_if_rejected,
+        } = &mut self.phase
+        else {
+            unreachable!("caller checked the phase");
+        };
+        let interval_if_rejected = *interval_if_rejected;
+        let rows = batch.num_rows();
+
+        // Incumbent leg first; no locking on either leg.
+        let Some(incumbent_ns) = *pending_incumbent else {
+            let timer = Instant::now();
+            let result = match self.strategy {
+                Strategy::Fused => {
+                    self.fused.evaluate(batch)?.into_array(batch.num_rows())
+                }
+                Strategy::CompactOnce => {
+                    eval_conjuncts(&self.conjuncts, &self.order, batch, None)
+                }
+            }?;
+            let nanos = timer.elapsed().as_nanos() as u64;
+            if rows > 0
+                && nanos > 0
+                && let Phase::Trial {
+                    pending_incumbent, ..
+                } = &mut self.phase
+            {
+                *pending_incumbent = Some(nanos as f64 / rows as f64);
+            }
+            return Ok(result);
+        };
+
+        // Candidate leg; completes the pair.
+        let candidate = std::mem::replace(
+            candidate,
+            Arrangement {
+                order: Vec::new(),
+                strategy: Strategy::CompactOnce,
+            },
+        );
+        let timer = Instant::now();
+        let result = eval_conjuncts(&self.conjuncts, &candidate.order, batch, 
None)?;
+        let nanos = timer.elapsed().as_nanos() as u64;
+        let sample = (rows > 0 && nanos > 0)
+            .then(|| (nanos as f64 / rows as f64 / incumbent_ns).ln())
+            .filter(|s| s.is_finite());
+
+        let Some(sample) = sample else {
+            // Not a usable pair; start the next one.
+            self.phase = Phase::Trial {
+                candidate,
+                pending_incumbent: None,
+                interval_if_rejected,
+            };
+            return Ok(result);
+        };
+        match self
+            .shared
+            .arbiter
+            .submit_pair(&candidate, rows as u64, nanos, sample)
+        {
+            TrialUpdate::Running => {
+                self.phase = Phase::Trial {
+                    candidate,
+                    pending_incumbent: None,
+                    interval_if_rejected,
+                };
+            }
+            TrialUpdate::Adopted(champion) => {
+                debug!("adaptive filter trial adopted {champion:?}");
+                self.epoch_seen = self.shared.arbiter.epoch();
+                self.set_order(champion.order);
+                self.strategy = champion.strategy;
+                self.freeze(INITIAL_THAW_INTERVAL);
+            }
+            // Rejected here, or concluded on another stream (an adoption
+            // there is handled by the epoch check): the incumbent stands.
+            TrialUpdate::Rejected(rejected) => {
+                debug!("adaptive filter trial rejected {rejected:?}");
+                self.freeze(interval_if_rejected);
+            }
+            TrialUpdate::Superseded => self.freeze(interval_if_rejected),
+        }
+        Ok(result)
+    }
+
+    /// Evaluate the conjuncts in the incumbent order via the compact-once
+    /// loop, recording each conjunct's marginal selectivity and cost into the
+    /// stream-local stats.
+    fn evaluate_measured(&mut self, batch: &RecordBatch) -> Result<ArrayRef> {
+        eval_conjuncts(&self.conjuncts, &self.order, batch, Some(&mut 
self.stats))
+    }
+
+    /// Advance the lifecycle phase after measuring a batch:
+    /// - [`Learning`](Phase::Learning) folds the local observations into the
+    ///   shared registry and proposes a candidate once the *shared* stats
+    ///   settle the ranking (see [`settled_order`]), so all partition streams
+    ///   learn as one;
+    /// - [`Remeasuring`](Phase::Remeasuring) proposes from the *local* fresh
+    ///   window only when its window ends (the shared accumulator is a
+    ///   long-run prior that would dilute a distribution change), then folds
+    ///   the window into the shared registry.
+    ///
+    /// Either way the proposal goes through [`decide`](Self::decide): a
+    /// proposal that does not promise a material improvement freezes the
+    /// incumbent unchanged, and one that does is handed to an A/B
+    /// [`Trial`](Phase::Trial) rather than adopted on the model's word.
+    fn update_phase(&mut self) {
+        match std::mem::replace(&mut self.phase, Phase::Learning) {
+            Phase::Learning => {
+                self.flush_to_shared();
+                let stats = self.shared_snapshot();
+                if let Some(order) = settled_order(&stats) {
+                    self.decide(order, &stats, INITIAL_THAW_INTERVAL);
+                }
+                // else: stay in `Learning` (already restored by the replace).
+            }
+            Phase::Remeasuring {
+                window_left,
+                interval,
+            } => {
+                let window_left = window_left - 1;
+                if window_left == 0 {
+                    let candidate = rank_by_effectiveness(&self.stats);
+                    let stats = std::mem::take(&mut self.stats);
+                    let backoff =
+                        
interval.saturating_mul(THAW_BACKOFF).min(MAX_THAW_INTERVAL);
+                    self.decide(candidate, &stats, backoff);
+                    self.stats = stats;
+                    self.flush_to_shared();
+                } else {
+                    self.phase = Phase::Remeasuring {
+                        window_left,
+                        interval,
+                    };
+                }
+            }
+            // Not reachable: frozen and trial batches never take the measured
+            // path. Restore the phase.
+            other => self.phase = other,
+        }
+    }
+
+    /// Act on a proposed `candidate` order, judged against `stats`:
+    ///
+    /// - the incumbent's order (or an immaterial reshuffle of it, per
+    ///   [`TIE_COST_FRACTION`]) freezes the incumbent for
+    ///   `interval_if_unchanged` batches — ties never trigger an experiment;
+    /// - a candidate the shared verdicts already rejected does the same — the
+    ///   experiment has been run and lost;
+    /// - otherwise the candidate enters an A/B [`Trial`](Phase::Trial). The
+    ///   per-conjunct stats only ever *propose*; adoption requires the trial's
+    ///   measured end-to-end win.
+    fn decide(
+        &mut self,
+        candidate: Vec<usize>,
+        stats: &[SelectivityStats],
+        interval_if_unchanged: u64,
+    ) {
+        let materially_better = expected_cost_per_row(stats, &candidate)
+            < (1.0 - TIE_COST_FRACTION) * expected_cost_per_row(stats, 
&self.order);
+        if candidate == self.order || !materially_better {
+            self.freeze(interval_if_unchanged);
+            return;
+        }
+        // A candidate whose modelled cost is not materially better than the
+        // last rejected candidate's is the same experiment re-proposed —
+        // with tied conjuncts the ranking reshuffles them freely, and an
+        // exact-match memory would re-run the lost trial forever.
+        if let Some(rejected) = self.shared.arbiter.rejected()
+            && expected_cost_per_row(stats, &candidate)
+                >= (1.0 - TIE_COST_FRACTION)
+                    * expected_cost_per_row(stats, &rejected.order)
+        {
+            self.freeze(interval_if_unchanged);
+            return;
+        }
+        // Start the shared trial, or join the one already in progress (its
+        // candidate stands in for ours: proposals are made from the same
+        // shared statistics, and a verdict on any reordering beats racing
+        // experiments against each other).
+        let candidate = self.shared.arbiter.begin_trial(Arrangement {
+            order: candidate,
+            strategy: Strategy::CompactOnce,
+        });
+        self.phase = Phase::Trial {
+            candidate,
+            pending_incumbent: None,
+            interval_if_rejected: interval_if_unchanged,
+        };
+    }
+
+    /// Fold the locally-accumulated observations into the shared registry and
+    /// clear them.
+    fn flush_to_shared(&mut self) {
+        for (id, stats) in self.stats.iter_mut().enumerate() {
+            if stats.sample_count() > 0 || stats.pass_rate().is_some() {
+                self.shared.stats.merge(id, stats);
+                stats.reset();
+            }
+        }
+    }
+
+    /// Copy out the shared stats for every conjunct.
+    fn shared_snapshot(&self) -> Vec<SelectivityStats> {
+        (0..self.conjuncts.len())
+            .map(|id| self.shared.stats.snapshot(id).unwrap_or_default())
+            .collect()
+    }
+
+    /// Install `order` as the incumbent order, rebuilding the fused predicate
+    /// if it changed.
+    fn set_order(&mut self, order: Vec<usize>) {
+        if order != self.order {
+            self.order = order;
+            self.fused = fuse(&self.conjuncts, &self.order);
+        }
+    }
+
+    /// Freeze the incumbent, due to re-measure after `interval` batches.
+    fn freeze(&mut self, interval: u64) {
+        self.phase = Phase::Frozen {
+            thaw_at: self.batches + interval,
+            interval,
+        };
+    }
+}
+
+/// Evaluate `conjuncts` in `order` against `batch` via the compact-once loop,
+/// returning the boolean mask (over the batch's original rows) of rows that
+/// passed every conjunct. With `stats`, each conjunct is additionally timed
+/// and counted on exactly the rows it evaluated (its *marginal* selectivity
+/// and cost on the current working population).
+///
+/// The working batch is physically compacted to the surviving rows only once
+/// the accumulated mask becomes selective enough (see
+/// [`COMPACTION_SELECTIVITY_THRESHOLD`]); until then masks are combined with a
+/// cheap bitwise `AND`, so a run of non-selective conjuncts pays no
+/// materialization cost. Unlike the fused `BinaryExpr` chain, survivors stay
+/// compacted across the remaining conjuncts instead of being re-filtered and
+/// re-scattered at every `AND` level.
+///
+/// The bookkeeping is all deferred so the common shapes cost what the fused
+/// chain would: an all-true mask is dropped without an `AND` merge, the
+/// row-index array mapping survivors back to original rows is only
+/// materialized at the first compaction, and the final scatter only happens
+/// if a compaction occurred (otherwise the accumulated mask already covers
+/// the original rows).
+fn eval_conjuncts(
+    conjuncts: &[Arc<dyn PhysicalExpr>],
+    order: &[usize],
+    batch: &RecordBatch,
+    mut stats: Option<&mut [SelectivityStats]>,
+) -> Result<ArrayRef> {
+    let num_rows = batch.num_rows();
+    if num_rows == 0 {
+        return Ok(Arc::new(BooleanArray::from(Vec::<bool>::new())));
+    }
+
+    // `working` is the batch conjuncts are evaluated against. `acc` is the
+    // accumulated (`AND`-combined, null-free) result over `working`'s rows
+    // since the last compaction; `None` means all of them are still live.
+    // `live` maps `working`'s rows back to original row indices; `None`
+    // until a compaction first drops rows.
+    let mut working = batch.clone();
+    let mut acc: Option<BooleanArray> = None;
+    let mut live: Option<ArrayRef> = None;
+
+    for &id in order {
+        let rows_in = working.num_rows();
+
+        let timer = stats.is_some().then(Instant::now);
+        let array = conjuncts[id].evaluate(&working)?.into_array(rows_in)?;
+        let mask = as_boolean_array(&array)?;
+        // `matched` counts non-null trues (SQL filter semantics).
+        let matched = mask.true_count() as u64;
+
+        if let (Some(stats), Some(timer)) = (stats.as_deref_mut(), timer) {
+            let eval_nanos = timer.elapsed().as_nanos() as u64;
+            let discarded = rows_in as u64 - matched;
+            let sample = if eval_nanos > 0 {
+                discarded as f64 * 1e9 / eval_nanos as f64
+            } else {
+                0.0
+            };
+            stats[id].record(matched, rows_in as u64, eval_nanos, sample);
+        }
+
+        // An all-true mask leaves the accumulated result untouched.
+        if matched == rows_in as u64 && mask.null_count() == 0 {
+            continue;
+        }
+
+        // Fold this conjunct into the accumulated mask (null -> false).
+        let mask = if mask.null_count() > 0 {
+            prep_null_mask_filter(mask)
+        } else {
+            mask.clone()
+        };
+        let folded = match &acc {
+            None => mask,
+            Some(prev) => and(prev, &mask)?,
+        };
+
+        let alive = folded.true_count();
+        if alive == 0 {
+            // Nothing survives; the result is all-false over the original
+            // rows no matter what the remaining conjuncts say.
+            return Ok(Arc::new(BooleanArray::new(
+                BooleanBuffer::new_unset(num_rows),
+                None,
+            )));
+        }
+        // Compact only when the survivors are a small fraction of the
+        // working batch — otherwise the copy is not worth it.
+        if (alive as f64) <= COMPACTION_SELECTIVITY_THRESHOLD * rows_in as f64 
{
+            working = filter_record_batch(&working, &folded)?;
+            let indices = live.take().unwrap_or_else(|| {

Review Comment:
   the live-row index array is `UInt32Array`, built from `0..num_rows as u32`. 
Usually, it's not an issue, but under extreme situations: that silently 
truncates if a batch ever exceeds `u32::MAX` rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to