Hi Mich,

Thank you for the help and sorry about the late reply.
I ran the code you provided, but I still get an empty result and "exitCode 0". Here is the complete output:

===============================


24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
24/05/30 01:23:38 INFO ResourceUtils:
==============================================================
24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
spark.driver.
24/05/30 01:23:38 INFO ResourceUtils:
==============================================================
24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
, memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
name: cpus, amount: 1.0)
24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: ubuntu; groups
with view permissions: EMPTY; users with modify permissions: ubuntu; groups
with modify permissions: EMPTY
24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver' on
port 46321.
24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
org.apache.spark.storage.DefaultTopologyMapper for getting topology
information
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint up
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
GiB
24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
MOC-R4PAC08U33-S1C
24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:39 INFO Executor: Java version 11.0.22
24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
(userClassPathFirst = false): ''
24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
24/05/30 01:23:39 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
24/05/30 01:23:39 INFO NettyBlockTransferService: Server created on
MOC-R4PAC08U33-S1C:39343
24/05/30 01:23:39 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy
24/05/30 01:23:39 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMasterEndpoint: Registering block
manager MOC-R4PAC08U33-S1C:39343 with 2.8 GiB RAM, BlockManagerId(driver,
MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManager: Initialized BlockManager:
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO SharedState: Setting hive.metastore.warehouse.dir
('null') to the value of spark.sql.warehouse.dir.
24/05/30 01:23:39 INFO SharedState: Warehouse path is
'file:/home/ubuntu/tpch-spark/spark-warehouse'.
24/05/30 01:23:40 WARN MetricsConfig: Cannot locate configuration: tried
hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/05/30 01:23:40 INFO MetricsSystemImpl: Scheduled Metric snapshot period
at 10 second(s).
24/05/30 01:23:40 INFO MetricsSystemImpl: s3a-file-system metrics system
started
24/05/30 01:23:41 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 01:23:41 INFO FileStreamSinkLog: BatchIds found from listing:
24/05/30 01:23:43 INFO FileSourceStrategy: Pushed Filters:
24/05/30 01:23:43 INFO FileSourceStrategy: Post-Scan Filters:
24/05/30 01:23:43 INFO CodeGenerator: Code generated in 188.932153 ms
24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 201.3 KiB, free 2.8 GiB)
24/05/30 01:23:43 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 34.8 KiB, free 2.8 GiB)
24/05/30 01:23:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on MOC-R4PAC08U33-S1C:39343 (size: 34.8 KiB, free: 2.8 GiB)
24/05/30 01:23:43 INFO SparkContext: Created broadcast 0 from showString at
NativeMethodAccessorImpl.java:0
24/05/30 01:23:43 INFO FileSourceScanExec: Planning scan with bin packing,
max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
+----+----+----+
|name|int1|int2|
+----+----+----+
+----+----+----+

24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode
0.
24/05/30 01:23:43 INFO SparkUI: Stopped Spark web UI at
http://MOC-R4PAC08U33-S1C:4040
24/05/30 01:23:43 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
24/05/30 01:23:43 INFO MemoryStore: MemoryStore cleared
24/05/30 01:23:43 INFO BlockManager: BlockManager stopped
24/05/30 01:23:43 INFO BlockManagerMaster: BlockManagerMaster stopped
24/05/30 01:23:43 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
24/05/30 01:23:43 INFO SparkContext: Successfully stopped SparkContext
24/05/30 01:23:44 INFO ShutdownHookManager: Shutdown hook called
24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
/tmp/spark-f26cd915-aeb6-4efc-8960-56ca51ac1a7d
24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
/tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6
24/05/30 01:23:44 INFO ShutdownHookManager: Deleting directory
/tmp/spark-91b984d6-62fe-4c6b-9996-36d6873ff5d6/pyspark-bcca58a9-38dc-4359-85d1-81b728d6cf82
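
For reference, here is a minimal local check that does not touch S3 or Spark. It assumes the object holds exactly the three lines quoted further down in this thread; `SAMPLE` and `parse_space_delimited` are hypothetical names used only for illustration. It shows only that a single-space delimiter should yield a header plus two rows for this content, so the empty table above is probably not a delimiter problem.

```python
import csv
import io

# Sample object content as quoted later in this thread (space-delimited).
# Assumption: the real object contains exactly these three lines.
SAMPLE = "name int1 int2\nfirst 1 2\nsecond 3 4\n"


def parse_space_delimited(text):
    """Return (header, rows) for space-delimited text with a header line."""
    reader = csv.reader(io.StringIO(text), delimiter=" ")
    records = [record for record in reader if record]  # skip blank lines
    if not records:
        return [], []
    return records[0], records[1:]


header, rows = parse_space_delimited(SAMPLE)
print("header:", header)
print("row count:", len(rows))
```

The same function can be pointed at the file downloaded with the CLI to compare what is actually stored against what Spark shows.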


Best,
Amin



On Thu, May 23, 2024 at 4:20 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Could be a number of reasons
>
> First, test reading the file with the AWS CLI (the CLI uses the s3://
> scheme, not s3a://)
>
> aws s3 cp s3://input/testfile.csv .
> cat testfile.csv
>
>
> Try this code with debug option to diagnose the problem
>
> from pyspark.sql import SparkSession
> from pyspark.sql.utils import AnalysisException
>
> spark = None  # defined up front so the finally block is safe if startup fails
> try:
>     # Initialize Spark session
>     spark = SparkSession.builder \
>         .appName("S3ReadTest") \
>         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6") \
>         .config("spark.hadoop.fs.s3a.access.key", "R*************6") \
>         .config("spark.hadoop.fs.s3a.secret.key", "1***************e") \
>         .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000") \
>         .config("spark.hadoop.fs.s3a.path.style.access", "true") \
>         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
>         .getOrCreate()
>
>     # Read the CSV file from S3; the delimiter must be a single space
>     df = spark.read \
>         .option("header", "true") \
>         .option("inferSchema", "true") \
>         .option("delimiter", " ") \
>         .csv("s3a://input/testfile.csv")
>
>     # Show the data
>     df.show(n=1)
>
> except AnalysisException as e:
>     print(f"AnalysisException: {e}")
> except Exception as e:
>     print(f"Error: {e}")
> finally:
>     # Stop the Spark session if it was created
>     if spark is not None:
>         spark.stop()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand
> expert opinions" (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
>
> On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh <mosayyebza...@gmail.com>
> wrote:
>
>> I am trying to read an s3 object from a local S3 storage (Ceph based)
>> using Spark 3.5.1. I see it can access the bucket and list the files (I
>> have verified it on Ceph side by checking its logs), even returning the
>> correct size of the object. But the content is not read.
>>
>> The object url is:
>> s3a://input/testfile.csv (I have also tested a nested bucket:
>> s3a://test1/test2/test3/testfile.csv)
>>
>>
>> Object's content:
>>
>> =====================
>> name int1 int2
>> first 1 2
>> second 3 4
>> =====================
>>
>>
>> Here is the config I have set so far:
>>
>> ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
>> ("spark.hadoop.fs.s3a.access.key", "R*************6")
>> ("spark.hadoop.fs.s3a.secret.key", "1***************e")
>> ("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
>> ("spark.hadoop.fs.s3a.path.style.access", "true")
>> ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>
>>
>> The output of my PySpark application below:
>> df = spark.read \
>>     .option("header", "true") \
>>     .schema(schema) \
>>     .csv("s3a://input/testfile.csv", sep=' ')
>>
>> df.show(n=1)
>> ==================================
>> 24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
>> 24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
>> 24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
>> 24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
>> 24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
>> 24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
>> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
>> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
>> 24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
>> 24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
>> 24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
>> +----+----+----+
>> |name|int1|int2|
>> +----+----+----+
>> +----+----+----+
>> 24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
>> 24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
>> =========================================
>>
>> Am I missing something here?
>>
>> P.S. I see OP_IS_DIRECTORY is set to 1. Is that the expected behavior?
>>
>>
>> Thanks in advance!
>>
>>
