milenkovicm commented on code in PR #1337:
URL:
https://github.com/apache/datafusion-ballista/pull/1337#discussion_r2462614075
##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -274,10 +275,11 @@ async fn execute_query(
session_id: String,
query: ExecuteQueryParams,
max_message_size: usize,
+ config: BallistaConfig,
Review Comment:
We already have grpc_max_message_size as a parameter; can we pass the other
needed values as parameters rather than passing the whole config?
##########
ballista/executor/src/executor_server.rs:
##########
@@ -114,7 +117,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown();
tokio::spawn(async move {
let shutdown_signal = grpc_shutdown.recv();
- let grpc_server_future = create_grpc_server()
+ let grpc_server_future = create_grpc_server(&ballista_config)
Review Comment:
This configuration should be provided as part of the executor binary
configuration, not the Ballista config.
##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -238,6 +238,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
self.session_id.clone(),
query,
self.config.default_grpc_client_max_message_size(),
+ self.config.clone(),
Review Comment:
We already have `grpc_max_message_size` as a parameter; can we pass the other
needed values as parameters rather than passing the whole config?
##########
ballista/core/src/utils.rs:
##########
@@ -106,31 +107,50 @@ pub async fn collect_stream(
pub async fn create_grpc_client_connection<D>(
dst: D,
+ config: &BallistaConfig,
Review Comment:
As I mentioned before, IMHO it makes more sense to pass individual
parameters, or we can derive some kind of `GrpcConfig` if there are too many of
them.
##########
ballista/core/src/utils.rs:
##########
@@ -106,31 +107,50 @@ pub async fn collect_stream(
pub async fn create_grpc_client_connection<D>(
dst: D,
+ config: &BallistaConfig,
) -> std::result::Result<Channel, Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let endpoint = tonic::transport::Endpoint::new(dst)?
- .connect_timeout(Duration::from_secs(20))
- .timeout(Duration::from_secs(20))
+ .connect_timeout(Duration::from_secs(
+ config.default_grpc_client_connect_timeout_seconds() as u64,
+ ))
+ .timeout(Duration::from_secs(
+ config.default_grpc_client_timeout_seconds() as u64,
+ ))
// Disable Nagle's Algorithm since we don't want packets to wait
.tcp_nodelay(true)
- .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
- .http2_keep_alive_interval(Duration::from_secs(300))
- .keep_alive_timeout(Duration::from_secs(20))
+ .tcp_keepalive(Option::Some(Duration::from_secs(
+ config.default_grpc_client_tcp_keepalive_seconds() as u64,
+ )))
+ .http2_keep_alive_interval(Duration::from_secs(
+ config.default_grpc_client_http2_keepalive_interval_seconds() as
u64,
+ ))
+ .keep_alive_timeout(Duration::from_secs(
+ config.default_grpc_client_keepalive_timeout_seconds() as u64,
+ ))
.keep_alive_while_idle(true);
endpoint.connect().await
}
-pub fn create_grpc_server() -> Server {
+pub fn create_grpc_server(config: &BallistaConfig) -> Server {
Review Comment:
As I mentioned before, IMHO it makes more sense to pass individual
parameters, or we can derive some kind of GrpcConfig if there are too many of
them.
##########
ballista/core/src/config.rs:
##########
@@ -39,6 +39,28 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
"ballista.shuffle.remote_read_prefer_flight";
+// gRPC client timeout configurations
+pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
+ "ballista.grpc.client.connect_timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
+ "ballista.grpc.client.timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
+ "ballista.grpc.client.tcp_keepalive_seconds";
+pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
+ "ballista.grpc.client.http2_keepalive_interval_seconds";
+pub const BALLISTA_GRPC_CLIENT_KEEPALIVE_TIMEOUT_SECONDS: &str =
+ "ballista.grpc.client.keepalive_timeout_seconds";
+
+// gRPC server timeout configurations
Review Comment:
This config is relevant to the `standalone` version only; can we prefix it
somehow to identify it as a standalone option?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]