janbraunsdorff opened a new issue, #1268: URL: https://github.com/apache/datafusion-ballista/issues/1268
**Describe the bug**

Hello, I am trying to replace Spark with Ballista. The use case is to read a Delta table from S3, do some processing, and write it back to S3. Unfortunately, I am stuck at the scheduler. No matter what I try, I always get: `Could not parse logical plan protobuf: Internal error: Error encoding delta table`. I also ran this in standalone mode, and there everything works as expected. Can someone help me, or does anyone have a working example?

**To Reproduce**

Environment: Kubernetes cluster with 1 scheduler and 1 executor, version 46.0.1.

I extended the scheduler and executor as mentioned in https://github.com/milenkovicm/ballista_delta and https://github.com/apache/datafusion-ballista/issues/1241.

Scheduler:

```rust
// bin/main.rs
let config = config
    .with_override_config_producer(Arc::new(session_config_with_s3_support))
    .with_override_session_builder(Arc::new(session_state_with_s3_support))
    .wih_override_logical_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
    .wih_override_physical_codec(Arc::new(BallistaDeltaPhysicalCodec::default()));

// config.rs
impl SchedulerConfig {
    ....
    pub fn wih_override_logical_codec(
        mut self,
        override_logical_codec: Arc<dyn LogicalExtensionCodec>
    ) -> Self {
        self.override_logical_codec = Some(override_logical_codec);
        self
    }

    pub fn wih_override_physical_codec(
        mut self,
        override_physical_codec: Arc<dyn PhysicalExtensionCodec>
    ) -> Self {
        self.override_physical_codec = Some(override_physical_codec);
        self
    }
}
```

Executor:

```rust
let mut config: ExecutorProcessConfig = opt.try_into()?;
config.override_config_producer = Some(Arc::new(session_config_with_s3_support));
config.override_runtime_producer = Some(Arc::new(runtime_env_with_s3_support));
config.override_logical_codec = Some(Arc::new(BallistaDeltaLogicalCodec::default()));
config.override_physical_codec = Some(Arc::new(BallistaDeltaPhysicalCodec::default()));
```

Client Main:

```rust
let config = SessionConfig::new_with_ballista()
    .with_information_schema(true)
    .with_ballista_logical_extension_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
    .with_ballista_physical_extension_codec(Arc::new(BallistaDeltaPhysicalCodec::default()))
    .with_option_extension(S3Options::default());

let runtime_env = RuntimeEnvBuilder::new().build()?;
runtime_env.register_object_store(
    &format!("s3://{}", BUCKET)
        .as_str()
        .try_into()
        .unwrap(),
    Arc::new(create_s3_store()?),
);

let state = SessionStateBuilder::new()
    .with_runtime_env(runtime_env.into())
    .with_config(config)
    .with_default_features()
    .with_table_factory("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}))
    .build();

let ctx: SessionContext =
    SessionContext::remote_with_state("df://localhost:50050", state).await?;

// using SQL
ctx.register_object_store(
    &format!("s3://{}", BUCKET).as_str().try_into()?,
    Arc::new(create_s3_store()?),
);
ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID)).await?;
ctx.sql(&format!("SET s3.secret_access_key = '{}'", SECRET_KEY)).await?;
ctx.sql(&format!("SET s3.endpoint = '{}://{}'", PROTOCOLL, HOST)).await?;
ctx.sql("SET s3.allow_http = true").await?;

println!("register table remote");
ctx.register_parquet(
    "test",
    "s3://bucket/foo.parquet",
    ParquetReadOptions::default(),
).await?;

// using delta-rs and delta lake
deltalake::aws::register_handlers(None);
let table = open_table("s3a://ka-etu-dih-001-datafusion-001/data/customer").await?;
ctx.register_table("customer", Arc::new(table))?;
```

**Expected behavior**

The scheduler can parse the Delta table and distribute plans to the executors.

**Additional context**

```sh
Error: Arrow error: External error: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }

Caused by:
    0: External error: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }
    1: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.