is dataframe thread safe?

2017-02-12 Thread Mendelson, Assaf
Hi,
I was wondering if a dataframe is considered thread safe. I know the Spark 
session and Spark context are thread safe (and actually have tools to manage 
jobs from different threads), but the question is: can I use the same dataframe 
from both threads?
The idea would be to create a dataframe in the main thread and then have two sub 
threads perform different transformations and actions on it.
I understand that some things might not be thread safe (e.g., if I unpersist in 
one thread it would affect the other; checkpointing would cause similar 
issues); however, I can't find any documentation as to what operations (if any) 
are thread safe.

Thanks,
Assaf.


Re: is dataframe thread safe?

2017-02-12 Thread Jörn Franke
I am not sure what you are trying to achieve here. Spark takes care of 
executing the transformations in a distributed fashion. This means you should 
not use threads - it does not make sense. Hence, you will not find documentation 
about it.

> On 12 Feb 2017, at 09:06, Mendelson, Assaf  wrote:
> 
> Hi,
> I was wondering if dataframe is considered thread safe. I know the spark 
> session and spark context are thread safe (and actually have tools to manage 
> jobs from different threads) but the question is, can I use the same 
> dataframe in both threads.
> The idea would be to create a dataframe in the main thread and then in two 
> sub threads do different transformations and actions on it.
> I understand that some things might not be thread safe (e.g. if I unpersist 
> in one thread it would affect the other. Checkpointing would cause similar 
> issues), however, I can’t find any documentation as to what operations (if 
> any) are thread safe.
>  
> Thanks,
> Assaf.


RE: is dataframe thread safe?

2017-02-12 Thread Mendelson, Assaf
I know Spark takes care of executing everything in a distributed manner; 
however, Spark also supports having multiple threads on the same Spark 
session/context and knows (through the fair scheduler) how to distribute the 
tasks from them in a round-robin fashion.

The question is: can two such actions (with different sets of transformations) 
be applied to the SAME dataframe?

Let’s say I want to do something like:



val df = ???              // however the dataframe is built
df.cache()
df.count()                // materialize the cache

def f1(df: DataFrame): Unit = {
  val df1 = df.groupBy(something).agg(someAggs)
  df1.write.parquet("some path")
}

def f2(df: DataFrame): Unit = {
  val df2 = df.groupBy(somethingElse).agg(someDifferentAggs)
  df2.write.parquet("some path 2")
}

f1(df)
f2(df)

df.unpersist()

If the aggregations do not use the full cluster (e.g. because of data skew, 
because there aren't enough partitions, or for any other reason), then this 
would leave the cluster underutilized.

However, if I called f1 and f2 on different threads, then df2 could use free 
resources f1 has not consumed, and the overall utilization would improve.

Of course, I can do this only if the operations on the dataframe are thread 
safe. For example, if I did a cache in f1 and an unpersist in f2, I would get an 
inconsistent result. So my question is: which operations, if any, are legal to 
use on a dataframe so that I could do the above?
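
For illustration, a minimal sketch of calling f1 and f2 from two threads with 
Scala Futures; df, f1 and f2 are the definitions above, and the global execution 
context is used only to keep the example short:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// each Future submits its jobs from its own thread; both reuse the cached df
val job1 = Future { f1(df) }
val job2 = Future { f2(df) }

// wait for both jobs before releasing the cache
Await.result(Future.sequence(Seq(job1, job2)), Duration.Inf)
df.unpersist()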

Thanks,
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Sunday, February 12, 2017 10:39 AM
To: Mendelson, Assaf
Cc: user
Subject: Re: is dataframe thread safe?

I am not sure what you are trying to achieve here. Spark is taking care of 
executing the transformations in a distributed fashion. This means you must not 
use threads - it does not make sense. Hence, you do not find documentation 
about it.

On 12 Feb 2017, at 09:06, Mendelson, Assaf 
mailto:assaf.mendel...@rsa.com>> wrote:
Hi,
I was wondering if dataframe is considered thread safe. I know the spark 
session and spark context are thread safe (and actually have tools to manage 
jobs from different threads) but the question is, can I use the same dataframe 
in both threads.
The idea would be to create a dataframe in the main thread and then in two sub 
threads do different transformations and actions on it.
I understand that some things might not be thread safe (e.g. if I unpersist in 
one thread it would affect the other. Checkpointing would cause similar 
issues), however, I can’t find any documentation as to what operations (if any) 
are thread safe.

Thanks,
Assaf.



Re: Remove dependence on HDFS

2017-02-12 Thread Jörn Franke
You have to carefully consider whether your strategy makes sense given your 
users' workloads. Hence, I am not sure your reasoning holds.

However, you can, for example, install OpenStack Swift as an object store and 
use it as storage. HDFS in this case can be used as a temporary store and/or 
for checkpointing. Alternatively, you can do this fully in-memory with Ignite or 
Alluxio.

S3 is the cloud storage provided by Amazon - it is not on-premise. You can do 
the same here as described above, but using S3 instead of Swift.
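
For illustration, a minimal sketch of pointing Spark at an S3-compatible object 
store through the s3a connector; it assumes hadoop-aws is on the classpath and a 
SparkSession named spark, and the endpoint, credentials and paths are 
placeholders:

// s3a can talk to any S3-compatible endpoint, not only AWS itself
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.endpoint", "http://object-store.internal:9000")
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")

// read from and write back to the object store instead of HDFS
val df = spark.read.parquet("s3a://my-bucket/input/")
df.write.parquet("s3a://my-bucket/output/")

For an OpenStack Swift deployment, the analogous setup would go through the 
hadoop-openstack swift:// connector.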

> On 12 Feb 2017, at 05:28, Benjamin Kim  wrote:
> 
> Has anyone got some advice on how to remove the reliance on HDFS for storing 
> persistent data. We have an on-premise Spark cluster. It seems like a waste 
> of resources to keep adding nodes because of a lack of storage space only. I 
> would rather add more powerful nodes due to the lack of processing power at a 
> less frequent rate, than add less powerful nodes at a more frequent rate just 
> to handle the ever growing data. Can anyone point me in the right direction? 
> Is Alluxio a good solution? S3? I would like to hear your thoughts.
> 
> Cheers,
> Ben 




Re: Remove dependence on HDFS

2017-02-12 Thread Sean Owen
Data has to live somewhere -- how do you not add storage but store more
data?  Alluxio is not persistent storage, and S3 isn't on your premises.

On Sun, Feb 12, 2017 at 4:29 AM Benjamin Kim  wrote:

> Has anyone got some advice on how to remove the reliance on HDFS for
> storing persistent data. We have an on-premise Spark cluster. It seems like
> a waste of resources to keep adding nodes because of a lack of storage
> space only. I would rather add more powerful nodes due to the lack of
> processing power at a less frequent rate, than add less powerful nodes at a
> more frequent rate just to handle the ever growing data. Can anyone point
> me in the right direction? Is Alluxio a good solution? S3? I would like to
> hear your thoughts.
>
> Cheers,
> Ben
>
>


Re: Remove dependence on HDFS

2017-02-12 Thread ayan guha
How about adding more NFS storage?

On Sun, 12 Feb 2017 at 8:14 pm, Sean Owen  wrote:

> Data has to live somewhere -- how do you not add storage but store more
> data?  Alluxio is not persistent storage, and S3 isn't on your premises.
>
> On Sun, Feb 12, 2017 at 4:29 AM Benjamin Kim  wrote:
>
> Has anyone got some advice on how to remove the reliance on HDFS for
> storing persistent data. We have an on-premise Spark cluster. It seems like
> a waste of resources to keep adding nodes because of a lack of storage
> space only. I would rather add more powerful nodes due to the lack of
> processing power at a less frequent rate, than add less powerful nodes at a
> more frequent rate just to handle the ever growing data. Can anyone point
> me in the right direction? Is Alluxio a good solution? S3? I would like to
> hear your thoughts.
>
> Cheers,
> Ben
>
> --
Best Regards,
Ayan Guha


Re: is dataframe thread safe?

2017-02-12 Thread Jörn Franke
I think you should have a look at the Spark documentation. It has something 
called the scheduler, which does exactly this. In more sophisticated 
environments, YARN or Mesos do this for you.

Using threads for transformations does not make sense. 

> On 12 Feb 2017, at 09:50, Mendelson, Assaf  wrote:
> 
> I know spark takes care of executing everything in a distributed manner, 
> however, spark also supports having multiple threads on the same spark 
> session/context and knows (Through fair scheduler) to distribute the tasks 
> from them in a round robin.
>  
> The question is, can those two actions (with a different set of 
> transformations) be applied to the SAME dataframe.
>  
> Let’s say I want to do something like:
>  
>  
>  
> Val df = ???
> df.cache()
> df.count()
>  
> def f1(df: DataFrame): Unit = {
>   val df1 = df.groupby(something).agg(some aggs)
>   df1.write.parquet(“some path”)
> }
>  
> def f2(df: DataFrame): Unit = {
>   val df2 = df.groupby(something else).agg(some different aggs)
>   df2.write.parquet(“some path 2”)
> }
>  
> f1(df)
> f2(df)
>  
> df.unpersist()
>  
> if the aggregations do not use the full cluster (e.g. because of data 
> skewness, because there aren’t enough partitions or any other reason) then 
> this would leave the cluster under utilized.
>  
> However, if I would call f1 and f2 on different threads, then df2 can use 
> free resources f1 has not consumed and the overall utilization would improve.
>  
> Of course, I can do this only if the operations on the dataframe are thread 
> safe. For example, if I would do a cache in f1 and an unpersist in f2 I would 
> get an inconsistent result. So my question is, what, if any are the legal 
> operations to use on a dataframe so I could do the above.
>  
> Thanks,
> Assaf.
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Sunday, February 12, 2017 10:39 AM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: is dataframe thread safe?
>  
> I am not sure what you are trying to achieve here. Spark is taking care of 
> executing the transformations in a distributed fashion. This means you must 
> not use threads - it does not make sense. Hence, you do not find 
> documentation about it.
> 
> On 12 Feb 2017, at 09:06, Mendelson, Assaf  wrote:
> 
> Hi,
> I was wondering if dataframe is considered thread safe. I know the spark 
> session and spark context are thread safe (and actually have tools to manage 
> jobs from different threads) but the question is, can I use the same 
> dataframe in both threads.
> The idea would be to create a dataframe in the main thread and then in two 
> sub threads do different transformations and actions on it.
> I understand that some things might not be thread safe (e.g. if I unpersist 
> in one thread it would affect the other. Checkpointing would cause similar 
> issues), however, I can’t find any documentation as to what operations (if 
> any) are thread safe.
>  
> Thanks,
> Assaf.
>  


Re: is dataframe thread safe?

2017-02-12 Thread Sean Owen
No, this use case is perfectly sensible. Yes, it is thread safe.

On Sun, Feb 12, 2017, 10:30 Jörn Franke  wrote:

> I think you should have a look at the spark documentation. It has
> something called scheduler who does exactly this. In more sophisticated
> environments yarn or mesos do this for you.
>
> Using threads for transformations does not make sense.
>
> On 12 Feb 2017, at 09:50, Mendelson, Assaf 
> wrote:
>
> I know spark takes care of executing everything in a distributed manner,
> however, spark also supports having multiple threads on the same spark
> session/context and knows (Through fair scheduler) to distribute the tasks
> from them in a round robin.
>
>
>
> The question is, can those two actions (with a different set of
> transformations) be applied to the SAME dataframe.
>
>
>
> Let’s say I want to do something like:
>
>
>
>
>
>
>
> Val df = ???
>
> df.cache()
>
> df.count()
>
>
>
> def f1(df: DataFrame): Unit = {
>
>   val df1 = df.groupby(something).agg(some aggs)
>
>   df1.write.parquet(“some path”)
>
> }
>
>
>
> def f2(df: DataFrame): Unit = {
>
>   val df2 = df.groupby(something else).agg(some different aggs)
>
>   df2.write.parquet(“some path 2”)
>
> }
>
>
>
> f1(df)
>
> f2(df)
>
>
>
> df.unpersist()
>
>
>
> if the aggregations do not use the full cluster (e.g. because of data
> skewness, because there aren’t enough partitions or any other reason) then
> this would leave the cluster under utilized.
>
>
>
> However, if I would call f1 and f2 on different threads, then df2 can use
> free resources f1 has not consumed and the overall utilization would
> improve.
>
>
>
> Of course, I can do this only if the operations on the dataframe are
> thread safe. For example, if I would do a cache in f1 and an unpersist in
> f2 I would get an inconsistent result. So my question is, what, if any are
> the legal operations to use on a dataframe so I could do the above.
>
>
>
> Thanks,
>
> Assaf.
>
>
>
> *From:* Jörn Franke [mailto:jornfra...@gmail.com ]
> *Sent:* Sunday, February 12, 2017 10:39 AM
> *To:* Mendelson, Assaf
> *Cc:* user
> *Subject:* Re: is dataframe thread safe?
>
>
>
> I am not sure what you are trying to achieve here. Spark is taking care of
> executing the transformations in a distributed fashion. This means you must
> not use threads - it does not make sense. Hence, you do not find
> documentation about it.
>
>
> On 12 Feb 2017, at 09:06, Mendelson, Assaf 
> wrote:
>
> Hi,
>
> I was wondering if dataframe is considered thread safe. I know the spark
> session and spark context are thread safe (and actually have tools to
> manage jobs from different threads) but the question is, can I use the same
> dataframe in both threads.
>
> The idea would be to create a dataframe in the main thread and then in two
> sub threads do different transformations and actions on it.
>
> I understand that some things might not be thread safe (e.g. if I
> unpersist in one thread it would affect the other. Checkpointing would
> cause similar issues), however, I can’t find any documentation as to what
> operations (if any) are thread safe.
>
>
>
> Thanks,
>
> Assaf.
>
>
>
>


Etl with spark

2017-02-12 Thread Sam Elamin
Hey folks

Really simple question here. I currently have an ETL pipeline that reads
from S3 and saves the data to an end store.


I have to read from a list of keys in S3, but I am doing a raw extract and then
saving. Only some of the extracts have a simple transformation, but overall
the code looks the same.


I abstracted this logic away into a method that takes in an S3 path, does
the common transformations and saves to source.


But the job takes about 10 minutes or so because I'm iteratively going down a
list of keys.

Is it possible to do this asynchronously?

FYI, I'm using spark.read.json to read from S3 because it infers my schema.

Regards
Sam


Re: Etl with spark

2017-02-12 Thread ayan guha
You can store the list of keys (I believe you use them in the source file path,
right?) in a file, one key per line. Then you can read the file using
sc.textFile (so you will get an RDD of file paths) and then apply your
function as a map.

r = sc.textFile(list_file).map(your_function)

HTH

On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin 
wrote:

> Hey folks
>
> Really simple question here. I currently have an etl pipeline that reads
> from s3 and saves the data to an endstore
>
>
> I have to read from a list of keys in s3 but I am doing a raw extract then
> saving. Only some of the extracts have a simple transformation but overall
> the code looks the same
>
>
> I abstracted away this logic into a method that takes in an s3 path does
> the common transformations and saves to source
>
>
> But the job takes about 10 mins or so because I'm iteratively going down a
> list of keys
>
> Is it possible to asynchronously do this?
>
> FYI I'm using spark.read.json to read from s3 because it infers my schema
>
> Regards
> Sam
>



-- 
Best Regards,
Ayan Guha


Re: is dataframe thread safe?

2017-02-12 Thread Yan Facai
DataFrame is immutable, so it should be thread safe, right?

On Sun, Feb 12, 2017 at 6:45 PM, Sean Owen  wrote:

> No this use case is perfectly sensible. Yes it is thread safe.
>
>
> On Sun, Feb 12, 2017, 10:30 Jörn Franke  wrote:
>
>> I think you should have a look at the spark documentation. It has
>> something called scheduler who does exactly this. In more sophisticated
>> environments yarn or mesos do this for you.
>>
>> Using threads for transformations does not make sense.
>>
>> On 12 Feb 2017, at 09:50, Mendelson, Assaf 
>> wrote:
>>
>> I know spark takes care of executing everything in a distributed manner,
>> however, spark also supports having multiple threads on the same spark
>> session/context and knows (Through fair scheduler) to distribute the tasks
>> from them in a round robin.
>>
>>
>>
>> The question is, can those two actions (with a different set of
>> transformations) be applied to the SAME dataframe.
>>
>>
>>
>> Let’s say I want to do something like:
>>
>>
>>
>>
>>
>>
>>
>> Val df = ???
>>
>> df.cache()
>>
>> df.count()
>>
>>
>>
>> def f1(df: DataFrame): Unit = {
>>
>>   val df1 = df.groupby(something).agg(some aggs)
>>
>>   df1.write.parquet(“some path”)
>>
>> }
>>
>>
>>
>> def f2(df: DataFrame): Unit = {
>>
>>   val df2 = df.groupby(something else).agg(some different aggs)
>>
>>   df2.write.parquet(“some path 2”)
>>
>> }
>>
>>
>>
>> f1(df)
>>
>> f2(df)
>>
>>
>>
>> df.unpersist()
>>
>>
>>
>> if the aggregations do not use the full cluster (e.g. because of data
>> skewness, because there aren’t enough partitions or any other reason) then
>> this would leave the cluster under utilized.
>>
>>
>>
>> However, if I would call f1 and f2 on different threads, then df2 can use
>> free resources f1 has not consumed and the overall utilization would
>> improve.
>>
>>
>>
>> Of course, I can do this only if the operations on the dataframe are
>> thread safe. For example, if I would do a cache in f1 and an unpersist in
>> f2 I would get an inconsistent result. So my question is, what, if any are
>> the legal operations to use on a dataframe so I could do the above.
>>
>>
>>
>> Thanks,
>>
>> Assaf.
>>
>>
>>
>> *From:* Jörn Franke [mailto:jornfra...@gmail.com ]
>> *Sent:* Sunday, February 12, 2017 10:39 AM
>> *To:* Mendelson, Assaf
>> *Cc:* user
>> *Subject:* Re: is dataframe thread safe?
>>
>>
>>
>> I am not sure what you are trying to achieve here. Spark is taking care
>> of executing the transformations in a distributed fashion. This means you
>> must not use threads - it does not make sense. Hence, you do not find
>> documentation about it.
>>
>>
>> On 12 Feb 2017, at 09:06, Mendelson, Assaf 
>> wrote:
>>
>> Hi,
>>
>> I was wondering if dataframe is considered thread safe. I know the spark
>> session and spark context are thread safe (and actually have tools to
>> manage jobs from different threads) but the question is, can I use the same
>> dataframe in both threads.
>>
>> The idea would be to create a dataframe in the main thread and then in
>> two sub threads do different transformations and actions on it.
>>
>> I understand that some things might not be thread safe (e.g. if I
>> unpersist in one thread it would affect the other. Checkpointing would
>> cause similar issues), however, I can’t find any documentation as to what
>> operations (if any) are thread safe.
>>
>>
>>
>> Thanks,
>>
>> Assaf.
>>
>>
>>
>>


Re: is dataframe thread safe?

2017-02-12 Thread Jörn Franke
I did not doubt that submitting several jobs from one application makes 
sense. However, he wants to create threads within maps etc., which looks like 
asking for issues (not only for running the application itself, but also for 
operating it in production within a shared cluster). For parallel execution of 
the transformations I would rely on the out-of-the-box functionality within 
Spark.

To me it looks like he wants a solution that can be achieved by a simple 
configuration of the scheduler in Spark, YARN or Mesos. That way the application 
would be more maintainable in production.

> On 12 Feb 2017, at 11:45, Sean Owen  wrote:
> 
> No this use case is perfectly sensible. Yes it is thread safe. 
> 
>> On Sun, Feb 12, 2017, 10:30 Jörn Franke  wrote:
>> I think you should have a look at the spark documentation. It has something 
>> called scheduler who does exactly this. In more sophisticated environments 
>> yarn or mesos do this for you.
>> 
>> Using threads for transformations does not make sense. 
>> 
>>> On 12 Feb 2017, at 09:50, Mendelson, Assaf  wrote:
>>> 
>>> I know spark takes care of executing everything in a distributed manner, 
>>> however, spark also supports having multiple threads on the same spark 
>>> session/context and knows (Through fair scheduler) to distribute the tasks 
>>> from them in a round robin.
>>> 
>>>  
>>> 
>>> The question is, can those two actions (with a different set of 
>>> transformations) be applied to the SAME dataframe.
>>> 
>>>  
>>> 
>>> Let’s say I want to do something like:
>>> 
>>>  
>>> 
>>>  
>>> 
>>>  
>>> 
>>> Val df = ???
>>> 
>>> df.cache()
>>> 
>>> df.count()
>>> 
>>>  
>>> 
>>> def f1(df: DataFrame): Unit = {
>>> 
>>>   val df1 = df.groupby(something).agg(some aggs)
>>> 
>>>   df1.write.parquet(“some path”)
>>> 
>>> }
>>> 
>>>  
>>> 
>>> def f2(df: DataFrame): Unit = {
>>> 
>>>   val df2 = df.groupby(something else).agg(some different aggs)
>>> 
>>>   df2.write.parquet(“some path 2”)
>>> 
>>> }
>>> 
>>>  
>>> 
>>> f1(df)
>>> 
>>> f2(df)
>>> 
>>>  
>>> 
>>> df.unpersist()
>>> 
>>>  
>>> 
>>> if the aggregations do not use the full cluster (e.g. because of data 
>>> skewness, because there aren’t enough partitions or any other reason) then 
>>> this would leave the cluster under utilized.
>>> 
>>>  
>>> 
>>> However, if I would call f1 and f2 on different threads, then df2 can use 
>>> free resources f1 has not consumed and the overall utilization would 
>>> improve.
>>> 
>>>  
>>> 
>>> Of course, I can do this only if the operations on the dataframe are thread 
>>> safe. For example, if I would do a cache in f1 and an unpersist in f2 I 
>>> would get an inconsistent result. So my question is, what, if any are the 
>>> legal operations to use on a dataframe so I could do the above.
>>> 
>>>  
>>> 
>>> Thanks,
>>> 
>>> Assaf.
>>> 
>>>  
>>> 
>>> From: Jörn Franke [mailto:jornfra...@gmail.com] 
>>> Sent: Sunday, February 12, 2017 10:39 AM
>>> To: Mendelson, Assaf
>>> Cc: user
>>> Subject: Re: is dataframe thread safe?
>>> 
>>>  
>>> 
>>> I am not sure what you are trying to achieve here. Spark is taking care of 
>>> executing the transformations in a distributed fashion. This means you must 
>>> not use threads - it does not make sense. Hence, you do not find 
>>> documentation about it.
>>> 
>>> 
>>> On 12 Feb 2017, at 09:06, Mendelson, Assaf  wrote:
>>> 
>>> Hi,
>>> 
>>> I was wondering if dataframe is considered thread safe. I know the spark 
>>> session and spark context are thread safe (and actually have tools to 
>>> manage jobs from different threads) but the question is, can I use the same 
>>> dataframe in both threads.
>>> 
>>> The idea would be to create a dataframe in the main thread and then in two 
>>> sub threads do different transformations and actions on it.
>>> 
>>> I understand that some things might not be thread safe (e.g. if I unpersist 
>>> in one thread it would affect the other. Checkpointing would cause similar 
>>> issues), however, I can’t find any documentation as to what operations (if 
>>> any) are thread safe.
>>> 
>>>  
>>> 
>>> Thanks,
>>> 
>>> Assaf.
>>> 
>>>  


Re: is dataframe thread safe?

2017-02-12 Thread Jörn Franke
Cf. also https://spark.apache.org/docs/latest/job-scheduling.html
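
For illustration, a minimal sketch of the scheduler setup that page describes; 
a SparkSession is assumed, and the pool name and allocation-file path are 
placeholders:

import org.apache.spark.sql.SparkSession

// enable the fair scheduler instead of the default FIFO queue
val spark = SparkSession.builder()
  .appName("fair-scheduling-sketch")
  .config("spark.scheduler.mode", "FAIR")
  // optional: pool definitions (weights, minShare) live in an XML allocation file
  .config("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
  .getOrCreate()

// jobs submitted from the current thread are assigned to the named pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")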

> On 12 Feb 2017, at 11:30, Jörn Franke  wrote:
> 
> I think you should have a look at the spark documentation. It has something 
> called scheduler who does exactly this. In more sophisticated environments 
> yarn or mesos do this for you.
> 
> Using threads for transformations does not make sense. 
> 
>> On 12 Feb 2017, at 09:50, Mendelson, Assaf  wrote:
>> 
>> I know spark takes care of executing everything in a distributed manner, 
>> however, spark also supports having multiple threads on the same spark 
>> session/context and knows (Through fair scheduler) to distribute the tasks 
>> from them in a round robin.
>>  
>> The question is, can those two actions (with a different set of 
>> transformations) be applied to the SAME dataframe.
>>  
>> Let’s say I want to do something like:
>>  
>>  
>>  
>> Val df = ???
>> df.cache()
>> df.count()
>>  
>> def f1(df: DataFrame): Unit = {
>>   val df1 = df.groupby(something).agg(some aggs)
>>   df1.write.parquet(“some path”)
>> }
>>  
>> def f2(df: DataFrame): Unit = {
>>   val df2 = df.groupby(something else).agg(some different aggs)
>>   df2.write.parquet(“some path 2”)
>> }
>>  
>> f1(df)
>> f2(df)
>>  
>> df.unpersist()
>>  
>> if the aggregations do not use the full cluster (e.g. because of data 
>> skewness, because there aren’t enough partitions or any other reason) then 
>> this would leave the cluster under utilized.
>>  
>> However, if I would call f1 and f2 on different threads, then df2 can use 
>> free resources f1 has not consumed and the overall utilization would improve.
>>  
>> Of course, I can do this only if the operations on the dataframe are thread 
>> safe. For example, if I would do a cache in f1 and an unpersist in f2 I 
>> would get an inconsistent result. So my question is, what, if any are the 
>> legal operations to use on a dataframe so I could do the above.
>>  
>> Thanks,
>> Assaf.
>>  
>> From: Jörn Franke [mailto:jornfra...@gmail.com] 
>> Sent: Sunday, February 12, 2017 10:39 AM
>> To: Mendelson, Assaf
>> Cc: user
>> Subject: Re: is dataframe thread safe?
>>  
>> I am not sure what you are trying to achieve here. Spark is taking care of 
>> executing the transformations in a distributed fashion. This means you must 
>> not use threads - it does not make sense. Hence, you do not find 
>> documentation about it.
>> 
>> On 12 Feb 2017, at 09:06, Mendelson, Assaf  wrote:
>> 
>> Hi,
>> I was wondering if dataframe is considered thread safe. I know the spark 
>> session and spark context are thread safe (and actually have tools to manage 
>> jobs from different threads) but the question is, can I use the same 
>> dataframe in both threads.
>> The idea would be to create a dataframe in the main thread and then in two 
>> sub threads do different transformations and actions on it.
>> I understand that some things might not be thread safe (e.g. if I unpersist 
>> in one thread it would affect the other. Checkpointing would cause similar 
>> issues), however, I can’t find any documentation as to what operations (if 
>> any) are thread safe.
>>  
>> Thanks,
>> Assaf.
>>  


RE: is dataframe thread safe?

2017-02-12 Thread Mendelson, Assaf
There are no threads within maps here. The idea is to have two jobs on two 
different threads which use the same dataframe (which is cached, btw).
This does not override Spark's parallel execution of transformations or anything 
of the sort. The documentation (job scheduling) actually hints at this option but 
doesn't say specifically whether it is supported when the same dataframe is used.
As for configuring the scheduler, this would not work. First, it would mean that 
the same cached dataframe could not be used; I would have to add some additional 
component such as Alluxio (and it would still have to serialize/deserialize) as 
opposed to using the cached data. Furthermore, multi-tenancy between applications 
is limited to either dividing the cluster between the applications or using 
dynamic allocation (which has its own overheads).

Therefore Sean’s answer is what I was looking for (and hoping for…)
Assaf

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Sunday, February 12, 2017 2:46 PM
To: Sean Owen
Cc: Mendelson, Assaf; user
Subject: Re: is dataframe thread safe?

I did not doubt that the submission of several jobs of one application makes 
sense. However, he want to create threads within maps etc., which looks like 
calling for issues (not only for running the application itself, but also for 
operating it in production within a shared cluster). I would rely for parallel 
execution of the transformations on the out-of-the-box functionality within 
Spark.

For me he looks for a solution that can be achieved by a simple configuration 
of the scheduler in Spark, yarn or mesos. In this way the application would be 
more maintainable in production.

On 12 Feb 2017, at 11:45, Sean Owen 
mailto:so...@cloudera.com>> wrote:
No this use case is perfectly sensible. Yes it is thread safe.
On Sun, Feb 12, 2017, 10:30 Jörn Franke 
mailto:jornfra...@gmail.com>> wrote:
I think you should have a look at the spark documentation. It has something 
called scheduler who does exactly this. In more sophisticated environments yarn 
or mesos do this for you.

Using threads for transformations does not make sense.

On 12 Feb 2017, at 09:50, Mendelson, Assaf 
mailto:assaf.mendel...@rsa.com>> wrote:
I know spark takes care of executing everything in a distributed manner, 
however, spark also supports having multiple threads on the same spark 
session/context and knows (Through fair scheduler) to distribute the tasks from 
them in a round robin.

The question is, can those two actions (with a different set of 
transformations) be applied to the SAME dataframe.

Let’s say I want to do something like:



Val df = ???
df.cache()
df.count()

def f1(df: DataFrame): Unit = {
  val df1 = df.groupby(something).agg(some aggs)
  df1.write.parquet(“some path”)
}

def f2(df: DataFrame): Unit = {
  val df2 = df.groupby(something else).agg(some different aggs)
  df2.write.parquet(“some path 2”)
}

f1(df)
f2(df)

df.unpersist()

if the aggregations do not use the full cluster (e.g. because of data skewness, 
because there aren’t enough partitions or any other reason) then this would 
leave the cluster under utilized.

However, if I would call f1 and f2 on different threads, then df2 can use free 
resources f1 has not consumed and the overall utilization would improve.

Of course, I can do this only if the operations on the dataframe are thread 
safe. For example, if I would do a cache in f1 and an unpersist in f2 I would 
get an inconsistent result. So my question is, what, if any are the legal 
operations to use on a dataframe so I could do the above.

Thanks,
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Sunday, February 12, 2017 10:39 AM
To: Mendelson, Assaf
Cc: user
Subject: Re: is dataframe thread safe?

I am not sure what you are trying to achieve here. Spark is taking care of 
executing the transformations in a distributed fashion. This means you must not 
use threads - it does not make sense. Hence, you do not find documentation 
about it.

On 12 Feb 2017, at 09:06, Mendelson, Assaf 
mailto:assaf.mendel...@rsa.com>> wrote:
Hi,
I was wondering if dataframe is considered thread safe. I know the spark 
session and spark context are thread safe (and actually have tools to manage 
jobs from different threads) but the question is, can I use the same dataframe 
in both threads.
The idea would be to create a dataframe in the main thread and then in two sub 
threads do different transformations and actions on it.
I understand that some things might not be thread safe (e.g. if I unpersist in 
one thread it would affect the other. Checkpointing would cause similar 
issues), however, I can’t find any documentation as to what operations (if any) 
are thread safe.

Thanks,
Assaf.



Re: Getting exit code of pipe()

2017-02-12 Thread Xuchen Yao
Cool, that's exactly what I was looking for! Thanks!

How does one output the status into stdout? I mean, how does one capture
the status output of the pipe() command?

On Sat, Feb 11, 2017 at 9:50 AM, Felix Cheung 
wrote:

> Do you want the job to fail if there is an error exit code?
>
> You could set checkCode to True
> spark.apache.org/docs/latest/api/python/pyspark.html?
> highlight=pipe#pyspark.RDD.pipe
>
> Otherwise maybe you want to output the status into stdout so you could
> process it individually.
>
>
> _
> From: Xuchen Yao 
> Sent: Friday, February 10, 2017 11:18 AM
> Subject: Getting exit code of pipe()
> To: 
>
>
>
> Hello Community,
>
> I have the following Python code that calls an external command:
>
> rdd.pipe('run.sh', env=os.environ).collect()
>
> run.sh can either exit with status 1 or 0, how could I get the exit code
> from Python? Thanks!
>
> Xuchen
>
>
>


Add hive-site.xml at runtime

2017-02-12 Thread Shivam Sharma
Hi,

I have multiple Hive configurations (hive-site.xml), and because of that I am
not able to put any one of them in the Spark *conf* directory. I want to add the
right configuration file at the start of any *spark-submit* or *spark-shell*.
This conf file is huge, so *--conf* is not an option for me.

Thanks

-- 
Shivam Sharma


Re: Etl with spark

2017-02-12 Thread Sam Elamin
Thanks Ayan, but I was hoping to remove the dependency on a file and just
use an in-memory list or dictionary.

From the reading I've done today, it seems the concept of a bespoke async
method doesn't really apply in Spark, since the cluster deals with
distributing the workload.


Am I mistaken?

Regards
Sam
On Sun, 12 Feb 2017 at 12:13, ayan guha  wrote:

You can store the list of keys (I believe you use them in source file path,
right?) in a file, one key per line. Then you can read the file using
sc.textFile (So you will get a RDD of file paths) and then apply your
function as a map.

r = sc.textFile(list_file).map(your_function)

HTH

On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin 
wrote:

Hey folks

Really simple question here. I currently have an etl pipeline that reads
from s3 and saves the data to an endstore


I have to read from a list of keys in s3 but I am doing a raw extract then
saving. Only some of the extracts have a simple transformation but overall
the code looks the same


I abstracted away this logic into a method that takes in an s3 path does
the common transformations and saves to source


But the job takes about 10 mins or so because I'm iteratively going down a
list of keys

Is it possible to asynchronously do this?

FYI I'm using spark.read.json to read from s3 because it infers my schema

Regards
Sam




-- 
Best Regards,
Ayan Guha


Re: Etl with spark

2017-02-12 Thread Miguel Morales
You can parallelize the collection of s3 keys and then pass that to your map 
function so that files are read in parallel.

Sent from my iPhone

> On Feb 12, 2017, at 9:41 AM, Sam Elamin  wrote:
> 
> thanks Ayan but i was hoping to remove the dependency on a file and just use 
> in memory list or dictionary 
> 
> So from the reading I've done today it seems.the concept of a bespoke async 
> method doesn't really apply in spsrk since the cluster deals with 
> distributing the work load 
> 
> 
> Am I mistaken?
> 
> Regards 
> Sam 
> On Sun, 12 Feb 2017 at 12:13, ayan guha  wrote:
> You can store the list of keys (I believe you use them in source file path, 
> right?) in a file, one key per line. Then you can read the file using 
> sc.textFile (So you will get a RDD of file paths) and then apply your 
> function as a map.
> 
> r = sc.textFile(list_file).map(your_function)
> 
> HTH
> 
> On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin  wrote:
> Hey folks 
> 
> Really simple question here. I currently have an etl pipeline that reads from 
> s3 and saves the data to an endstore 
> 
> 
> I have to read from a list of keys in s3 but I am doing a raw extract then 
> saving. Only some of the extracts have a simple transformation but overall 
> the code looks the same 
> 
> 
> I abstracted away this logic into a method that takes in an s3 path does the 
> common transformations and saves to source 
> 
> 
> But the job takes about 10 mins or so because I'm iteratively going down a 
> list of keys 
> 
> Is it possible to asynchronously do this?
> 
> FYI I'm using spark.read.json to read from s3 because it infers my schema 
> 
> Regards 
> Sam 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha


Re: Etl with spark

2017-02-12 Thread Sam Elamin
Yup, I ended up doing just that. Thank you both.
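
For illustration, a minimal sketch (in Scala) of reading a list of keys in 
parallel instead of looping over them on the driver; the bucket and keys are 
placeholders, a SparkSession named spark is assumed, and it relies on the 
executors resolving S3 credentials from the Hadoop configuration (s3a):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

val keys = Seq("s3a://my-bucket/a.json", "s3a://my-bucket/b.json")

// distribute the keys, then read each object on the executors
val rawLines = spark.sparkContext
  .parallelize(keys, keys.size)
  .flatMap { key =>
    val fs = FileSystem.get(new URI(key), new Configuration())
    val in = fs.open(new Path(key))
    try Source.fromInputStream(in).getLines().toList finally in.close()
  }

// alternatively, handing the whole list to one read keeps the schema inference:
// val df = spark.read.json(keys: _*)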
On Sun, 12 Feb 2017 at 18:33, Miguel Morales 
wrote:

> You can parallelize the collection of s3 keys and then pass that to your
> map function so that files are read in parallel.
>
> Sent from my iPhone
>
> On Feb 12, 2017, at 9:41 AM, Sam Elamin  wrote:
>
> thanks Ayan but i was hoping to remove the dependency on a file and just
> use in memory list or dictionary
>
> So from the reading I've done today it seems.the concept of a bespoke
> async method doesn't really apply in spsrk since the cluster deals with
> distributing the work load
>
>
> Am I mistaken?
>
> Regards
> Sam
> On Sun, 12 Feb 2017 at 12:13, ayan guha  wrote:
>
> You can store the list of keys (I believe you use them in source file
> path, right?) in a file, one key per line. Then you can read the file using
> sc.textFile (So you will get a RDD of file paths) and then apply your
> function as a map.
>
> r = sc.textFile(list_file).map(your_function)
>
> HTH
>
> On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin 
> wrote:
>
> Hey folks
>
> Really simple question here. I currently have an etl pipeline that reads
> from s3 and saves the data to an endstore
>
>
> I have to read from a list of keys in s3 but I am doing a raw extract then
> saving. Only some of the extracts have a simple transformation but overall
> the code looks the same
>
>
> I abstracted away this logic into a method that takes in an s3 path does
> the common transformations and saves to source
>
>
> But the job takes about 10 mins or so because I'm iteratively going down a
> list of keys
>
> Is it possible to asynchronously do this?
>
> FYI I'm using spark.read.json to read from s3 because it infers my schema
>
> Regards
> Sam
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


Re: is dataframe thread safe?

2017-02-12 Thread Timur Shenkao
Hello,

I suspect that your need isn't parallel execution but parallel data access.
In that case, use Alluxio or Ignite.

Or, more exotically, one Spark job writes to Kafka and the others read from
Kafka.

Sincerely yours, Timur

On Sun, Feb 12, 2017 at 2:30 PM, Mendelson, Assaf 
wrote:

> There is no threads within maps here. The idea is to have two jobs on two
> different threads which use the same dataframe (which is cached btw).
>
> This does not override spark’s parallel execution of transformation or any
> such. The documentation (job scheduling) actually hints at this option but
> doesn’t say specifically if it is supported when the same dataframe is used.
>
> As for configuring the scheduler, this would not work. First it would mean
> that the same cached dataframe cannot be used, I would have to add some
> additional configuration such as alluxio (and it would still have to
> serialize/deserialize) as opposed to using the cached data. Furthermore,
> multi-tenancy between applications is limited to either dividing the
> cluster between the applications or using dynamic allocation (which has its
> own overheads).
>
>
>
> Therefore Sean’s answer is what I was looking for (and hoping for…)
>
> Assaf
>
>
>
> *From:* Jörn Franke [mailto:jornfra...@gmail.com]
> *Sent:* Sunday, February 12, 2017 2:46 PM
> *To:* Sean Owen
> *Cc:* Mendelson, Assaf; user
>
> *Subject:* Re: is dataframe thread safe?
>
>
>
> I did not doubt that the submission of several jobs of one application
> makes sense. However, he want to create threads within maps etc., which
> looks like calling for issues (not only for running the application itself,
> but also for operating it in production within a shared cluster). I would
> rely for parallel execution of the transformations on the out-of-the-box
> functionality within Spark.
>
>
>
> For me he looks for a solution that can be achieved by a simple
> configuration of the scheduler in Spark, yarn or mesos. In this way the
> application would be more maintainable in production.
>
>
> On 12 Feb 2017, at 11:45, Sean Owen  wrote:
>
> No this use case is perfectly sensible. Yes it is thread safe.
>
> On Sun, Feb 12, 2017, 10:30 Jörn Franke  wrote:
>
> I think you should have a look at the spark documentation. It has
> something called scheduler who does exactly this. In more sophisticated
> environments yarn or mesos do this for you.
>
>
>
> Using threads for transformations does not make sense.
>
>
> On 12 Feb 2017, at 09:50, Mendelson, Assaf 
> wrote:
>
> I know spark takes care of executing everything in a distributed manner,
> however, spark also supports having multiple threads on the same spark
> session/context and knows (Through fair scheduler) to distribute the tasks
> from them in a round robin.
>
>
>
> The question is, can those two actions (with a different set of
> transformations) be applied to the SAME dataframe.
>
>
>
> Let’s say I want to do something like:
>
>
>
>
>
>
>
> Val df = ???
>
> df.cache()
>
> df.count()
>
>
>
> def f1(df: DataFrame): Unit = {
>
>   val df1 = df.groupby(something).agg(some aggs)
>
>   df1.write.parquet(“some path”)
>
> }
>
>
>
> def f2(df: DataFrame): Unit = {
>
>   val df2 = df.groupby(something else).agg(some different aggs)
>
>   df2.write.parquet(“some path 2”)
>
> }
>
>
>
> f1(df)
>
> f2(df)
>
>
>
> df.unpersist()
>
>
>
> if the aggregations do not use the full cluster (e.g. because of data
> skewness, because there aren’t enough partitions or any other reason) then
> this would leave the cluster under utilized.
>
>
>
> However, if I would call f1 and f2 on different threads, then df2 can use
> free resources f1 has not consumed and the overall utilization would
> improve.
>
>
>
> Of course, I can do this only if the operations on the dataframe are
> thread safe. For example, if I would do a cache in f1 and an unpersist in
> f2 I would get an inconsistent result. So my question is, what, if any are
> the legal operations to use on a dataframe so I could do the above.
>
>
>
> Thanks,
>
> Assaf.
>
>
>
> *From:* Jörn Franke [mailto:jornfra...@gmail.com ]
> *Sent:* Sunday, February 12, 2017 10:39 AM
> *To:* Mendelson, Assaf
> *Cc:* user
> *Subject:* Re: is dataframe thread safe?
>
>
>
> I am not sure what you are trying to achieve here. Spark is taking care of
> executing the transformations in a distributed fashion. This means you must
> not use threads - it does not make sense. Hence, you do not find
> documentation about it.
>
>
> On 12 Feb 2017, at 09:06, Mendelson, Assaf 
> wrote:
>
> Hi,
>
> I was wondering if dataframe is considered thread safe. I know the spark
> session and spark context are thread safe (and actually have tools to
> manage jobs from different threads) but the question is, can I use the same
> dataframe in both threads.
>
> The idea would be to create a dataframe in the main thread and then in two
> sub threads do different transformations and actions on it.
>
> I understand that some things might not be

Re: Getting exit code of pipe()

2017-02-12 Thread Felix Cheung
I mean that if you are running a script, instead of exiting with a code it could
print out something.

Sounds like checkCode is what you want, though.
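
For illustration, a minimal sketch of the "print the status to stdout" idea, 
written in Scala (the same wrapper trick works from PySpark's rdd.pipe); it 
assumes an RDD[String] named rdd and that run.sh is available on the executors:

// wrap the real script so its exit code is appended as one extra output line
val wrapper = "./run.sh; echo \"EXITCODE:$?\""
val piped = rdd.pipe(Seq("/bin/bash", "-c", wrapper))

// one exit code per partition; everything else is the script's normal output
val exitCodes = piped.filter(_.startsWith("EXITCODE:")).map(_.stripPrefix("EXITCODE:").toInt)
val scriptOut = piped.filter(!_.startsWith("EXITCODE:"))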


_
From: Xuchen Yao mailto:yaoxuc...@gmail.com>>
Sent: Sunday, February 12, 2017 8:33 AM
Subject: Re: Getting exit code of pipe()
To: Felix Cheung mailto:felixcheun...@hotmail.com>>
Cc: mailto:user@spark.apache.org>>


Cool that's exactly what I was looking for! Thanks!

How does one output the status into stdout? I mean, how does one capture the 
status output of pipe() command?

On Sat, Feb 11, 2017 at 9:50 AM, Felix Cheung 
mailto:felixcheun...@hotmail.com>> wrote:
Do you want the job to fail if there is an error exit code?

You could set checkCode to True
spark.apache.org/docs/latest/api/python/pyspark.html?highlight=pipe#pyspark.RDD.pipe

Otherwise maybe you want to output the status into stdout so you could process 
it individually.


_
From: Xuchen Yao mailto:yaoxuc...@gmail.com>>
Sent: Friday, February 10, 2017 11:18 AM
Subject: Getting exit code of pipe()
To: mailto:user@spark.apache.org>>



Hello Community,

I have the following Python code that calls an external command:

rdd.pipe('run.sh', env=os.environ).collect()

run.sh can either exit with status 1 or 0, how could I get the exit code from 
Python? Thanks!

Xuchen







Repartition function duplicates data

2017-02-12 Thread F. Amara
Hi,

In my spark streaming application I'm trying to partition a data stream into
multiple substreams. I read data from a Kafka producer and process the data
received real-time. The data is taken in through JavaInputDStream as a
directStream. Data is received without any loss. The need is to partition
the data I receive. So I used repartition() to do the above task but this
resulted in duplicating the same set of data across all partitions rather
than dividing (i.e., hashing) it according to the number of partitions
specified. Could anyone please explain a solution? I have shown a sample
code snippet below for your reference.



// ...
JavaStreamingContext ssc = null;

System.setProperty("spark.executor.memory", "512m");
System.setProperty("spark.streaming.unpersist", "false");
HashMap<String, String> map = new HashMap<>();
map.put("spark.executor.memory", "512m");
map.put("spark.streaming.unpersist", "false");

try {
    ssc = new JavaStreamingContext("spark://host1:7077", "app",
            new Duration(1000), System.getenv("SPARK_HOME"),
            JavaStreamingContext.jarOfClass(Benchmark.class),
            map);
} catch (Exception e) {
    e.printStackTrace();
}

Map<String, Object> kafkaParams = new HashMap<>();
// ...

Set<String> topic = Collections.singleton("inputStream");

// key/value types assumed to be String here; the original mail had them elided
final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topic, kafkaParams)
        );

JavaDStream incrementStream1 = stream.map(incrementFunction2);
JavaDStream stream2 = incrementStream1.repartition(2);

// ...






Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-12 Thread nguyen duc Tuan
In the end, I switched back to the LSH implementation that I used before
(https://github.com/karlhigley/spark-neighbors). I can run it on my dataset
now. If someone has any suggestions, please tell me.
Thanks.
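
For reference, a minimal sketch of the built-in Spark 2.1 LSH API discussed in
this thread, using MinHashLSH; the column names, number of hash tables and join
threshold are illustrative, and `dataset` is assumed to hold binary feature
vectors:

import org.apache.spark.ml.feature.MinHashLSH

val lsh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

// fit on the dataset, then self-join to find approximate neighbors
val model = lsh.fit(dataset)
val neighbors = model.approxSimilarityJoin(dataset, dataset, 0.6)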

2017-02-12 9:25 GMT+07:00 nguyen duc Tuan :

> Hi Timur,
> 1) Our data is transformed to dataset of Vector already.
> 2) If I use RandomSignProjectLSH, the job dies after I call
> approximateSimilarJoin. I tried to use Minhash instead, the job is still
> slow. I don't thinks the problem is related to the GC. The time for GC is
> small compare with the time for computation. Here is some screenshots of my
> job.
> Thanks
>
> 2017-02-12 8:01 GMT+07:00 Timur Shenkao :
>
>> Hello,
>>
>> 1) Are you sure that your data is "clean"?  No unexpected missing values?
>> No strings in unusual encoding? No additional or missing columns ?
>> 2) How long does your job run? What about garbage collector parameters?
>> Have you checked what happens with jconsole / jvisualvm ?
>>
>> Sincerely yours, Timur
>>
>> On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan 
>> wrote:
>>
>>> Hi Nick,
>>> Because we use *RandomSignProjectionLSH*, there is only one parameter
>>> for LSH is the number of hashes. I try with small number of hashes (2) but
>>> the error is still happens. And it happens when I call similarity join.
>>> After transformation, the size of  dataset is about 4G.
>>>
>>> 2017-02-11 3:07 GMT+07:00 Nick Pentreath :
>>>
 What other params are you using for the lsh transformer?

 Are the issues occurring during transform or during the similarity join?


 On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan 
 wrote:

> hi Das,
> In general, I will apply them to larger datasets, so I want to use
> LSH, which is more scaleable than the approaches as you suggested. Have 
> you
> tried LSH in Spark 2.1.0 before ? If yes, how do you set the
> parameters/configuration to make it work ?
> Thanks.
>
> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>
> If it is 7m rows and 700k features (or say 1m features) brute force
> row similarity will run fine as well...check out spark-4823...you can
> compare quality with approximate variant...
> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan" 
> wrote:
>
> Hi everyone,
> Since spark 2.1.0 introduces LSH (http://spark.apache.org/docs/
> latest/ml-features.html#locality-sensitive-hashing), we want to use
> LSH to find approximately nearest neighbors. Basically, We have dataset
> with about 7M rows. we want to use cosine distance to meassure the
> similarity between items, so we use *RandomSignProjectionLSH* (
> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db)
> instead of *BucketedRandomProjectionLSH*. I try to tune some
> configurations such as serialization, memory fraction, executor memory
> (~6G), number of executors ( ~20), memory overhead ..., but nothing works.
> I often get error "java.lang.OutOfMemoryError: Java heap space" while
> running. I know that this implementation is done by engineer at Uber but I
> don't know right configurations,.. to run the algorithm at scale. Do they
> need very big memory to run it?
>
> Any help would be appreciated.
> Thanks
>
>
>
>>>
>>
>


How to measure IO time in Spark over S3

2017-02-12 Thread Gili Nachum
Hi!

How can I tell the I/O duration for a Spark application doing reads/writes from
S3 (using S3 as a filesystem, e.g. sc.textFile("s3a://..."))?
I would like to know what percentage of the overall app execution time is spent
doing I/O.

Gili.