andygrove commented on code in PR #1456: URL: https://github.com/apache/datafusion-comet/pull/1456#discussion_r2002089283
########## native/core/src/execution/planner.rs: ########## @@ -3004,4 +3006,130 @@ mod tests { type_info: None, } } + + #[test] + fn test_create_array() { + let session_ctx = SessionContext::new(); + session_ctx.register_udf(ScalarUDF::from( + datafusion_functions_nested::make_array::MakeArray::new(), + )); + let task_ctx = session_ctx.task_ctx(); + let planner = PhysicalPlanner::new(Arc::from(session_ctx)); + + // Create a plan for + // ProjectionExec: expr=[make_array(col_0@0) as col_0] + // ScanExec: source=[CometScan parquet (unknown)], schema=[col_0: Int32] + let op_scan = Operator { + plan_id: 0, + children: vec![], + op_struct: Some(OpStruct::Scan(spark_operator::Scan { + fields: vec![ + spark_expression::DataType { + type_id: 3, // Int32 + type_info: None, + }, + spark_expression::DataType { + type_id: 3, // Int32 + type_info: None, + }, + spark_expression::DataType { + type_id: 3, // Int32 + type_info: None, + }, + ], + source: "".to_string(), + })), + }; + + let array_col = spark_expression::Expr { + expr_struct: Some(Bound(spark_expression::BoundReference { + index: 0, + datatype: Some(spark_expression::DataType { + type_id: 3, + type_info: None, + }), + })), + }; + + let array_col_1 = spark_expression::Expr { + expr_struct: Some(Bound(spark_expression::BoundReference { + index: 1, + datatype: Some(spark_expression::DataType { + type_id: 3, + type_info: None, + }), + })), + }; + + let projection = Operator { + children: vec![op_scan], + plan_id: 0, + op_struct: Some(OpStruct::Projection(spark_operator::Projection { + project_list: vec![spark_expression::Expr { + expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + func: "make_array".to_string(), + args: vec![array_col, array_col_1], + return_type: None, + })), + }], + })), + }; + + let a = Int32Array::from(vec![0, 3]); + let b = Int32Array::from(vec![1, 4]); + let c = Int32Array::from(vec![2, 5]); + let input_batch = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 2); 
+ + let (mut scans, datafusion_plan) = + planner.create_plan(&projection, &mut vec![], 1).unwrap(); + scans[0].set_input_batch(input_batch); + + let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + let (tx, mut rx) = mpsc::channel(1); + + // Separate thread to send the EOF signal once we've processed the only input batch + runtime.spawn(async move { + // Create a second two-row batch of three Int32 columns, then an EOF marker to signal end of input. + let a = Int32Array::from(vec![0, 3]); + let b = Int32Array::from(vec![1, 4]); + let c = Int32Array::from(vec![2, 5]); + let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 2); + let input_batch2 = InputBatch::EOF; + + let batches = vec![input_batch1, input_batch2]; + + for batch in batches.into_iter() { + tx.send(batch).await.unwrap(); + } + }); + + runtime.block_on(async move { Review Comment: It's nice to see an end-to-end test like this in the Rust project -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org