This is an automated email from the ASF dual-hosted git repository. kszucs pushed a commit to branch maint-1.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit d9166a38dc3b323365774dd78e53a89a9f8c8d06 Author: Benjamin Kietzman <[email protected]> AuthorDate: Tue Aug 4 12:21:30 2020 -0400 ARROW-9609: [C++][Dataset] CsvFileFormat reads all virtual columns as null `ConvertOptions::include_missing_columns = true` was insufficient to produce the required behavior with missing columns: we need to read the csv file's header to find the names of columns actually present in the file before instantiating a StreamingReader. Otherwise the StreamingReader will fill absent columns with `null`, which prevents the projector from materializing them correctly later. Closes #7896 from bkietz/9609-csv-empty-virtual Authored-by: Benjamin Kietzman <[email protected]> Signed-off-by: Benjamin Kietzman <[email protected]> --- cpp/src/arrow/compute/kernels/scalar_string.cc | 4 +- cpp/src/arrow/dataset/file_csv.cc | 93 +++++++++++++++++++------- cpp/src/arrow/dataset/file_csv_test.cc | 17 +++++ cpp/src/arrow/dataset/file_ipc_test.cc | 18 +++++ r/tests/testthat/test-dataset.R | 17 +++++ 5 files changed, 123 insertions(+), 26 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_string.cc b/cpp/src/arrow/compute/kernels/scalar_string.cc index 0d6b8da..a0ea240 100644 --- a/cpp/src/arrow/compute/kernels/scalar_string.cc +++ b/cpp/src/arrow/compute/kernels/scalar_string.cc @@ -861,10 +861,10 @@ void AddBinaryLength(FunctionRegistry* registry) { applicator::ScalarUnaryNotNull<Int32Type, StringType, BinaryLength>::Exec; ArrayKernelExec exec_offset_64 = applicator::ScalarUnaryNotNull<Int64Type, LargeStringType, BinaryLength>::Exec; - for (const auto input_type : {binary(), utf8()}) { + for (const auto& input_type : {binary(), utf8()}) { DCHECK_OK(func->AddKernel({input_type}, int32(), exec_offset_32)); } - for (const auto input_type : {large_binary(), large_utf8()}) { + for (const auto& input_type : {large_binary(), large_utf8()}) { DCHECK_OK(func->AddKernel({input_type}, int64(), exec_offset_64)); } DCHECK_OK(registry->AddFunction(std::move(func))); diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index f077ea2..3df9fa8 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -20,9 +20,11 @@ #include <algorithm> #include <memory> #include <string> +#include <unordered_set> #include <utility> #include "arrow/csv/options.h" +#include "arrow/csv/parser.h" #include "arrow/csv/reader.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/file_base.h" @@ -39,49 +41,92 @@ namespace dataset { using internal::checked_cast; using internal::checked_pointer_cast; +Result<std::unordered_set<std::string>> GetColumnNames( + const csv::ParseOptions& parse_options, util::string_view first_block, + MemoryPool* pool) { + uint32_t parsed_size = 0; + csv::BlockParser parser(pool, parse_options, /*num_cols=*/-1, + /*max_num_rows=*/1); + + RETURN_NOT_OK(parser.Parse(util::string_view{first_block}, &parsed_size)); + + if (parser.num_rows() != 1) { + return Status::Invalid( + "Could not read first row from CSV file, either " + "file is truncated or header is larger than block size"); + } + + if (parser.num_cols() == 0) { + return Status::Invalid("No columns in CSV file"); + } + + std::unordered_set<std::string> column_names; + + RETURN_NOT_OK( + parser.VisitLastRow([&](const uint8_t* data, uint32_t size, bool quoted) -> Status { + util::string_view view{reinterpret_cast<const char*>(data), size}; + if (column_names.emplace(view.to_string()).second) { + return Status::OK(); + } + return Status::Invalid("CSV file contained multiple columns named ", view); + })); + + return column_names; +} + static inline Result<csv::ConvertOptions> GetConvertOptions( - const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options) { - auto options = csv::ConvertOptions::Defaults(); - if (scan_options != nullptr) { - // This is set to true to match behavior with other formats; a missing column - // will be materialized as null. - options.include_missing_columns = true; - - for (const auto& field : scan_options->schema()->fields()) { - options.column_types[field->name()] = field->type(); - options.include_columns.push_back(field->name()); - } + const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options, + const Buffer& first_block, MemoryPool* pool) { + ARROW_ASSIGN_OR_RAISE( + auto column_names, + GetColumnNames(format.parse_options, util::string_view{first_block}, pool)); + + auto convert_options = csv::ConvertOptions::Defaults(); + + for (const auto& field : scan_options->schema()->fields()) { + if (column_names.find(field->name()) == column_names.end()) continue; + convert_options.column_types[field->name()] = field->type(); + convert_options.include_columns.push_back(field->name()); + } - // FIXME(bkietz) also acquire types of fields materialized but not projected. - for (auto&& name : FieldsInExpression(scan_options->filter)) { - ARROW_ASSIGN_OR_RAISE(auto match, - FieldRef(name).FindOneOrNone(*scan_options->schema())); - if (match.indices().empty()) { - options.include_columns.push_back(std::move(name)); - } + // FIXME(bkietz) also acquire types of fields materialized but not projected. + for (auto&& name : FieldsInExpression(scan_options->filter)) { + ARROW_ASSIGN_OR_RAISE(auto match, + FieldRef(name).FindOneOrNone(*scan_options->schema())); + if (match.indices().empty()) { + convert_options.include_columns.push_back(std::move(name)); } } - return options; + return convert_options; } static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) { - auto options = csv::ReadOptions::Defaults(); + auto read_options = csv::ReadOptions::Defaults(); // Multithreaded conversion of individual files would lead to excessive thread // contention when ScanTasks are also executed in multiple threads, so we disable it // here. - options.use_threads = false; - return options; + read_options.use_threads = false; + return read_options; } static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader( const FileSource& source, const CsvFileFormat& format, - const std::shared_ptr<ScanOptions>& options = nullptr, + const std::shared_ptr<ScanOptions>& scan_options = nullptr, MemoryPool* pool = default_memory_pool()) { ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); auto reader_options = GetReadOptions(format); + ARROW_ASSIGN_OR_RAISE(auto first_block, input->ReadAt(0, reader_options.block_size)); + RETURN_NOT_OK(input->Seek(0)); + const auto& parse_options = format.parse_options; - ARROW_ASSIGN_OR_RAISE(auto convert_options, GetConvertOptions(format, options)); + + ARROW_ASSIGN_OR_RAISE( + auto convert_options, + scan_options == nullptr + ? ToResult(csv::ConvertOptions::Defaults()) + : GetConvertOptions(format, scan_options, *first_block, pool)); + auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options, parse_options, convert_options); if (!maybe_reader.ok()) { diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 9c54eb4..5c189f0 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -83,6 +83,23 @@ TEST_F(TestCsvFileFormat, ScanRecordBatchReader) { ASSERT_EQ(row_count, 3); } +TEST_F(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) { + auto source = GetFileSource(); + + opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())})); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + + int64_t row_count = 0; + + for (auto maybe_batch : Batches(fragment.get())) { + ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch)); + AssertSchemaEqual(*batch->schema(), *schema_); + row_count += batch->num_rows(); + } + + ASSERT_EQ(row_count, 3); +} + TEST_F(TestCsvFileFormat, OpenFailureWithRelevantError) { auto source = GetFileSource(""); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("<Buffer>"), diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 747dccf..c557621 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -128,6 +128,24 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { ASSERT_EQ(row_count, kNumRows); } +TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) { + auto reader = GetRecordBatchReader(); + auto source = GetFileSource(reader.get()); + + opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())})); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + + int64_t row_count = 0; + + for (auto maybe_batch : Batches(fragment.get())) { + ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch)); + AssertSchemaEqual(*batch->schema(), *schema_); + row_count += batch->num_rows(); + } + + ASSERT_EQ(row_count, kNumRows); +} + TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { std::shared_ptr<RecordBatchReader> reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index f806866..e78dfd3 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -112,6 +112,12 @@ test_that("Simple interface for datasets", { filter(integer > 6) %>% summarize(mean = mean(integer)) ) + + # Collecting virtual partition column works + expect_equal( + collect(ds) %>% pull(part), + c(rep(1, 10), rep(2, 10)) + ) }) test_that("dim method returns the correct number of rows and columns",{ @@ -219,6 +225,12 @@ test_that("IPC/Feather format data", { filter(integer > 6) %>% summarize(mean = mean(integer)) ) + + # Collecting virtual partition column works + expect_equal( + collect(ds) %>% pull(part), + c(rep(3, 10), rep(4, 10)) + ) }) test_that("CSV dataset", { @@ -239,6 +251,11 @@ test_that("CSV dataset", { filter(integer > 6) %>% summarize(mean = mean(integer)) ) + # Collecting virtual partition column works + expect_equal( + collect(ds) %>% pull(part), + c(rep(5, 10), rep(6, 10)) + ) }) test_that("Other text delimited dataset", {
