Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-05 Thread Gourav Sengupta
Hi,

Spark is just one of the technologies out there now; there are several
other technologies that far outperform Spark, or are at least as good as Spark.



Regards,
Gourav

On Sat, Jul 2, 2022 at 7:42 PM Sid  wrote:

> So, as per the discussion, the output of the shuffle stages is also stored on
> disk and not in memory?
>
> On Sat, Jul 2, 2022 at 8:44 PM krexos  wrote:
>
>>
>> thanks a lot!
>>
>> --- Original Message ---
>> On Saturday, July 2nd, 2022 at 6:07 PM, Sean Owen 
>> wrote:
>>
>> I think that is more accurate, yes. Though note that shuffle files are local,
>> not on distributed storage, which is an advantage. MR also had map-only
>> transforms and chained mappers, but they were harder to use. Not impossible,
>> but you could also say Spark just made it easier to do the more efficient thing.
>>
>> On Sat, Jul 2, 2022, 9:34 AM krexos 
>> wrote:
>>
>>>
>>> You said Spark performs IO only when reading data and writing final data
>>> to the disk. I thought by that you meant that it only reads the input files
>>> of the job and writes the output of the whole job to the disk, but in
>>> reality Spark does store intermediate results on disk, just in fewer places
>>> than MR.
>>>
>>> --- Original Message ---
>>> On Saturday, July 2nd, 2022 at 5:27 PM, Sid 
>>> wrote:
>>>
>>> I have explained the same thing in layman's terms. Please go through it
>>> once.
>>>
>>> On Sat, 2 Jul 2022, 19:45 krexos,  wrote:
>>>

 I think I understand where Spark saves IO.

 in MR we have map -> reduce -> map -> reduce -> map -> reduce ...

 which writes results to disk at the end of each such "arrow",

 on the other hand, in Spark we have

 map -> reduce + map -> reduce + map -> reduce ...

 which cuts the IO roughly in half.

 thanks everyone,
 krexos

 --- Original Message ---
 On Saturday, July 2nd, 2022 at 1:35 PM, krexos 
 wrote:

 Hello,

 One of the main "selling points" of Spark is that, unlike Hadoop
 map-reduce, which persists the intermediate results of its computation to HDFS
 (disk), Spark keeps all its results in memory. I don't understand this, as in
 reality, when a Spark stage finishes, it writes all of the data into shuffle
 files stored on disk.
 How then is this an improvement on map-reduce?

 Image from https://youtu.be/7ooZ4S7Ay6Y


 thanks!



>>>
>>
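To make the stage/shuffle point above concrete, here is a minimal PySpark sketch (a
hypothetical example, not taken from the thread; the path and column names are made
up). Narrow transformations are pipelined in memory within a single stage, and only
the shuffle at the stage boundary is written to the executors' local disks:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Narrow transformations (filter, withColumn) are fused into one stage and
    # evaluated in memory; nothing is persisted to disk between them.
    df = (spark.read.json("/path/to/input")          # hypothetical input path
          .filter(F.col("value") > 0)
          .withColumn("bucket", F.col("value") % 10))

    # groupBy introduces a shuffle: map-side tasks write shuffle files to the
    # executors' local disks (not HDFS), and the next stage fetches those blocks.
    agg = df.groupBy("bucket").count()

    agg.explain()  # the Exchange node in the physical plan marks the shuffle boundary

If you want intermediate results kept in memory across actions, that is what
df.cache()/df.persist() is for; the shuffle files themselves always go to local disk.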


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-05 Thread Apostolos N. Papadopoulos
First of all, define "far outperforming". For sure, there is no GOD 
system that does everything perfectly.


Which use cases are you referring to? It would be interesting for the
community to see some comparisons.


a.


On 5/7/22 12:29, Gourav Sengupta wrote:

Hi,

Spark is just one of the technologies out there now; there are several
other technologies that far outperform Spark, or are at least as good as Spark.




Regards,
Gourav


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


Reading snappy/lz4 compressed csv/json files

2022-07-05 Thread Yeachan Park
Hi all,

We are trying to read CSV/JSON files that have been snappy/lz4 compressed,
using Spark. The files were compressed with the lz4 command-line tool and the
Python snappy library.

Neither succeeded, while other formats (bzip2 & gzip) worked fine.

I've read in some places that the codec is not fully compatible between
different implementations. Has anyone else had success with this? I'd be
happy to hear how you went about it.

Thanks,
Yeachan
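
For what it is worth, one way to narrow this down is to round-trip the compression
through Spark itself, which writes with the Hadoop SnappyCodec/Lz4Codec framing and
picks the codec from the file extension on read. A sketch (paths are hypothetical,
and the idea that the mismatch lies in the framing used by the external tools is only
an assumption based on the compatibility note above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.json("/path/to/uncompressed_input")   # hypothetical path

    # Spark's own writers go through the Hadoop compression codecs.
    df.write.option("compression", "snappy").json("/tmp/out_snappy")
    df.write.option("compression", "lz4").json("/tmp/out_lz4")

    # On read, the codec is chosen from the .snappy / .lz4 file extension.
    spark.read.json("/tmp/out_snappy").show()
    spark.read.json("/tmp/out_lz4").show()

If the round-trip reads back fine while the externally compressed files do not, that
points at the framing difference rather than at Spark's readers.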


Re: How reading works?

2022-07-05 Thread Sid
Hi Team,

I still need help understanding exactly how reading works.

Thanks,
Sid

On Mon, Jun 20, 2022 at 2:23 PM Sid  wrote:

> Hi Team,
>
> Can somebody help?
>
> Thanks,
> Sid
>
> On Sun, Jun 19, 2022 at 3:51 PM Sid  wrote:
>
>> Hi,
>>
>> I already have a partitioned JSON dataset in s3 like the below:
>>
>> edl_timestamp=2022090800
>>
>> Now, the problem is that in the earlier 10 days of data collection there was
>> a duplicate-columns issue, due to which we couldn't read the data.
>>
>> Now the latest 10 days of data are proper. So, I am trying to do
>> something like the below:
>>
>>
>> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>>
>> but I am getting the duplicate-column issue that was present in the old
>> dataset. So, I am trying to understand how Spark reads the data. Does it
>> read the full dataset and then filter on the basis of the last saved
>> timestamp, or does it read only what is required? If the second case is
>> true, then it should have read the data, since the latest data is correct.
>>
>> So just trying to understand. Could anyone help here?
>>
>> Thanks,
>> Sid
>>
>>
>>


Re: How reading works?

2022-07-05 Thread Bjørn Jørgensen
"*but I am getting the issue of the duplicate column which was present in
the old dataset.*"

So you have answered your own question!

spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)

As you have figured out, Spark reads all the JSON files in "path" and then filters.

There are some file formats that support filtering before the files are read. The
one that I know about is Parquet, as this link explains: Spark: Understand
the Basic of Pushed Filter and Partition Filter Using Parquet File






On Tue, Jul 5, 2022 at 21:21, Sid  wrote:

> Hi Team,
>
> I still need help understanding exactly how reading works.
>
> Thanks,
> Sid
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
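
For reference, a quick way to see whether a filter is applied before or after the
scan is to look at the physical plan. This is a hypothetical Parquet example in the
spirit of the link above, not Sid's JSON data; the table path and column name are
made up:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.parquet("/path/to/parquet_table")   # hypothetical path

    # In the FileScan node of the physical plan, a filter on a data column can
    # appear under PushedFilters (so row groups can be skipped), while a filter
    # on a directory-partition column appears under PartitionFilters (whole
    # partitions are skipped before any data is read).
    df.filter(F.col("amount") > 100).explain()

Partition filters come from the directory layout (e.g. the edl_timestamp=... folders)
rather than from the file format itself; note, though, that JSON schema inference
still has to look at files before any filter applies, which may be where the old
duplicate-column error surfaces.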


Re: How reading works?

2022-07-05 Thread Bjørn Jørgensen
Ehh... what is a "*duplicate column*"? I don't think Spark supports that.

Or do you mean duplicate column = duplicate rows?




-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Reading parquet strips non-nullability from schema

2022-07-05 Thread Greg Kopff
Hi.

I’ve spent the last couple of hours trying to chase down an issue with 
writing/reading parquet files.  I was trying to save (and then read in) a 
parquet file with a schema that sets my non-nullability details correctly.  
After having no success for some time, I posted to Stack Overflow about it:

https://stackoverflow.com/q/72877780/1212960 


If you read the question, you’ll see that I can trace writing a schema with the 
correct (non-)nullability information.  And I can even see that when reading in 
the parquet file, the correct nullability information is found.  However, by 
the time the data frame gets handed back to me, the (non-)nullability 
information has been thrown away.

Only after posting the question did I find this sentence in the Parquet Files 
documentation (https://spark.apache.org/docs/3.3.0/sql-data-sources-parquet.html):

When reading Parquet files, all columns are automatically converted to be 
nullable for compatibility reasons.

It seems odd to me that:

 - Spark tracks nullable and non-nullable columns
 - Parquet tracks nullable and non-nullable columns
 - Spark can write a parquet file with correctly annotated non-nullable columns
 - Spark can correctly read a parquet file and identify non-nullable columns

… and yet, Spark deliberately discards this information.  

I understand (from this article) that Spark itself doesn't enforce the 
non-nullability - it's simply information used by the Catalyst optimiser. And I 
also understand that Bad Things™ would happen if you inserted null values when 
the schema says it's non-null.

But I don’t understand why I can’t accept those caveats and have Spark retain 
my schema properly, with the non-nullability information maintained.  What are 
these compatibility reasons that the documentation alludes to?

Is this behaviour configurable?  How can I create a dataframe from a parquet 
file that uses the schema as-is (i.e. honouring the nullability information)?  
I want the Catalyst optimiser to have this information available, and I control 
my data to ensure that it meets the nullability constraints.

Thanks for your time.

Kindest regards,

—
Greg
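
One workaround that sometimes gets suggested (a hedged sketch, not an answer from
this thread, and with the caveat already noted above that Spark will not enforce the
constraint) is to re-apply the intended schema after reading, since createDataFrame
keeps the supplied nullability flags as-is. Field names and the path are hypothetical:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, LongType, StringType

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical schema with one non-nullable column.
    schema = StructType([
        StructField("id", LongType(), nullable=False),
        StructField("name", StringType(), nullable=True),
    ])

    # The file-source read comes back all-nullable, as the documentation says.
    df = spark.read.parquet("/path/to/file.parquet")   # hypothetical path

    # Rebuild the DataFrame with the intended schema; Spark does not validate the
    # data against it, so this is only safe if "id" really contains no nulls.
    df_strict = spark.createDataFrame(df.rdd, schema)
    df_strict.printSchema()   # id now reports nullable = false

The detour through the RDD has a cost, so whether the optimiser hints are worth it is
a judgement call; it also does not answer why the file-source path forces nullable in
the first place.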