milenkovicm commented on code in PR #1360:
URL:
https://github.com/apache/datafusion-ballista/pull/1360#discussion_r2659679232
##########
ballista/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -873,4 +873,77 @@ mod test {
assert!(active_executors.is_empty());
Ok(())
}
+ #[tokio::test]
+ async fn test_substrait_compatibility() -> Result<(), BallistaError> {
Review Comment:
test makes sense 👍🏻
##########
ballista/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -873,4 +873,77 @@ mod test {
assert!(active_executors.is_empty());
Ok(())
}
+ #[tokio::test]
+ async fn test_substrait_compatibility() -> Result<(), BallistaError> {
+ let cluster = test_cluster_context();
+
+ let config = SchedulerConfig::default();
+ let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+ SchedulerServer::new(
+ "localhost:50050".to_owned(),
+ cluster.clone(),
+ BallistaCodec::default(),
+ Arc::new(config),
+ default_metrics_collector().unwrap(),
+ );
+ scheduler.init().await?;
+
+ let exec_meta = ExecutorRegistration {
+ id: "abc".to_owned(),
+ host: Some("http://localhost:8080".to_owned()),
+ port: 0,
+ grpc_port: 0,
+ specification: Some(ExecutorSpecification { task_slots: 2
}.into()),
+ };
+
+ let request: Request<RegisterExecutorParams> =
+ Request::new(RegisterExecutorParams {
+ metadata: Some(exec_meta.clone()),
+ });
+ let response = scheduler
+ .register_executor(request)
+ .await
+ .expect("Received error response")
+ .into_inner();
+
+ // registration should success
+ assert!(response.success);
+
+ let state = scheduler.state.clone();
+ // executor should be registered
+ let stored_executor = state
+ .executor_manager
+ .get_executor_metadata("abc")
+ .await
+ .expect("getting executor");
+
+ assert_eq!(stored_executor.grpc_port, 0);
+ assert_eq!(stored_executor.port, 0);
+ assert_eq!(stored_executor.specification.task_slots, 2);
+ assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
+
+ // Context strictly used for values-based query serialization to avoid
+ // needing to register tables and keep them in sync with the scheduler
instance.
+ // We only truly desire to test proper reception of a Substrait plan,
not explicit
+ // SubstraitPlan -> LogicalPlan conversions.
+ let config = SessionConfig::new();
+ let ctx = SessionContext::new_with_config(config);
+ let serialized_substrait_plan =
+ serialize_bytes("SELECT a, b FROM (VALUES (1, 2), (3, 4)) AS t(a,
b)", &ctx)
Review Comment:
maybe we could add another column in select like `abs(a) + abs(b)` or
similar
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]