jayzhan211 opened a new issue, #13673:
URL: https://github.com/apache/datafusion/issues/13673

   ### Describe the bug
   
   The input schema and schema for projection are different in HashJoin cases 
because we might add additional alias project for hash join. If the schema has 
no *additional projected column* that the input schema has, it is possible to 
cause schema mismatch. In `stats_projection()`, because we ignore the error so 
we don't encounter such issue but I think we should not eat the error but fix 
the schema and column index instead.
   
   
https://github.com/apache/datafusion/blob/2464703c84c400a09cc59277018813f0e797bb4e/datafusion/physical-plan/src/projection.rs#L278-L283
   
   
   ### To Reproduce
   
   Take this query for example,
   
   ```
   statement count 0
   create table t1(name varchar, id int) as values ('df', 1), ('rs', 2), ('po', 
3);
   
   statement ok
   create table t2(product varchar, sid bigint) as values ('er', 1), ('sd', 2), 
('ge', 3);
   
   query TT
   select t1.name, t2.product from t1 inner join t2 on t1.id = t2.sid;
   ----
   df er
   rs sd
   po ge
   ```
   
   If we add the code to `ProjectExec::try_new()`
   ```rust
           // Construct a map from the input expressions to the output 
expression of the Projection
           let projection_mapping = ProjectionMapping::try_new(&expr, 
&input_schema)?;
           let cache =
               Self::compute_properties(&input, &projection_mapping, 
Arc::clone(&schema))?;
           
           println!("schema: {:?}", schema);
           println!("input schema: {:?}", input_schema);
           for (e, _) in expr.iter() {
               // panic here
               let _dt = e.data_type(&schema).unwrap();
           }
   
           Ok(Self {
               expr,
               schema,
               input,
               metrics: ExecutionPlanMetricsSet::new(),
               cache,
           })
   ```
   
   we can see that `input_schema` has additional column `CAST(t1.id AS Int64)`
   ```
   Schema {
     fields: [
       Field {
         name: "name",
         data_type: Utf8,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       },
       Field {
         name: "id",
         data_type: Int32,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       },
       Field {
         name: "product",
         data_type: Utf8,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       },
       Field {
         name: "sid",
         data_type: Int64,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       }
     ],
     metadata: {}
   }
   
   Input Schema {
     fields: [
       Field {
         name: "name",
         data_type: Utf8,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       },
       Field {
         name: "id",
         data_type: Int32,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       },
       Field {
         name: "CAST(t1.id AS Int64)",
         data_type: Int64,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       },
       Field {
         name: "product",
         data_type: Utf8,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       },
       Field {
         name: "sid",
         data_type: Int64,
         nullable: true,
         dict_id: 0,
         dict_is_ordered: false,
         metadata: {}
       }
     ],
     metadata: {}
   }
   
   ```
   
   If we use the `schema` for projection, we have the column mismatch index 
that could cause Column's `bounds_check`.
   
   The error is because of the out of bound index. `sid` is created as index 4 
for `input_schema` but the schema we used in projection has only 4 columns.
   ```
   thread 'tokio-runtime-worker' panicked at 
datafusion/physical-plan/src/projection.rs:102:44:
   called `Result::unwrap()` on an `Err` value: Internal("PhysicalExpr Column 
references column 'sid' at index 4 (zero-based) but input schema only has 4 
columns: [\"name\", \"id\", \"product\", \"sid\"]")
   ```
   
   ### Expected behavior
   
   I think we might either recreate expressions with the new index that match 
the `schema` or use the `input_schema` for projection. I'm not sure which is 
the right choice yet.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to