slbotbm commented on code in PR #3196:
URL: https://github.com/apache/iggy/pull/3196#discussion_r3242468559


##########
core/connectors/sources/postgres_source/src/lib.rs:
##########
@@ -1026,6 +1172,50 @@ fn format_offset_value(value: &str) -> String {
     }
 }
 
+fn format_pg_interval(interval: &PgInterval) -> String {
+    let mut parts = Vec::new();
+
+    let years = interval.months / 12;
+    let months = interval.months % 12;
+
+    if years != 0 {
+        parts.push(format!(
+            "{years} year{}",
+            if years.abs() != 1 { "s" } else { "" }
+        ));
+    }
+    if months != 0 {
+        parts.push(format!(
+            "{months} mon{}",
+            if months.abs() != 1 { "s" } else { "" }
+        ));
+    }
+    if interval.days != 0 {
+        parts.push(format!(
+            "{} day{}",
+            interval.days,
+            if interval.days.abs() != 1 { "s" } else { "" }
+        ));
+    }
+    if interval.microseconds != 0 || parts.is_empty() {
+        let total_secs = interval.microseconds / 1_000_000;
+        let remaining_us = (interval.microseconds % 1_000_000).unsigned_abs();
+        let hours = total_secs / 3600;
+        let mins = (total_secs % 3600) / 60;
+        let secs = total_secs % 60;
+        if remaining_us != 0 {
+            parts.push(format!(
+                "{:02}:{:02}:{:02}.{:06}",
+                hours, mins, secs, remaining_us
+            ));

Review Comment:
   This part formats negative numbers incorrectly. For example, the above code 
would convert `-1.5 seconds` into `00:00:-1.500000`



##########
core/connectors/sources/postgres_source/src/lib.rs:
##########
@@ -911,7 +913,7 @@ impl PostgresSource {
                     .map(|v| serde_json::Value::from(v as i64))
                     .unwrap_or(serde_json::Value::Null))
             }
-            "INT4" => {
+            "INT4" | "OID" => {
                 let value: Option<i32> = row
                     .try_get(column_index)
                     .map_err(|_| Error::InvalidRecord)?;

Review Comment:
   the code current treats `OID` as i32, but in postgres, that is u32, which 
can lead batch processing failures. I feel decoding this to an Oid type would 
be a good approach. 
   
   I also noticed you're decoding individual `OID`'s, but not but not `Vec<OID>`



##########
core/connectors/sources/postgres_source/src/lib.rs:
##########
@@ -952,14 +954,30 @@ impl PostgresSource {
                     .map(serde_json::Value::from)
                     .unwrap_or(serde_json::Value::Null))
             }
-            "VARCHAR" | "TEXT" | "CHAR" => {
+            "VARCHAR" | "TEXT" | "CHAR" | "NAME" | "BPCHAR" => {
                 let value: Option<String> = row
                     .try_get(column_index)
                     .map_err(|_| Error::InvalidRecord)?;
                 Ok(value
                     .map(serde_json::Value::String)
                     .unwrap_or(serde_json::Value::Null))
             }
+            "DATE" => {
+                let value: Option<NaiveDate> = row
+                    .try_get(column_index)
+                    .map_err(|_| Error::InvalidRecord)?;
+                Ok(value
+                    .map(|d| serde_json::Value::String(d.to_string()))
+                    .unwrap_or(serde_json::Value::Null))
+            }
+            "TIME" => {
+                let value: Option<NaiveTime> = row
+                    .try_get(column_index)
+                    .map_err(|_| Error::InvalidRecord)?;
+                Ok(value
+                    .map(|t| serde_json::Value::String(t.to_string()))
+                    .unwrap_or(serde_json::Value::Null))
+            }
             "TIMESTAMP" | "TIMESTAMPTZ" => {
                 let value: Option<DateTime<Utc>> = row
                     .try_get(column_index)

Review Comment:
   though not related to your changes, in sqlx, `TIMESTAMP` is 
`chrono::NaiveDateTime`, while `TIMESTAMPZ` is `chrono::DateTime<Utc>`. If the 
user passes `TIMESTAMP`, the conversion will fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to