alamb commented on code in PR #19666:
URL: https://github.com/apache/datafusion/pull/19666#discussion_r2668090587
##########
datafusion/sqllogictest/src/engines/postgres_engine/mod.rs:
##########
@@ -299,8 +297,15 @@ impl sqllogictest::AsyncDB for Postgres {
self.pb.inc(1);
return Ok(DBOutput::StatementComplete(0));
}
+ let statement = self.get_client().prepare(sql).await?;
+ let types: Vec<Type> = statement
+ .columns()
+ .iter()
+ .map(|c| c.type_().clone())
+ .collect();
+
let start = Instant::now();
- let rows = self.get_client().query(sql, &[]).await?;
+ let messages = self.get_client().simple_query(sql).await?;
Review Comment:
I played around with this more -- the reason there are two queries now is
that the "simple_query" API returns only text (no types) but the sqllogictest
harness needs to get type information
However, if we use the prepared statement API that returns types, we must
also then have code to parse the postgres binary numeric representation (what I
think we are using the rust_decimal crate for).
On the balance, I think planning the query twice, while it does have
additional overhead, is better than a bunch of custom numeric parsing code.
Given the small number of queries that are run in this mode I suspect we won't
be able to even tell the difference.
I will add comments explaining this rationale
In case we want to avoid the second planning, here is the code that I wrote
with codex (I am not 100% sure it correctly implements numeric parsing):
```rust
fn convert_rows(rows: &[Row]) -> Vec<Vec<String>> {
rows.iter()
.map(|row| {
row.columns()
.iter()
.enumerate()
.map(|(idx, column)| cell_to_string(row, column, idx))
.collect::<Vec<String>>()
})
.collect::<Vec<_>>()
}
fn cell_to_string(row: &Row, column: &Column, idx: usize) -> String {
// Decode binary values by Postgres type to keep normalization aligned
with
// the DataFusion engine output.
match column.type_().clone() {
Type::CHAR => {
let value: Option<i8> = row.get(idx);
value
.map(|value| value.to_string())
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::INT2 => {
let value: Option<i16> = row.get(idx);
value
.map(|value| value.to_string())
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::INT4 => {
let value: Option<i32> = row.get(idx);
value
.map(|value| value.to_string())
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::INT8 => {
let value: Option<i64> = row.get(idx);
value
.map(|value| value.to_string())
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::NUMERIC => {
let value: Option<PgNumeric> = row.get(idx);
match value {
Some(PgNumeric(PgNumericValue::Number(value))) =>
decimal_to_str(value),
Some(PgNumeric(PgNumericValue::NaN)) => "NaN".to_string(),
None => NULL_STR.to_string(),
}
}
Type::DATE => {
let value: Option<NaiveDate> = row.get(idx);
value
.map(|value| value.to_string())
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::TIME => {
let value: Option<NaiveTime> = row.get(idx);
value
.map(|value| value.to_string())
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::TIMESTAMP => {
let value: Option<NaiveDateTime> = row.get(idx);
value
.map(|value| format!("{value:?}"))
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::BOOL => {
let value: Option<bool> = row.get(idx);
value.map(bool_to_str).unwrap_or_else(|| NULL_STR.to_string())
}
Type::BPCHAR | Type::VARCHAR | Type::TEXT => {
let value: Option<&str> = row.get(idx);
value
.map(varchar_to_str)
.unwrap_or_else(|| NULL_STR.to_string())
}
Type::FLOAT4 => {
let value: Option<f32> = row.get(idx);
value.map(f32_to_str).unwrap_or_else(|| NULL_STR.to_string())
}
Type::FLOAT8 => {
let value: Option<f64> = row.get(idx);
value.map(f64_to_str).unwrap_or_else(|| NULL_STR.to_string())
}
Type::REGTYPE => {
let value: Option<u32> = row.get(idx);
value
.and_then(Type::from_oid)
.map(|value| value.to_string())
.unwrap_or_else(|| NULL_STR.to_string())
}
_ => unimplemented!("Unsupported type: {}", column.type_().name()),
}
}
/// Minimal `NUMERIC` decoder for Postgres' binary format.
///
/// We use it in sqllogictest to avoid pulling in `rust_decimal` while still
/// converting `NUMERIC` values into `BigDecimal` for normalized output.
///
/// Verification:
/// - The binary layout and field meanings come from the Postgres protocol
docs:
///
https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-NUMERIC
/// - You can sanity-check correctness by comparing decoded values against
the
/// server's text format, e.g. run `SELECT value::numeric, value::text` in
/// Postgres and ensure this parser produces the same normalized string.
enum PgNumericValue {
Number(BigDecimal),
NaN,
}
/// Wrapper type that implements `FromSql` for Postgres `NUMERIC` values.
///
/// This is intentionally scoped to the sqllogictest engine so we can parse
/// binary `NUMERIC` results in a single query without extra dependencies.
struct PgNumeric(PgNumericValue);
impl<'a> postgres_types::FromSql<'a> for PgNumeric {
fn from_sql(
ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
if *ty != Type::NUMERIC {
return Err("invalid type".into());
}
let mut buf = raw;
let ndigits = read_i16(&mut buf)? as i64;
let weight = read_i16(&mut buf)? as i64;
let sign = read_i16(&mut buf)?;
let _dscale = read_i16(&mut buf)?;
const NUMERIC_POS: i16 = 0x0000;
const NUMERIC_NEG: i16 = 0x4000;
const NUMERIC_NAN: i16 = 0xC000u16 as i16;
if sign == NUMERIC_NAN {
return Ok(PgNumeric(PgNumericValue::NaN));
}
let mut bigint = BigInt::from(0);
for _ in 0..ndigits {
let digit = read_i16(&mut buf)? as i64;
bigint = bigint * 10000 + digit;
}
if sign == NUMERIC_NEG {
bigint = -bigint;
} else if sign != NUMERIC_POS {
return Err(format!("invalid numeric sign {sign}").into());
}
let scale = if ndigits == 0 {
0
} else {
(ndigits - (weight + 1)) * 4
};
let value = BigDecimal::from_bigint(bigint, scale);
Ok(PgNumeric(PgNumericValue::Number(value)))
}
fn accepts(ty: &Type) -> bool {
*ty == Type::NUMERIC
}
}
fn read_i16(buf: &mut &[u8]) -> Result<i16, Box<dyn std::error::Error + Sync
+ Send>> {
if buf.len() < 2 {
return Err("invalid buffer size".into());
}
let val = i16::from_be_bytes([buf[0], buf[1]]);
*buf = &buf[2..];
Ok(val)
}
fn convert_types(types: Vec<Type>) -> Vec<DFColumnType> {
types
.into_iter()
.map(|t| match t {
Type::BOOL => DFColumnType::Boolean,
Type::INT2 | Type::INT4 | Type::INT8 => DFColumnType::Integer,
Type::BPCHAR | Type::VARCHAR | Type::TEXT => DFColumnType::Text,
Type::FLOAT4 | Type::FLOAT8 | Type::NUMERIC =>
DFColumnType::Float,
Type::DATE | Type::TIME => DFColumnType::DateTime,
Type::TIMESTAMP => DFColumnType::Timestamp,
_ => DFColumnType::Another,
})
.collect()
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]