janbraunsdorff opened a new issue, #1268:
URL: https://github.com/apache/datafusion-ballista/issues/1268

   **Describe the bug**
   Hello,
   I am trying to replace Spark with Ballista. The use case is to read a Delta
   table from S3, do some processing, and write it back to S3. Unfortunately, I
   am stuck at the scheduler. No matter what I try, I always get:
   `Could not parse logical plan protobuf: Internal error: Error encoding delta table`.
   I ran the same job in standalone mode, too, and everything works as expected.
   Can someone help me, or does anyone have a working example?
    
   
   **To Reproduce**
   Environment: Kubernetes cluster with 1 scheduler and 1 executor, version 46.0.1.
   I extended the scheduler and executor as described in
   https://github.com/milenkovicm/ballista_delta and
   https://github.com/apache/datafusion-ballista/issues/1241.
   
   Scheduler:
   ```rust
   // bin/main.rs
   let config = config
       .with_override_config_producer(Arc::new(session_config_with_s3_support))
       .with_override_session_builder(Arc::new(session_state_with_s3_support))
       .wih_override_logical_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
       .wih_override_physical_codec(Arc::new(BallistaDeltaPhysicalCodec::default()));
   
   // config.rs
   impl SchedulerConfig {
       ....
       pub fn wih_override_logical_codec(
           mut self,
           override_logical_codec: Arc<dyn LogicalExtensionCodec>
       ) -> Self {
           self.override_logical_codec = Some(override_logical_codec);
           self
       }
   
       pub fn wih_override_physical_codec(
           mut self,
           override_physical_codec: Arc<dyn PhysicalExtensionCodec>
       ) -> Self {
           self.override_physical_codec = Some(override_physical_codec);
           self
       }
   }
   ```
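
   For readers unfamiliar with the override hooks, here is a minimal,
   self-contained sketch of the pattern the scheduler extension above relies on:
   an optional codec stored on the config that replaces a default when set. The
   `Codec` trait, `DefaultCodec`, `DeltaCodec`, `Config`, and the
   `logical_codec` accessor are hypothetical stand-ins for illustration only.
   They are not the Ballista or DataFusion types, and how Ballista actually
   resolves the override is an assumption here.

   ```rust
   use std::sync::Arc;

   // Hypothetical stand-in for a logical extension codec (not the DataFusion trait).
   trait Codec {
       fn name(&self) -> &'static str;
   }

   struct DefaultCodec;
   struct DeltaCodec;

   impl Codec for DefaultCodec {
       fn name(&self) -> &'static str {
           "default"
       }
   }

   impl Codec for DeltaCodec {
       fn name(&self) -> &'static str {
           "delta"
       }
   }

   // Hypothetical config with an optional codec override.
   #[derive(Default)]
   struct Config {
       override_logical_codec: Option<Arc<dyn Codec>>,
   }

   impl Config {
       // Consuming builder method: stores the override and returns the config.
       fn with_override_logical_codec(mut self, codec: Arc<dyn Codec>) -> Self {
           self.override_logical_codec = Some(codec);
           self
       }

       // Assumed resolution rule: use the override if present, else the default.
       fn logical_codec(&self) -> Arc<dyn Codec> {
           match &self.override_logical_codec {
               Some(codec) => Arc::clone(codec),
               None => Arc::new(DefaultCodec),
           }
       }
   }

   fn main() {
       let plain = Config::default();
       let extended = Config::default().with_override_logical_codec(Arc::new(DeltaCodec));
       println!("plain: {}", plain.logical_codec().name());
       println!("extended: {}", extended.logical_codec().name());
   }
   ```

   Under this assumption, a plan that contains a Delta table can only be decoded
   by a process whose config carries the Delta-aware codec, which is why the
   client, scheduler, and executor are all configured with the same codecs in
   this report.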
   
   Executor:
   ```rust
   let mut config: ExecutorProcessConfig = opt.try_into()?;
   config.override_config_producer = Some(Arc::new(session_config_with_s3_support));
   config.override_runtime_producer = Some(Arc::new(runtime_env_with_s3_support));
   config.override_logical_codec = Some(Arc::new(BallistaDeltaLogicalCodec::default()));
   config.override_physical_codec = Some(Arc::new(BallistaDeltaPhysicalCodec::default()));
   ```
   
   Client main:
   ```rust
   let config = SessionConfig::new_with_ballista()
       .with_information_schema(true)
       .with_ballista_logical_extension_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
       .with_ballista_physical_extension_codec(Arc::new(BallistaDeltaPhysicalCodec::default()))
       .with_option_extension(S3Options::default());
   
   let runtime_env = RuntimeEnvBuilder::new().build()?;
   runtime_env.register_object_store(
       &format!("s3://{}", BUCKET)
           .as_str()
           .try_into()
           .unwrap(),
       Arc::new(create_s3_store()?),
   );
   
   let state = SessionStateBuilder::new()
       .with_runtime_env(runtime_env.into())
       .with_config(config)
       .with_default_features()
       .with_table_factory("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}))
       .build();
   
   let ctx: SessionContext =
       SessionContext::remote_with_state("df://localhost:50050", state).await?;
   
   // using SQL
   ctx.register_object_store(
       &format!("s3://{}", BUCKET).as_str().try_into()?,
       Arc::new(create_s3_store()?),
   );
   ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID)).await?;
   ctx.sql(&format!("SET s3.secret_access_key = '{}'", SECRET_KEY)).await?;
   ctx.sql(&format!("SET s3.endpoint = '{}://{}'", PROTOCOLL, HOST)).await?;  
   ctx.sql("SET s3.allow_http = true").await?;
   
   println!("register table remote");
   ctx.register_parquet(
       "test",
       "s3://bucket/foo.parquet",
       ParquetReadOptions::default(),
   ).await?;
   
   // using delta-rs and delta lake
   deltalake::aws::register_handlers(None);
   let table = open_table("s3a://ka-etu-dih-001-datafusion-001/data/customer").await?;
   ctx.register_table("customer", Arc::new(table))?;
   ```
   
   
   **Expected behavior**
   The scheduler can parse the Delta table and distribute plans to the executors.
   
   **Additional context**
   ```sh
   Error: Arrow error: External error: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }
   
   Caused by:
       0: External error: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }
       1: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }
   ```
   

