ariel-miculas commented on code in PR #19687:
URL: https://github.com/apache/datafusion/pull/19687#discussion_r2832884751
##########
datafusion/datasource-json/src/source.rs:
##########
@@ -188,23 +187,59 @@ impl FileOpener for JsonOpener {
let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
- let calculated_range =
- calculate_range(&partitioned_file, &store, None).await?;
+ let file_size = partitioned_file.object_meta.size;
+ let location = &partitioned_file.object_meta.location;
- let range = match calculated_range {
- RangeCalculation::Range(None) => None,
- RangeCalculation::Range(Some(range)) => Some(range.into()),
- RangeCalculation::TerminateEarly => {
+ let file_range = if file_compression_type.is_compressed() {
+ None
+ } else {
+ partitioned_file.range.clone()
+ };
+
+ if let Some(file_range) = file_range.as_ref() {
+ let raw_start = u64::try_from(file_range.start).map_err(|_| {
+ DataFusionError::Internal(
+ "file range start must be non-negative".to_string(),
+ )
+ })?;
+ let raw_end = u64::try_from(file_range.end).map_err(|_| {
+ DataFusionError::Internal(
+ "file range end must be non-negative".to_string(),
+ )
+ })?;
+ let aligned_bytes = get_aligned_bytes(
+ &store,
+ location,
+ raw_start,
+ raw_end,
+ file_size,
+ b'\n',
+ DEFAULT_BOUNDARY_WINDOW,
+ )
+ .await?;
+
+ let Some(bytes) = aligned_bytes else {
+ return Ok(
+ futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed()
+ );
+ };
+
+ if bytes.is_empty() {
return Ok(
futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed()
);
}
- };
- let options = GetOptions {
- range,
- ..Default::default()
- };
+ let reader = ReaderBuilder::new(schema)
Review Comment:
using `get_range` in `boundary_utils.rs` means all the data will be read
eagerly instead of relying on the FileStream implementation that polls the
bytes from the stream
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]