This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1b40ffae5 Review use of panic in datafusion-proto crate (#3365)
1b40ffae5 is described below
commit 1b40ffae5a606eb35f3f8686ec33668706b71411
Author: comphead <[email protected]>
AuthorDate: Thu Sep 8 12:43:25 2022 -0700
Review use of panic in datafusion-proto crate (#3365)
* removed panics from proto crate
* fixes
* fixed comments
* fix build
* fixed comments
* Fixing proto json
* formatting
* fix build
* addressed comments
---
datafusion/proto/build.rs | 13 +-
datafusion/proto/src/bytes/mod.rs | 3 +-
datafusion/proto/src/from_proto.rs | 13 +-
datafusion/proto/src/logical_plan.rs | 11 +-
datafusion/proto/src/to_proto.rs | 223 +++++++++++++++++------------------
5 files changed, 134 insertions(+), 129 deletions(-)
diff --git a/datafusion/proto/build.rs b/datafusion/proto/build.rs
index 7e59107ea..38ae9a962 100644
--- a/datafusion/proto/build.rs
+++ b/datafusion/proto/build.rs
@@ -32,7 +32,9 @@ fn main() -> Result<(), String> {
fn build() -> Result<(), String> {
use std::io::Write;
- let out = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
+ let out = std::path::PathBuf::from(
+ std::env::var("OUT_DIR").expect("Cannot find OUT_DIR environment
vairable"),
+ );
let descriptor_path = out.join("proto_descriptor.bin");
prost_build::Config::new()
@@ -42,10 +44,15 @@ fn build() -> Result<(), String> {
.compile_protos(&["proto/datafusion.proto"], &["proto"])
.map_err(|e| format!("protobuf compilation failed: {}", e))?;
- let descriptor_set = std::fs::read(descriptor_path).unwrap();
+ let descriptor_set = std::fs::read(&descriptor_path)
+ .expect(&*format!("Cannot read {:?}", &descriptor_path));
+
pbjson_build::Builder::new()
.register_descriptors(&descriptor_set)
- .unwrap()
+ .expect(&*format!(
+ "Cannot register descriptors {:?}",
+ &descriptor_set
+ ))
.build(&[".datafusion"])
.map_err(|e| format!("pbjson compilation failed: {}", e))?;
diff --git a/datafusion/proto/src/bytes/mod.rs
b/datafusion/proto/src/bytes/mod.rs
index 6ddaecae0..1dd8a5c1a 100644
--- a/datafusion/proto/src/bytes/mod.rs
+++ b/datafusion/proto/src/bytes/mod.rs
@@ -137,7 +137,8 @@ pub fn logical_plan_to_bytes_with_extension_codec(
/// Deserialize a LogicalPlan from json
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) ->
Result<LogicalPlan> {
- let back: protobuf::LogicalPlanNode = serde_json::from_str(json).unwrap();
+ let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
+ .map_err(|e| DataFusionError::Plan(format!("Error serializing plan:
{}", e)))?;
let extension_codec = DefaultExtensionCodec {};
back.try_into_logical_plan(ctx, &extension_codec)
}
diff --git a/datafusion/proto/src/from_proto.rs
b/datafusion/proto/src/from_proto.rs
index 35112a966..e0db97c0c 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -378,11 +378,13 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan
{
plan_type: match stringified_plan
.plan_type
.as_ref()
- .unwrap()
- .plan_type_enum
- .as_ref()
- .unwrap()
- {
+ .and_then(|pt| pt.plan_type_enum.as_ref())
+ .unwrap_or_else(|| {
+ panic!(
+ "Cannot create protobuf::StringifiedPlan from {:?}",
+ stringified_plan
+ )
+ }) {
InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name
}) => {
PlanType::OptimizedLogicalPlan {
@@ -1341,6 +1343,7 @@ impl From<protobuf::IntervalUnit> for IntervalUnit {
}
}
+// panic here because no better way to convert from Vec to Array
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
v.try_into().unwrap_or_else(|v: Vec<T>| {
panic!("Expected a Vec of length {} but it was {}", N, v.len())
diff --git a/datafusion/proto/src/logical_plan.rs
b/datafusion/proto/src/logical_plan.rs
index 2c45591ff..3e6d36ab9 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -381,12 +381,12 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Avro(..) =>
Arc::new(AvroFormat::default()),
};
- // let table_path = ListingTableUrl::parse(&scan.paths)?;
let table_paths = &scan
.paths
.iter()
- .map(|p| ListingTableUrl::parse(p).unwrap())
- .collect::<Vec<ListingTableUrl>>();
+ .map(ListingTableUrl::parse)
+ .collect::<Result<Vec<_>, _>>()?;
+
let options = ListingOptions {
file_extension: scan.file_extension.clone(),
format: file_format,
@@ -635,7 +635,10 @@ impl AsLogicalPlan for LogicalPlanNode {
)));
}
- let mut builder =
LogicalPlanBuilder::from(input_plans.pop().unwrap());
+ let first = input_plans.pop().ok_or_else(||
DataFusionError::Internal(String::from(
+ "Protobuf deserialization error, Union was require at
least two input.",
+ )))?;
+ let mut builder = LogicalPlanBuilder::from(first);
for plan in input_plans {
builder = builder.union(plan)?;
}
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index e64096a4e..ef7e5bf21 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -224,10 +224,12 @@ impl From<&DataType> for
protobuf::arrow_type::ArrowTypeEnum {
fractional: *fractional as u64,
}),
DataType::Decimal256(_, _) => {
- unimplemented!("The Decimal256 data type is not yet supported")
+ unimplemented!("Proto serialization error: The Decimal256 data
type is not yet supported")
}
DataType::Map(_, _) => {
- unimplemented!("The Map data type is not yet supported")
+ unimplemented!(
+ "Proto serialization error: The Map data type is not yet
supported"
+ )
}
}
}
@@ -612,7 +614,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
expr_type: Some(ExprType::AggregateExpr(aggregate_expr)),
}
}
- Expr::ScalarVariable(_, _) => unimplemented!(),
+ Expr::ScalarVariable(_, _) => return Err(Error::General("Proto
serialization error: Scalar Variable not supported".to_string())),
Expr::ScalarFunction { ref fun, ref args } => {
let fun: protobuf::ScalarFunction = fun.try_into()?;
let args: Vec<Self> = args
@@ -820,7 +822,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Expr::ScalarSubquery(_) | Expr::InSubquery { .. } | Expr::Exists {
.. } => {
// we would need to add logical plan operators to
datafusion.proto to support this
// see discussion in
https://github.com/apache/arrow-datafusion/issues/2565
- unimplemented!("subquery expressions are not supported yet")
+ return Err(Error::General("Proto serialization error:
Expr::ScalarSubquery(_) | Expr::InSubquery { .. } | Expr::Exists { .. } not
supported".to_string()))
}
Expr::GetIndexedField { key, expr } => Self {
expr_type: Some(ExprType::GetIndexedField(Box::new(
@@ -865,7 +867,8 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
})),
},
- Expr::QualifiedWildcard { .. } | Expr::TryCast { .. } =>
unimplemented!(),
+ Expr::QualifiedWildcard { .. } | Expr::TryCast { .. } =>
+ return Err(Error::General("Proto serialization error:
Expr::QualifiedWildcard { .. } | Expr::TryCast { .. } not
supported".to_string())),
};
Ok(expr_node)
@@ -945,127 +948,115 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
Value::LargeUtf8Value(s.to_owned())
})
}
- scalar::ScalarValue::List(value, boxed_field) => {
- println!("Current field of list: {:?}", boxed_field);
- match value {
- Some(values) => {
- if values.is_empty() {
- protobuf::ScalarValue {
- value:
Some(protobuf::scalar_value::Value::ListValue(
- protobuf::ScalarListValue {
- field:
Some(boxed_field.as_ref().into()),
- values: Vec::new(),
- },
- )),
+ scalar::ScalarValue::List(value, boxed_field) => match value {
+ Some(values) => {
+ if values.is_empty() {
+ protobuf::ScalarValue {
+ value:
Some(protobuf::scalar_value::Value::ListValue(
+ protobuf::ScalarListValue {
+ field: Some(boxed_field.as_ref().into()),
+ values: Vec::new(),
+ },
+ )),
+ }
+ } else {
+ let scalar_type = match boxed_field.data_type() {
+ DataType::List(field) =>
field.as_ref().data_type(),
+ unsupported => {
+ return Err(Error::General(format!("Proto
serialization error: {:?} not supported to convert to DataType::List",
unsupported)));
}
- } else {
- let scalar_type = match boxed_field.data_type() {
- DataType::List(field) =>
field.as_ref().data_type(),
- unsupported => {
- todo!("Proper error handling {}",
unsupported)
- }
- };
- println!("Current scalar type for list: {:?}",
scalar_type);
+ };
- let type_checked_values:
Vec<protobuf::ScalarValue> = values
- .iter()
- .map(|scalar| match (scalar, scalar_type) {
- (
- scalar::ScalarValue::List(_,
list_type),
- DataType::List(field),
- ) => {
- if let DataType::List(list_field) =
- list_type.data_type()
+ let type_checked_values: Vec<protobuf::ScalarValue> =
values
+ .iter()
+ .map(|scalar| match (scalar, scalar_type) {
+ (
+ scalar::ScalarValue::List(_, list_type),
+ DataType::List(field),
+ ) => {
+ if let DataType::List(list_field) =
+ list_type.data_type()
+ {
+ let scalar_datatype =
field.data_type();
+ let list_datatype =
list_field.data_type();
+ if
std::mem::discriminant(list_datatype)
+ !=
std::mem::discriminant(scalar_datatype)
{
- let scalar_datatype =
field.data_type();
- let list_datatype =
list_field.data_type();
- if
std::mem::discriminant(list_datatype)
- !=
std::mem::discriminant(scalar_datatype)
- {
- return Err(
-
Error::inconsistent_list_typing(
- list_datatype,
- scalar_datatype,
- ),
- );
- }
- scalar.try_into()
- } else {
-
Err(Error::inconsistent_list_designated(
- scalar,
- boxed_field.data_type(),
- ))
+ return
Err(Error::inconsistent_list_typing(
+ list_datatype,
+ scalar_datatype,
+ ));
}
- }
- (
- scalar::ScalarValue::Boolean(_),
- DataType::Boolean,
- ) => scalar.try_into(),
- (
- scalar::ScalarValue::Float32(_),
- DataType::Float32,
- ) => scalar.try_into(),
- (
- scalar::ScalarValue::Float64(_),
- DataType::Float64,
- ) => scalar.try_into(),
- (scalar::ScalarValue::Int8(_),
DataType::Int8) => {
scalar.try_into()
+ } else {
+
Err(Error::inconsistent_list_designated(
+ scalar,
+ boxed_field.data_type(),
+ ))
}
- (scalar::ScalarValue::Int16(_),
DataType::Int16) => {
- scalar.try_into()
- }
- (scalar::ScalarValue::Int32(_),
DataType::Int32) => {
- scalar.try_into()
- }
- (scalar::ScalarValue::Int64(_),
DataType::Int64) => {
- scalar.try_into()
- }
- (scalar::ScalarValue::UInt8(_),
DataType::UInt8) => {
- scalar.try_into()
- }
- (
- scalar::ScalarValue::UInt16(_),
- DataType::UInt16,
- ) => scalar.try_into(),
- (
- scalar::ScalarValue::UInt32(_),
- DataType::UInt32,
- ) => scalar.try_into(),
- (
- scalar::ScalarValue::UInt64(_),
- DataType::UInt64,
- ) => scalar.try_into(),
- (scalar::ScalarValue::Utf8(_),
DataType::Utf8) => {
- scalar.try_into()
- }
- (
- scalar::ScalarValue::LargeUtf8(_),
- DataType::LargeUtf8,
- ) => scalar.try_into(),
- _ =>
Err(Error::inconsistent_list_designated(
- scalar,
- boxed_field.data_type(),
- )),
- })
- .collect::<Result<Vec<_>, _>>()?;
- protobuf::ScalarValue {
- value:
Some(protobuf::scalar_value::Value::ListValue(
- protobuf::ScalarListValue {
- field:
Some(boxed_field.as_ref().into()),
- values: type_checked_values,
- },
+ }
+ (scalar::ScalarValue::Boolean(_),
DataType::Boolean) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::Float32(_),
DataType::Float32) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::Float64(_),
DataType::Float64) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::Int8(_), DataType::Int8)
=> {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::Int16(_),
DataType::Int16) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::Int32(_),
DataType::Int32) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::Int64(_),
DataType::Int64) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::UInt8(_),
DataType::UInt8) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::UInt16(_),
DataType::UInt16) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::UInt32(_),
DataType::UInt32) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::UInt64(_),
DataType::UInt64) => {
+ scalar.try_into()
+ }
+ (scalar::ScalarValue::Utf8(_), DataType::Utf8)
=> {
+ scalar.try_into()
+ }
+ (
+ scalar::ScalarValue::LargeUtf8(_),
+ DataType::LargeUtf8,
+ ) => scalar.try_into(),
+ _ => Err(Error::inconsistent_list_designated(
+ scalar,
+ boxed_field.data_type(),
)),
- }
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+ protobuf::ScalarValue {
+ value:
Some(protobuf::scalar_value::Value::ListValue(
+ protobuf::ScalarListValue {
+ field: Some(boxed_field.as_ref().into()),
+ values: type_checked_values,
+ },
+ )),
}
}
- None => protobuf::ScalarValue {
- value:
Some(protobuf::scalar_value::Value::NullListValue(
- boxed_field.as_ref().try_into()?,
- )),
- },
}
- }
+ None => protobuf::ScalarValue {
+ value: Some(protobuf::scalar_value::Value::NullListValue(
+ boxed_field.as_ref().try_into()?,
+ )),
+ },
+ },
datafusion::scalar::ScalarValue::Date32(val) => {
create_proto_scalar(val, PrimitiveScalarType::Date32, |s| {
Value::Date32Value(*s)