Please find the sample source code below (fully working, except for the AWS keys and the s3 paths of the parquet files).

#include <iostream>

#include <memory>
#include <string>
#include <vector>
#include <arrow/io/interfaces.h>
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <boost/timer/timer.hpp>
#include <arrow/filesystem/s3fs.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3ServiceClientModel.h>
#include <aws/s3/model/Bucket.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/Object.h>
#include <aws/s3/model/PutObjectRequest.h>
#if 0
#include <Poco/Environment.h>
#include <Poco/Util/IniFileConfiguration.h>
#endif

using namespace std;

::arrow::fs::S3Options
getS3Options(const std::string& pAccessKey, const std::string& pSecretKey)
{
    auto status = arrow::fs::EnsureS3Initialized();
    if (!status.ok()) {
        std::cerr << "Unable to initialize s3 api: " << status.message() << std::endl;
    }

    auto s3Options = ::arrow::fs::S3Options::FromAccessKey(pAccessKey, pSecretKey);

    s3Options.connect_timeout = 30.0;
    s3Options.request_timeout = 60.0;
    s3Options.region          = "us-east-1"s;

    return s3Options;
}

::arrow::fs::S3Options
// std::shared_ptr<Aws::S3::S3Client>
initAws()
{
#if 0
    auto home            = Poco::Environment::get("HOME");
    auto credentialsFile = home + "/.aws/credentials";
    auto conf = Poco::AutoPtr<Poco::Util::IniFileConfiguration>(new Poco::Util::IniFileConfiguration(credentialsFile));

    auto accessKey = conf->getString("default.aws_access_key_id");
    auto secretKey = conf->getString("default.aws_secret_access_key");
#endif

    // populate aws access and secret keys
    std::string accessKey, secretKey;
    return getS3Options(accessKey, secretKey);
}

std::unique_ptr<parquet::arrow::FileReader>
getParquetReader(std::shared_ptr<arrow::fs::S3FileSystem> fileSystem, const std::string& pFileName)
{
    std::unique_ptr<parquet::arrow::FileReader> parquetReader;
    auto readRes = fileSystem->OpenInputFile(pFileName);
    if (!readRes.ok()) {
        std::cerr << "Error opening input file: " << readRes.status().message() << std::endl;
        return nullptr;
    }

    auto& readableFile = *readRes;
    ::parquet::arrow::FileReaderBuilder builder;
    auto status = builder.Open(readableFile);
    if (!status.ok()) {
        std::cerr << "Error opening parquet file: " << status.message() << std::endl;
        return nullptr;
    }

    // use_threads = true
    auto props = ::parquet::ArrowReaderProperties(true);
    props.set_pre_buffer(true);
    props.set_batch_size(1024 * 1024);
    status = builder.properties(props)->Build(&parquetReader);
    if (!status.ok()) {
        std::cerr << "Error building parquet reader: " << status.message() << std::endl;
        return nullptr;
    }
    return parquetReader;
}

int
main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);

    auto fns = std::vector<std::string>{"bucket/small_file_key.parquet"s, "bucket/large_file_key.parquet"s};

    auto s3Options = initAws();
    auto res       = arrow::fs::S3FileSystem::Make(s3Options);
    if (!res.ok()) {
        std::cerr << "Unable to create s3 filesystem: " << res.status().message() << std::endl;
        return 1;
    }

    for (auto&& fn : fns) {
        auto reader = getParquetReader(*res, fn);
        if (reader) {
            boost::timer::cpu_timer       cpu_timer;
            std::shared_ptr<arrow::Table> ret;
            auto                          status = reader->ReadTable(&ret);
            if (!status.ok()) {
                std::cerr << "Error reading table: " << status.message() << std::endl;
            }
            std::cout << "Time taken for " << fn << " is " << cpu_timer.format() << std::endl;
        } else {
            std::cerr << "No read happened" << std::endl;
        }
    }

    static_cast<void>(arrow::fs::FinalizeS3());
    Aws::ShutdownAPI(options);
    return 0;
}
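
One thing not shown in the sample above: as mentioned in my first mail below, I also set the I/O thread pool capacity to 32. A minimal sketch of that call (it lives in arrow/io/interfaces.h, which is already included), placed right after Aws::InitAPI(options):

// Raise arrow's I/O thread pool (used for the s3 reads) to 32 threads.
// SetIOThreadPoolCapacity returns a Status, so check it like the others.
auto ioStatus = ::arrow::io::SetIOThreadPoolCapacity(32);
if (!ioStatus.ok()) {
    std::cerr << "Unable to set io thread pool capacity: " << ioStatus.message() << std::endl;
}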

CMakeLists.txt file:

cmake_minimum_required(VERSION 3.24.0)
cmake_policy(SET CMP0003 NEW)
cmake_policy(SET CMP0074 NEW)
cmake_policy(SET CMP0012 NEW)
project(aws_test LANGUAGES C CXX)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(Poco COMPONENTS Foundation Util)
find_package(aws-cpp-sdk-core)
find_package(aws-cpp-sdk-s3 REQUIRED)
find_package(Boost COMPONENTS timer)
find_package(Arrow REQUIRED)
find_package(Parquet REQUIRED)
find_package(ArrowAcero REQUIRED)
find_package(ArrowDataset REQUIRED)

add_executable(aws-test main.cpp)

target_link_libraries(aws-test
  aws-cpp-sdk-core aws-cpp-sdk-s3
  Arrow::arrow_shared ArrowAcero::arrow_acero_shared Parquet::parquet_shared
  Poco::Foundation Poco::Util Boost::timer
)


Here I've tested with two files:
bucket/small_file_key.parquet is a parquet file of size 206 KB (1048576 rows, 570 columns)
and
bucket/large_file_key.parquet is a parquet file of size 89.9 MB (1048576 rows, 570 columns)

(rows and columns are the same in each file, but the data in them is different)
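
(In case the row-group layout of these files matters here, it can be printed from the reader built in the sample above; just a sketch, using the parquet metadata accessors:)

// Requires the reader returned by getParquetReader() above.
auto metadata = reader->parquet_reader()->metadata();
std::cout << "row groups: " << metadata->num_row_groups()
          << ", rows: " << metadata->num_rows()
          << ", columns: " << metadata->num_columns() << std::endl;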

When timed with boost::timer::cpu_timer, I got these times.
--------
with arrow 12.0.1, aws-1.11.107 (cpp sdk), gcc-12

Time taken for bucket/small_file_key.parquet is 0.619616s wall, 2.290000s user + 4.180000s system = 6.470000s CPU (1044.2%)
Time taken for bucket/large_file_key.parquet is 63.701571s wall, 3.730000s user + 5.070000s system = 8.800000s CPU (13.8%)
--------
with arrow 16.1.0, aws-1.11.316 (cpp sdk), gcc-13

Time taken for bucket/small_file.parquet is 0.890000s wall, 2.920000s user + 3.760000s system = 6.680000s CPU (750.6%)
Time taken for bucket/large_file.parquet is 119.010000s wall, 4.050000s user + 4.530000s system = 8.580000s CPU (7.2%)
---------

There were times when the large file took more than 3 minutes.
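
For reference, the only reader knobs I'm setting are use_threads, pre_buffer and batch_size; I haven't touched the read-coalescing options, so (if I read the properties header right) they stay at something like the lazy defaults:

// Assumption: this mirrors the default cache options rather than overriding
// anything; CacheOptions comes from arrow/io/caching.h.
props.set_cache_options(::arrow::io::CacheOptions::LazyDefaults());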

Thanks,
Surya

On Thu, Nov 28, 2024 at 6:48 PM Surya Kiran Gullapalli <suryakiran.gullapa...@gmail.com> wrote:

> Thanks for the quick response.
> When the file sizes are small (less than 10 MB), I'm not seeing much
> difference (not noticeable). But beyond that, I'm seeing a difference.
> I'll send a snippet in due course.
>
> Surya
>
> On Thu, Nov 28, 2024 at 6:37 PM Raúl Cumplido <raulcumpl...@gmail.com> wrote:
>
>> Thanks for raising the issue.
>>
>> Could you share a snippet of the code you are using, showing how you are
>> reading the file?
>> Is your decrease in performance also happening with different file sizes,
>> or is the file size related to your issue?
>>
>> Thanks,
>>
>> Raúl
>>
>> On Thu, Nov 28, 2024, 13:58, Surya Kiran Gullapalli <suryakiran.gullapa...@gmail.com> wrote:
>>
>>> Hello all,
>>> Trying to read a parquet file from s3 (a 50 MB file) and it is taking
>>> much more time than with arrow 12.0.1. I've enabled threads
>>> (use_threads=true) and the batch size is set to 1024*1024. I've also set
>>> the IOThreadPoolCapacity to 32.
>>>
>>> When I time the parquet read from s3, the boost timer shows CPU usage
>>> during the file read of 2-5%, so I think multithreaded reading was not
>>> happening.
>>>
>>> Reading the same parquet file from local disk is fine, and reading the
>>> same parquet file from s3 using arrow 12 is also fine. Am I missing any
>>> setting related to reading parquet with threads, or any aws setting?
>>>
>>> This is the setting:
>>> C++
>>> Apache arrow 16.1
>>> Ubuntu linux 22.04
>>> gcc-13.2
>>>
>>> Thanks,
>>> Surya