There could be a number of reasons.

First, test reading the file with the AWS CLI. Note that the CLI uses the s3:// scheme (s3a:// is Hadoop-only) and needs the custom endpoint passed explicitly (http assumed here):

aws s3 cp s3://input/testfile.csv . --endpoint-url http://192.168.52.63:8000
cat testfile.csv
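
If the CLI is not handy against a custom endpoint, a minimal boto3 sketch can read the object directly, bypassing Spark and Hadoop entirely (assuming plain http and the same masked credentials as in your Spark config; substitute the real values):

import boto3

# Connect to the local Ceph endpoint (http assumed; credentials masked
# as in the thread -- substitute the real values).
s3 = boto3.client(
    "s3",
    endpoint_url="http://192.168.52.63:8000",
    aws_access_key_id="R*************6",
    aws_secret_access_key="1***************e",
)

# Fetch the object and print its raw content. If the three CSV lines
# appear here, connectivity and credentials are fine and the problem
# sits in the Spark/S3A layer.
obj = s3.get_object(Bucket="input", Key="testfile.csv")
print(obj["Body"].read().decode("utf-8"))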


If the object is readable that way, try this code with exception handling to diagnose where the Spark read fails:

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = None
try:
    # Initialize the Spark session with the S3A settings
    spark = SparkSession.builder \
        .appName("S3ReadTest") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6") \
        .config("spark.hadoop.fs.s3a.access.key", "R*************6") \
        .config("spark.hadoop.fs.s3a.secret.key", "1***************e") \
        .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()

    # Read the CSV file from S3; the delimiter must be a single space
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", " ") \
        .csv("s3a://input/testfile.csv")

    # Show the data
    df.show(n=1)

except AnalysisException as e:
    print(f"AnalysisException: {e}")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Stop the Spark session only if it was created
    if spark is not None:
        spark.stop()
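
If the DataFrame still comes back empty, raise the driver log level right after getOrCreate() so each S3A request and response shows up in the output (setLogLevel is the standard SparkContext API; DEBUG is verbose, so switch back to INFO once done):

# Place this immediately after getOrCreate() and before the read so
# the S3A request/response details appear in the driver logs.
spark.sparkContext.setLogLevel("DEBUG")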

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom


View my LinkedIn profile:
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh <mosayyebza...@gmail.com>
wrote:

> I am trying to read an S3 object from a local S3 storage (Ceph based)
> using Spark 3.5.1. I can see it access the bucket and list the files (I
> have verified this on the Ceph side by checking its logs), and it even
> returns the correct size of the object, but the content is not read.
>
> The object URL is:
> s3a://input/testfile.csv (I have also tested a nested bucket:
> s3a://test1/test2/test3/testfile.csv)
>
>
> Object's content:
>
> =====================
> name int1 int2
> first 1 2
> second 3 4
> =====================
>
>
> Here is the config I have set so far:
>
> ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
> ("spark.hadoop.fs.s3a.access.key", "R*************6")
> ("spark.hadoop.fs.s3a.secret.key", "1***************e")
> ("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
> ("spark.hadoop.fs.s3a.path.style.access", "true")
> ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>
>
> The output of my following PySpark application:
> df = spark.read \
>     .option("header", "true") \
>     .schema(schema) \
>     .csv("s3a://input/testfile.csv", sep=' ')
>
> df.show(n=1)
> ==================================
> 24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
> 24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
> 24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
> 24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
> 24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
> 24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
> 24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
> 24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
> 24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
> +----+----+----+
> |name|int1|int2|
> +----+----+----+
> +----+----+----+
> 24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
> 24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
> =========================================
>
> Am I missing something here?
>
> P.S. I see OP_IS_DIRECTORY is set to 1. Is that correct behavior?
>
>
> Thanks in advance!
>
>