This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 97a74521d feat: add remote hash policy as not used (#1526)
97a74521d is described below

commit 97a74521de39568d695e4ccab2dfdb50a79e9feb
Author: Saj <[email protected]>
AuthorDate: Fri Mar 27 17:27:50 2026 +0000

    feat: add remote hash policy as not used (#1526)
---
 ballista/core/src/consistent_hash/mod.rs        | 343 ---------------------
 ballista/core/src/consistent_hash/node.rs       |  27 --
 ballista/core/src/lib.rs                        |   2 -
 ballista/scheduler/src/cluster/memory.rs        |  93 +-----
 ballista/scheduler/src/cluster/mod.rs           | 384 +-----------------------
 ballista/scheduler/src/config.rs                |  24 --
 ballista/scheduler/src/scheduler_server/grpc.rs |   5 -
 docs/source/user-guide/configs.md               |   2 +-
 8 files changed, 19 insertions(+), 861 deletions(-)

diff --git a/ballista/core/src/consistent_hash/mod.rs 
b/ballista/core/src/consistent_hash/mod.rs
deleted file mode 100644
index 3be7be872..000000000
--- a/ballista/core/src/consistent_hash/mod.rs
+++ /dev/null
@@ -1,343 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Consistent hashing implementation for distributing data across nodes.
-
-use crate::consistent_hash::node::Node;
-use md5::{Digest, Md5};
-use std::collections::{BTreeMap, HashMap};
-
-/// Node trait and implementations for consistent hashing.
-pub mod node;
-
-/// Function type for computing hash values from byte slices.
-pub type HashFunction = fn(&[u8]) -> Vec<u8>;
-
-/// A consistent hash ring for distributing keys across nodes.
-///
-/// Uses virtual nodes (replicas) to ensure even distribution of keys
-/// and provides tolerance for node failures.
-pub struct ConsistentHash<N>
-where
-    N: Node,
-{
-    virtual_nodes: BTreeMap<Vec<u8>, String>,
-    node_replicas: HashMap<String, (N, usize)>,
-    hash_func: HashFunction,
-}
-
-impl<N> ConsistentHash<N>
-where
-    N: Node,
-{
-    /// Creates a new consistent hash ring with the given nodes and replica 
counts.
-    ///
-    /// Uses MD5 as the default hash function.
-    pub fn new(node_replicas: Vec<(N, usize)>) -> Self {
-        let consistent_hash = Self {
-            virtual_nodes: BTreeMap::new(),
-            node_replicas: HashMap::new(),
-            hash_func: md5_hash,
-        };
-        consistent_hash.init(node_replicas)
-    }
-
-    /// Creates a new consistent hash ring with a custom hash function.
-    pub fn new_with_hash(
-        node_replicas: Vec<(N, usize)>,
-        hash_func: HashFunction,
-    ) -> Self {
-        let consistent_hash = Self {
-            virtual_nodes: BTreeMap::new(),
-            node_replicas: HashMap::new(),
-            hash_func,
-        };
-        consistent_hash.init(node_replicas)
-    }
-
-    fn init(mut self, node_replicas: Vec<(N, usize)>) -> Self {
-        node_replicas.into_iter().for_each(|(node, num_replicas)| {
-            self.add(node, num_replicas);
-        });
-        self
-    }
-
-    /// Returns references to all nodes in the hash ring.
-    pub fn nodes(&self) -> Vec<&N> {
-        self.node_replicas
-            .values()
-            .map(|(node, _)| node)
-            .collect::<Vec<_>>()
-    }
-
-    /// Returns mutable references to all nodes in the hash ring.
-    pub fn nodes_mut(&mut self) -> Vec<&mut N> {
-        self.node_replicas
-            .values_mut()
-            .map(|(node, _)| node)
-            .collect::<Vec<_>>()
-    }
-
-    /// Adds a node to the hash ring with the specified number of virtual 
replicas.
-    pub fn add(&mut self, node: N, num_replicas: usize) {
-        // Remove existing ones
-        self.remove(node.name());
-
-        for i in 0..num_replicas {
-            let vnode_id = format!("{}:{i}", node.name());
-            let vnode_key = (self.hash_func)(vnode_id.as_bytes());
-            self.virtual_nodes
-                .insert(vnode_key, node.name().to_string());
-        }
-        self.node_replicas
-            .insert(node.name().to_string(), (node, num_replicas));
-    }
-
-    /// Removes a node from the hash ring by name, returning the node and 
replica count if found.
-    pub fn remove(&mut self, node_name: &str) -> Option<(N, usize)> {
-        if let Some((node, num_replicas)) = 
self.node_replicas.remove(node_name) {
-            for i in 0..num_replicas {
-                let vnode_id = format!("{node_name}:{i}");
-                let vnode_key = (self.hash_func)(vnode_id.as_bytes());
-                self.virtual_nodes.remove(vnode_key.as_slice());
-            }
-            Some((node, num_replicas))
-        } else {
-            None
-        }
-    }
-
-    /// Gets the node responsible for the given key.
-    pub fn get(&self, key: &[u8]) -> Option<&N> {
-        self.get_with_tolerance(key, 0)
-    }
-
-    /// Gets the node responsible for the given key, skipping up to 
`tolerance` invalid nodes.
-    pub fn get_with_tolerance(&self, key: &[u8], tolerance: usize) -> 
Option<&N> {
-        self.get_position_key(key, tolerance)
-            .and_then(move |position_key| {
-                self.virtual_nodes
-                    .get(&position_key)
-                    .map(|node_name| 
&(self.node_replicas.get(node_name).unwrap().0))
-            })
-    }
-
-    /// Gets a mutable reference to the node responsible for the given key.
-    pub fn get_mut(&mut self, key: &[u8]) -> Option<&mut N> {
-        self.get_mut_with_tolerance(key, 0)
-    }
-
-    /// Gets a mutable reference to the node for the given key, with failure 
tolerance.
-    pub fn get_mut_with_tolerance(
-        &mut self,
-        key: &[u8],
-        tolerance: usize,
-    ) -> Option<&mut N> {
-        self.get_position_key(key, tolerance)
-            .and_then(move |position_key| {
-                if let Some(node_name) = self.virtual_nodes.get(&position_key) 
{
-                    Some(&mut 
(self.node_replicas.get_mut(node_name).unwrap().0))
-                } else {
-                    None
-                }
-            })
-    }
-
-    fn get_position_key(&self, key: &[u8], tolerance: usize) -> 
Option<Vec<u8>> {
-        if self.node_replicas.is_empty() {
-            return None;
-        };
-
-        let mut tolerance = if tolerance >= self.virtual_nodes.len() {
-            self.virtual_nodes.len() - 1
-        } else {
-            tolerance
-        };
-        let hashed_key = (self.hash_func)(key);
-        for (position_key, node_name) in self
-            .virtual_nodes
-            .range(hashed_key..)
-            .chain(self.virtual_nodes.iter())
-        {
-            if let Some((node, _)) = self.node_replicas.get(node_name)
-                && node.is_valid()
-            {
-                return Some(position_key.clone());
-            }
-            if tolerance == 0 {
-                return None;
-            } else {
-                tolerance -= 1;
-            }
-        }
-
-        None
-    }
-}
-
-/// Computes an MD5 hash of the input data.
-pub fn md5_hash(data: &[u8]) -> Vec<u8> {
-    let mut digest = Md5::default();
-    digest.update(data);
-    digest.finalize().to_vec()
-}
-
-#[cfg(test)]
-mod test {
-    use crate::consistent_hash::ConsistentHash;
-    use crate::consistent_hash::node::Node;
-
-    #[test]
-    fn test_topology() {
-        let (mut consistent_hash, nodes, keys) = prepare_consistent_hash();
-
-        // Test removal case
-        let (node, num_replicas) = 
consistent_hash.remove(nodes[3].name()).unwrap();
-        for (i, key) in keys.iter().enumerate() {
-            if i == 3 {
-                assert_ne!(
-                    consistent_hash.get(key.as_bytes()).unwrap().name(),
-                    nodes[i].name()
-                );
-            } else {
-                assert_eq!(
-                    consistent_hash.get(key.as_bytes()).unwrap().name(),
-                    nodes[i].name()
-                );
-            }
-        }
-
-        // Test adding case
-        consistent_hash.add(node, num_replicas);
-        for (i, key) in keys.iter().enumerate() {
-            assert_eq!(
-                consistent_hash.get(key.as_bytes()).unwrap().name(),
-                nodes[i].name()
-            );
-        }
-    }
-
-    #[test]
-    fn test_tolerance() {
-        let (mut consistent_hash, nodes, keys) = prepare_consistent_hash();
-        let (mut node, num_replicas) = 
consistent_hash.remove(nodes[2].name()).unwrap();
-        node.available = false;
-        consistent_hash.add(node, num_replicas);
-        for (i, key) in keys.iter().enumerate() {
-            if i == 2 {
-                assert!(consistent_hash.get(key.as_bytes()).is_none());
-                assert!(
-                    consistent_hash
-                        .get_with_tolerance(key.as_bytes(), 1)
-                        .is_some()
-                );
-            } else {
-                assert_eq!(
-                    consistent_hash.get(key.as_bytes()).unwrap().name(),
-                    nodes[i].name()
-                );
-            }
-        }
-
-        for (i, node) in nodes.iter().enumerate() {
-            if i != 2 && i != 1 {
-                let (mut node, num_replicas) =
-                    consistent_hash.remove(node.name()).unwrap();
-                node.available = false;
-                consistent_hash.add(node, num_replicas);
-            }
-        }
-        for (i, key) in keys.iter().enumerate() {
-            if i == 1 {
-                assert_eq!(
-                    consistent_hash.get(key.as_bytes()).unwrap().name(),
-                    nodes[i].name()
-                );
-            } else {
-                assert!(consistent_hash.get(key.as_bytes()).is_none());
-            }
-            assert_eq!(
-                consistent_hash
-                    .get_with_tolerance(key.as_bytes(), usize::MAX)
-                    .unwrap()
-                    .name(),
-                nodes[1].name()
-            );
-        }
-    }
-
-    #[derive(Clone)]
-    struct ServerNode {
-        name: String,
-        available: bool,
-    }
-
-    impl ServerNode {
-        fn new(host: &str, port: u16) -> Self {
-            Self::new_with_available(host, port, true)
-        }
-
-        fn new_with_available(host: &str, port: u16, available: bool) -> Self {
-            Self {
-                name: format!("{host}:{port}"),
-                available,
-            }
-        }
-    }
-
-    impl Node for ServerNode {
-        fn name(&self) -> &str {
-            &self.name
-        }
-
-        fn is_valid(&self) -> bool {
-            self.available
-        }
-    }
-
-    fn prepare_consistent_hash() -> (
-        ConsistentHash<ServerNode>,
-        Vec<ServerNode>,
-        Vec<&'static str>,
-    ) {
-        let num_replicas = 20usize;
-
-        let nodes = vec![
-            ServerNode::new("localhost", 10000),
-            ServerNode::new("localhost", 10001),
-            ServerNode::new("localhost", 10002),
-            ServerNode::new("localhost", 10003),
-            ServerNode::new("localhost", 10004),
-        ];
-
-        let node_replicas = nodes
-            .iter()
-            .map(|node| (node.clone(), num_replicas))
-            .collect::<Vec<_>>();
-        let consistent_hash = ConsistentHash::new(node_replicas);
-
-        let keys = vec!["1", "4", "5", "3", "2"];
-        for (i, key) in keys.iter().enumerate() {
-            assert_eq!(
-                consistent_hash.get(key.as_bytes()).unwrap().name(),
-                nodes[i].name()
-            );
-        }
-
-        (consistent_hash, nodes, keys)
-    }
-}
diff --git a/ballista/core/src/consistent_hash/node.rs 
b/ballista/core/src/consistent_hash/node.rs
deleted file mode 100644
index 8e995bc09..000000000
--- a/ballista/core/src/consistent_hash/node.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Node trait for consistent hashing.
-
-/// A trait representing a node in a consistent hash ring.
-pub trait Node {
-    /// Returns the unique name identifier for this node.
-    fn name(&self) -> &str;
-
-    /// Returns whether this node is currently valid and can receive requests.
-    fn is_valid(&self) -> bool;
-}
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index c9c4ef1d5..7211772f5 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -35,8 +35,6 @@ pub fn print_version() {
 pub mod client;
 /// Configuration options and settings for Ballista components.
 pub mod config;
-/// Consistent hashing implementation for data distribution.
-pub mod consistent_hash;
 /// Utilities for generating execution plan diagrams.
 pub mod diagram;
 /// Error types and result definitions for Ballista operations.
diff --git a/ballista/scheduler/src/cluster/memory.rs 
b/ballista/scheduler/src/cluster/memory.rs
index f3f70e99c..721d068f9 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -17,9 +17,7 @@
 
 use crate::cluster::{
     BoundTask, ClusterState, ExecutorSlot, JobState, JobStateEvent, 
JobStateEventStream,
-    JobStatus, TaskDistributionPolicy, TopologyNode, bind_task_bias,
-    bind_task_consistent_hash, bind_task_round_robin, get_scan_files,
-    is_skip_consistent_hash,
+    JobStatus, TaskDistributionPolicy, bind_task_bias, bind_task_round_robin,
 };
 use crate::state::execution_graph::ExecutionGraphBox;
 use async_trait::async_trait;
@@ -39,14 +37,12 @@ use crate::scheduler_server::{SessionBuilder, 
timestamp_millis, timestamp_secs};
 use crate::state::session_manager::create_datafusion_context;
 use crate::state::task_manager::JobInfoCache;
 use ballista_core::serde::protobuf::job_status::Status;
-use log::{error, info, warn};
+use log::{error, warn};
 use std::collections::{HashMap, HashSet};
 use std::ops::DerefMut;
 
-use ballista_core::consistent_hash::node::Node;
-use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
-use tokio::sync::{Mutex, MutexGuard};
+use tokio::sync::Mutex;
 
 use super::{ClusterStateEvent, ClusterStateEventStream};
 
@@ -67,44 +63,6 @@ pub struct InMemoryClusterState {
     cluster_event_sender: ClusterEventSender<ClusterStateEvent>,
 }
 
-impl InMemoryClusterState {
-    /// Get the topology nodes of the cluster for consistent hashing
-    fn get_topology_nodes(
-        &self,
-        guard: &MutexGuard<HashMap<String, AvailableTaskSlots>>,
-        executors: Option<HashSet<String>>,
-    ) -> HashMap<String, TopologyNode> {
-        let mut nodes: HashMap<String, TopologyNode> = HashMap::new();
-        for (executor_id, slots) in guard.iter() {
-            if let Some(executors) = executors.as_ref()
-                && !executors.contains(executor_id)
-            {
-                continue;
-            }
-            if let Some(executor) = self.executors.get(&slots.executor_id) {
-                let node = TopologyNode::new(
-                    &executor.host,
-                    executor.port,
-                    &slots.executor_id,
-                    self.heartbeats
-                        .get(&executor.id)
-                        .map(|heartbeat| heartbeat.timestamp)
-                        .unwrap_or(0),
-                    slots.slots,
-                );
-                if let Some(existing_node) = nodes.get(node.name()) {
-                    if existing_node.last_seen_ts < node.last_seen_ts {
-                        nodes.insert(node.name().to_string(), node);
-                    }
-                } else {
-                    nodes.insert(node.name().to_string(), node);
-                }
-            }
-        }
-        nodes
-    }
-}
-
 #[async_trait]
 impl ClusterState for InMemoryClusterState {
     async fn bind_schedulable_tasks(
@@ -134,51 +92,6 @@ impl ClusterState for InMemoryClusterState {
             TaskDistributionPolicy::RoundRobin => {
                 bind_task_round_robin(available_slots, active_jobs, |_| 
false).await
             }
-            TaskDistributionPolicy::ConsistentHash {
-                num_replicas,
-                tolerance,
-            } => {
-                let mut bound_tasks = bind_task_round_robin(
-                    available_slots,
-                    active_jobs.clone(),
-                    |stage_plan: Arc<dyn ExecutionPlan>| {
-                        if let Ok(scan_files) = get_scan_files(stage_plan) {
-                            // Should be opposite to consistent hash ones.
-                            !is_skip_consistent_hash(&scan_files)
-                        } else {
-                            false
-                        }
-                    },
-                )
-                .await;
-                info!("{} tasks bound by round robin policy", 
bound_tasks.len());
-                let (bound_tasks_consistent_hash, ch_topology) =
-                    bind_task_consistent_hash(
-                        self.get_topology_nodes(&guard, executors),
-                        num_replicas,
-                        tolerance,
-                        active_jobs,
-                        |_, plan| get_scan_files(plan),
-                    )
-                    .await?;
-                info!(
-                    "{} tasks bound by consistent hashing policy",
-                    bound_tasks_consistent_hash.len()
-                );
-                if !bound_tasks_consistent_hash.is_empty() {
-                    bound_tasks.extend(bound_tasks_consistent_hash);
-                    // Update the available slots
-                    let ch_topology = ch_topology.unwrap();
-                    for node in ch_topology.nodes() {
-                        if let Some(data) = guard.get_mut(&node.id) {
-                            data.slots = node.available_slots;
-                        } else {
-                            error!("Fail to find executor data for {}", 
&node.id);
-                        }
-                    }
-                }
-                bound_tasks
-            }
             TaskDistributionPolicy::Custom(ref policy) => {
                 policy.bind_tasks(available_slots, active_jobs).await?
             }
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index 552faa265..58c63530f 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -15,38 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::{HashMap, HashSet};
-use std::pin::Pin;
-use std::sync::Arc;
-
-use datafusion::common::tree_node::TreeNode;
-use datafusion::common::tree_node::TreeNodeRecursion;
-use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::FileScanConfig;
-use datafusion::datasource::source::DataSourceExec;
-use datafusion::error::DataFusionError;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{SessionConfig, SessionContext};
-use futures::Stream;
-use log::debug;
-
-use ballista_core::consistent_hash::ConsistentHash;
-use ballista_core::error::Result;
-use ballista_core::serde::protobuf::{
-    AvailableTaskSlots, ExecutorHeartbeat, JobStatus, job_status,
-};
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata, 
PartitionId};
-use ballista_core::utils::{default_config_producer, default_session_builder};
-use ballista_core::{ConfigProducer, JobStatusSubscriber, consistent_hash};
-
 use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
-
 use crate::config::{SchedulerConfig, TaskDistributionPolicy};
 use crate::scheduler_server::SessionBuilder;
 use crate::state::execution_graph::{
     ExecutionGraphBox, TaskDescription, create_task_info,
 };
 use crate::state::task_manager::JobInfoCache;
+use ballista_core::error::Result;
+use ballista_core::serde::protobuf::{
+    AvailableTaskSlots, ExecutorHeartbeat, JobStatus, job_status,
+};
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata, 
PartitionId};
+use ballista_core::utils::{default_config_producer, default_session_builder};
+use ballista_core::{ConfigProducer, JobStatusSubscriber};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use futures::Stream;
+use log::debug;
+use std::collections::{HashMap, HashSet};
+use std::pin::Pin;
+use std::sync::Arc;
 
 /// Event broadcasting and subscription for cluster state changes.
 pub mod event;
@@ -534,12 +523,6 @@ pub(crate) async fn bind_task_round_robin(
     schedulable_tasks
 }
 
-/// Maps execution plan to list of files it scans
-type GetScanFilesFunc = fn(
-    &str,
-    Arc<dyn ExecutionPlan>,
-) -> datafusion::common::Result<Vec<Vec<Vec<PartitionedFile>>>>;
-
 /// User provided task distribution policy
 #[async_trait::async_trait]
 pub trait DistributionPolicy: std::fmt::Debug + Send + Sync {
@@ -572,208 +555,16 @@ pub trait DistributionPolicy: std::fmt::Debug + Send + 
Sync {
     fn name(&self) -> &str;
 }
 
-pub(crate) async fn bind_task_consistent_hash(
-    topology_nodes: HashMap<String, TopologyNode>,
-    num_replicas: usize,
-    tolerance: usize,
-    running_jobs: Arc<HashMap<String, JobInfoCache>>,
-    get_scan_files: GetScanFilesFunc,
-) -> Result<(Vec<BoundTask>, Option<ConsistentHash<TopologyNode>>)> {
-    let mut total_slots = 0usize;
-    for (_, node) in topology_nodes.iter() {
-        total_slots += node.available_slots as usize;
-    }
-    if total_slots == 0 {
-        debug!(
-            "Not enough available executor slots for binding tasks with 
consistent hashing policy!!!"
-        );
-        return Ok((vec![], None));
-    }
-    debug!("Total slot number for consistent hash binding is {total_slots}");
-
-    let node_replicas = topology_nodes
-        .into_values()
-        .map(|node| (node, num_replicas))
-        .collect::<Vec<_>>();
-    let mut ch_topology: ConsistentHash<TopologyNode> =
-        ConsistentHash::new(node_replicas);
-
-    let mut schedulable_tasks: Vec<BoundTask> = vec![];
-    for (job_id, job_info) in running_jobs.iter() {
-        if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
-            debug!("Job {job_id} is not in running status and will be 
skipped");
-            continue;
-        }
-        let mut graph = job_info.execution_graph.write().await;
-        let session_id = graph.session_id().to_string();
-        let mut black_list = vec![];
-        while let Some((running_stage, task_id_gen)) =
-            graph.fetch_running_stage(&black_list)
-        {
-            let scan_files = get_scan_files(job_id, 
running_stage.plan.clone())?;
-            if is_skip_consistent_hash(&scan_files) {
-                debug!(
-                    "Will skip stage {}/{} for consistent hashing task 
binding",
-                    job_id, running_stage.stage_id
-                );
-                black_list.push(running_stage.stage_id);
-                continue;
-            }
-            let pre_total_slots = total_slots;
-            let scan_files = &scan_files[0];
-            let tolerance_list = vec![0, tolerance];
-            // First round with 0 tolerance consistent hashing policy
-            // Second round with [`tolerance`] tolerance consistent hashing 
policy
-            for tolerance in tolerance_list {
-                let runnable_tasks = running_stage
-                    .task_infos
-                    .iter_mut()
-                    .enumerate()
-                    .filter(|(_partition, info)| info.is_none())
-                    .take(total_slots)
-                    .collect::<Vec<_>>();
-                for (partition_id, task_info) in runnable_tasks {
-                    let partition_files = &scan_files[partition_id];
-                    assert!(!partition_files.is_empty());
-                    // Currently we choose the first file for a task for 
consistent hash.
-                    // Later when splitting files for tasks in datafusion, 
it's better to
-                    // introduce this hash based policy besides the file 
number policy or file size policy.
-                    let file_for_hash = &partition_files[0];
-                    if let Some(node) = ch_topology.get_mut_with_tolerance(
-                        file_for_hash.object_meta.location.as_ref().as_bytes(),
-                        tolerance,
-                    ) {
-                        let executor_id = node.id.clone();
-                        let task_id = *task_id_gen;
-                        *task_id_gen += 1;
-                        *task_info = 
Some(create_task_info(executor_id.clone(), task_id));
-
-                        let partition = PartitionId {
-                            job_id: job_id.clone(),
-                            stage_id: running_stage.stage_id,
-                            partition_id,
-                        };
-                        let task_desc = TaskDescription {
-                            session_id: session_id.clone(),
-                            partition,
-                            stage_attempt_num: running_stage.stage_attempt_num,
-                            task_id,
-                            task_attempt: running_stage.task_failure_numbers
-                                [partition_id],
-                            plan: running_stage.plan.clone(),
-                            session_config: 
running_stage.session_config.clone(),
-                        };
-                        schedulable_tasks.push((executor_id, task_desc));
-
-                        node.available_slots -= 1;
-                        total_slots -= 1;
-                        if total_slots == 0 {
-                            return Ok((schedulable_tasks, Some(ch_topology)));
-                        }
-                    }
-                }
-            }
-            // Since there's no more tasks from this stage which can be bound,
-            // we should skip this stage at the next round.
-            if pre_total_slots == total_slots {
-                black_list.push(running_stage.stage_id);
-            }
-        }
-    }
-
-    Ok((schedulable_tasks, Some(ch_topology)))
-}
-
-// If if there's no plan which needs to scan files, skip it.
-// Or there are multiple plans which need to scan files for a stage, skip it.
-pub(crate) fn is_skip_consistent_hash(scan_files: 
&[Vec<Vec<PartitionedFile>>]) -> bool {
-    scan_files.is_empty() || scan_files.len() > 1
-}
-
-/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
-pub(crate) fn get_scan_files(
-    plan: Arc<dyn ExecutionPlan>,
-) -> std::result::Result<Vec<Vec<Vec<PartitionedFile>>>, DataFusionError> {
-    let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
-    plan.apply(&mut |plan: &Arc<dyn ExecutionPlan>| {
-        let plan_any = plan.as_any();
-
-        if let Some(config) = plan_any
-            .downcast_ref::<DataSourceExec>()
-            .and_then(|c| 
c.data_source().as_any().downcast_ref::<FileScanConfig>())
-        {
-            collector.push(
-                config
-                    .file_groups
-                    .iter()
-                    .map(|f| f.clone().into_inner())
-                    .collect(),
-            );
-            Ok(TreeNodeRecursion::Jump)
-        } else {
-            Ok(TreeNodeRecursion::Continue)
-        }
-    })?;
-    Ok(collector)
-}
-
-/// Represents a node in the cluster topology for consistent hashing.
-#[derive(Clone)]
-pub struct TopologyNode {
-    /// Unique executor ID.
-    pub id: String,
-    /// Host:port name for the node.
-    pub name: String,
-    /// Timestamp of last heartbeat received.
-    pub last_seen_ts: u64,
-    /// Number of available task slots on this node.
-    pub available_slots: u32,
-}
-
-impl TopologyNode {
-    fn new(
-        host: &str,
-        port: u16,
-        id: &str,
-        last_seen_ts: u64,
-        available_slots: u32,
-    ) -> Self {
-        Self {
-            id: id.to_string(),
-            name: format!("{host}:{port}"),
-            last_seen_ts,
-            available_slots,
-        }
-    }
-}
-
-impl consistent_hash::node::Node for TopologyNode {
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn is_valid(&self) -> bool {
-        self.available_slots > 0
-    }
-}
-
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;
     use std::sync::Arc;
 
-    use datafusion::datasource::listing::PartitionedFile;
-    use object_store::ObjectMeta;
-    use object_store::path::Path;
-
     use ballista_core::error::Result;
     use ballista_core::serde::protobuf::AvailableTaskSlots;
     use ballista_core::serde::scheduler::{ExecutorMetadata, 
ExecutorSpecification};
 
-    use crate::cluster::{
-        BoundTask, TopologyNode, bind_task_bias, bind_task_consistent_hash,
-        bind_task_round_robin,
-    };
+    use crate::cluster::{BoundTask, bind_task_bias, bind_task_round_robin};
     use crate::state::execution_graph::{ExecutionGraph, StaticExecutionGraph};
     use crate::state::task_manager::JobInfoCache;
     use crate::test_utils::{
@@ -887,100 +678,6 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_bind_task_consistent_hash() -> Result<()> {
-        let num_partition = 8usize;
-        let active_jobs = mock_active_jobs(num_partition).await?;
-        let active_jobs = Arc::new(active_jobs);
-        let topology_nodes = mock_topology_nodes();
-        let num_replicas = 31;
-        let tolerance = 0;
-
-        // Check none scan files case
-        {
-            let (bound_tasks, _) = bind_task_consistent_hash(
-                topology_nodes.clone(),
-                num_replicas,
-                tolerance,
-                active_jobs.clone(),
-                |_, _| Ok(vec![]),
-            )
-            .await?;
-            assert_eq!(0, bound_tasks.len());
-        }
-
-        // Check job_b with scan files
-        {
-            let (bound_tasks, _) = bind_task_consistent_hash(
-                topology_nodes,
-                num_replicas,
-                tolerance,
-                active_jobs,
-                |job_id, _| mock_get_scan_files("job_b", job_id, 8),
-            )
-            .await?;
-            assert_eq!(6, bound_tasks.len());
-
-            let result = get_result(bound_tasks);
-
-            let mut expected = HashMap::new();
-            {
-                let mut entry_b = HashMap::new();
-                entry_b.insert("executor_3".to_string(), 2);
-                entry_b.insert("executor_2".to_string(), 3);
-                entry_b.insert("executor_1".to_string(), 1);
-
-                expected.insert("job_b".to_string(), entry_b);
-            }
-            assert!(
-                expected.eq(&result),
-                "The result {result:?} is not as expected {expected:?}"
-            );
-        }
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_bind_task_consistent_hash_with_tolerance() -> Result<()> {
-        let num_partition = 8usize;
-        let active_jobs = mock_active_jobs(num_partition).await?;
-        let active_jobs = Arc::new(active_jobs);
-        let topology_nodes = mock_topology_nodes();
-        let num_replicas = 31;
-        let tolerance = 1;
-
-        {
-            let (bound_tasks, _) = bind_task_consistent_hash(
-                topology_nodes,
-                num_replicas,
-                tolerance,
-                active_jobs,
-                |job_id, _| mock_get_scan_files("job_b", job_id, 8),
-            )
-            .await?;
-            assert_eq!(7, bound_tasks.len());
-
-            let result = get_result(bound_tasks);
-
-            let mut expected = HashMap::new();
-            {
-                let mut entry_b = HashMap::new();
-                entry_b.insert("executor_3".to_string(), 3);
-                entry_b.insert("executor_2".to_string(), 3);
-                entry_b.insert("executor_1".to_string(), 1);
-
-                expected.insert("job_b".to_string(), entry_b);
-            }
-            assert!(
-                expected.eq(&result),
-                "The result {result:?} is not as expected {expected:?}"
-            );
-        }
-
-        Ok(())
-    }
-
     fn get_result(
         bound_tasks: Vec<BoundTask>,
     ) -> HashMap<String, HashMap<String, usize>> {
@@ -1061,55 +758,4 @@ mod test {
             },
         ]
     }
-
-    fn mock_topology_nodes() -> HashMap<String, TopologyNode> {
-        let mut topology_nodes = HashMap::new();
-        topology_nodes.insert(
-            "executor_1".to_string(),
-            TopologyNode::new("localhost", 8081, "executor_1", 0, 1),
-        );
-        topology_nodes.insert(
-            "executor_2".to_string(),
-            TopologyNode::new("localhost", 8082, "executor_2", 0, 3),
-        );
-        topology_nodes.insert(
-            "executor_3".to_string(),
-            TopologyNode::new("localhost", 8083, "executor_3", 0, 5),
-        );
-        topology_nodes
-    }
-
-    fn mock_get_scan_files(
-        expected_job_id: &str,
-        job_id: &str,
-        num_partition: usize,
-    ) -> datafusion::common::Result<Vec<Vec<Vec<PartitionedFile>>>> {
-        Ok(if expected_job_id.eq(job_id) {
-            mock_scan_files(num_partition)
-        } else {
-            vec![]
-        })
-    }
-
-    fn mock_scan_files(num_partition: usize) -> Vec<Vec<Vec<PartitionedFile>>> {
-        let mut scan_files = vec![];
-        for i in 0..num_partition {
-            scan_files.push(vec![PartitionedFile {
-                object_meta: ObjectMeta {
-                    location: Path::from(format!("file--{i}")),
-                    last_modified: Default::default(),
-                    size: 1,
-                    e_tag: None,
-                    version: None,
-                },
-                partition_values: vec![],
-                range: None,
-                extensions: None,
-                statistics: None,
-                metadata_size_hint: None,
-                ordering: None,
-            }]);
-        }
-        vec![scan_files]
-    }
 }
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index dd73ee9a1..cefe66dd3 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -435,11 +435,6 @@ pub enum TaskDistribution {
     /// Distribute tasks evenly across executors. This will try and iterate through available executors
     /// and assign one task to each executor until all tasks are assigned.
     RoundRobin,
-    /// 1. Firstly, try to bind tasks without scanning source files by `RoundRobin` policy.
-    /// 2. Then for a task for scanning source files, firstly calculate a hash value based on input files.
-    ///    And then bind it with an execute according to consistent hashing policy.
-    /// 3. If needed, work stealing can be enabled based on the tolerance of the consistent hashing.
-    ConsistentHash,
 }
 
 impl Display for TaskDistribution {
@@ -447,7 +442,6 @@ impl Display for TaskDistribution {
         match self {
             TaskDistribution::Bias => f.write_str("bias"),
             TaskDistribution::RoundRobin => f.write_str("round-robin"),
-            TaskDistribution::ConsistentHash => f.write_str("consistent-hash"),
         }
     }
 }
@@ -471,16 +465,6 @@ pub enum TaskDistributionPolicy {
     /// Distribute tasks evenly across executors. This will try and iterate through available executors
     /// and assign one task to each executor until all tasks are assigned.
     RoundRobin,
-    /// 1. Firstly, try to bind tasks without scanning source files by `RoundRobin` policy.
-    /// 2. Then for a task for scanning source files, firstly calculate a hash value based on input files.
-    ///    And then bind it with an execute according to consistent hashing policy.
-    /// 3. If needed, work stealing can be enabled based on the tolerance of the consistent hashing.
-    ConsistentHash {
-        /// Number of virtual nodes per executor on the consistent hash ring.
-        num_replicas: usize,
-        /// Tolerance for work stealing when slots are imbalanced.
-        tolerance: usize,
-    },
     /// User provided task distribution policy
     Custom(Arc<dyn DistributionPolicy>),
 }
@@ -493,14 +477,6 @@ impl TryFrom<Config> for SchedulerConfig {
         let task_distribution = match opt.task_distribution {
             TaskDistribution::Bias => TaskDistributionPolicy::Bias,
             TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin,
-            TaskDistribution::ConsistentHash => {
-                let num_replicas = opt.consistent_hash_num_replicas as usize;
-                let tolerance = opt.consistent_hash_tolerance as usize;
-                TaskDistributionPolicy::ConsistentHash {
-                    num_replicas,
-                    tolerance,
-                }
-            }
         };
 
         let config = SchedulerConfig {
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 4e1e9e5b1..3320934ba 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -134,11 +134,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                 TaskDistributionPolicy::RoundRobin => {
                     bind_task_round_robin(available_slots, running_jobs, |_| false).await
                 }
-                TaskDistributionPolicy::ConsistentHash { .. } => {
-                    return Err(Status::unimplemented(
-                        "ConsistentHash TaskDistribution is not feasible for pull-based task scheduling",
-                    ));
-                }
 
                 TaskDistributionPolicy::Custom(ref policy) => policy
                     .bind_tasks(available_slots, running_jobs)
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 56b847e13..dbc996eb4 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -103,6 +103,6 @@ _Example: Specifying configuration options when starting the scheduler_
 | -------------------------------------------- | ------ | ----------- | -------------------------------------------------------------------------------------------------------------------------- |
 | scheduler-policy                             | Utf8   | pull-staged | Sets the task scheduling policy for the scheduler, possible values: pull-staged, push-staged.                              |
 | event-loop-buffer-size                       | UInt32 | 10000       | Sets the event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended.              |
-| task-distribution                            | Utf8   | bias        | Sets the task distribution policy for the scheduler, possible values: bias, round-robin, consistent-hash.                  |
+| task-distribution                            | Utf8   | bias        | Sets the task distribution policy for the scheduler, possible values: bias, round-robin                                    |
 | finished-job-data-clean-up-interval-seconds  | UInt64 | 300         | Sets the delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled. |
 | finished-job-state-clean-up-interval-seconds | UInt64 | 3600        | Sets the delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled.   |


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to