Re: Edge AI with Spark

2020-09-24 Thread ayan guha
Too broad a question 😀 and the short answer is yes and long answer is it
depends.

Essentially spark is a compute engine so it can be wrapped into any
containerized model and deployed at the edge. I believe there are various
implemntation available



On Thu, 24 Sep 2020 at 5:19 pm, Marco Sassarini 
wrote:

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Hi,
>
>
> I'd like to know if Spark supports edge AI: can Spark
>
> run on physical device such as mobile devices running Android/iOS?
>
>
>
>
>
> Best regards,
>
>
> Marco Sassarini
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *Marco SassariniArtificial Intelligence Department*
>
>
>
>
>
>
> office: +39 0434 562 978
>
>
>
> www.overit.it
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
Best Regards,
Ayan Guha


https://issues.apache.org/jira/browse/SPARK-18381

2020-09-24 Thread ayan guha
Anyone aware of any workaround for
https://issues.apache.org/jira/browse/SPARK-18381

Other than upgrade to Spark 3 I mean,,,

-- 
Best Regards,
Ayan Guha


Re: Scala vs Python for ETL with Spark

2020-10-10 Thread ayan guha
I have one observation: is "python udf is slow due to deserialization
penulty" still relevant? Even after arrow is used as in memory data mgmt
and so heavy investment from spark dev community on making pandas first
class citizen including Udfs.

As I work with multiple clients, my exp is org culture and available people
are most imp driver for this choice regardless the use case. Use case is
relevant only when there is a feature imparity

On Sun, 11 Oct 2020 at 7:39 am, Gourav Sengupta 
wrote:

> Not quite sure how meaningful this discussion is, but in case someone is
> really faced with this query the question still is 'what is the use case'?
> I am just a bit confused with the one size fits all deterministic approach
> here thought that those days were over almost 10 years ago.
> Regards
> Gourav
>
> On Sat, 10 Oct 2020, 21:24 Stephen Boesch,  wrote:
>
>> I agree with Wim's assessment of data engineering / ETL vs Data Science.
>>   I wrote pipelines/frameworks for large companies and scala was a much
>> better choice. But for ad-hoc work interfacing directly with data science
>> experiments pyspark presents less friction.
>>
>> On Sat, 10 Oct 2020 at 13:03, Mich Talebzadeh 
>> wrote:
>>
>>> Many thanks everyone for their valuable contribution.
>>>
>>> We all started with Spark a few years ago where Scala was the talk
>>> of the town. I agree with the note that as long as Spark stayed nish and
>>> elite, then someone with Scala knowledge was attracting premiums. In
>>> fairness in 2014-2015, there was not much talk of Data Science input (I may
>>> be wrong). But the world has moved on so to speak. Python itself has been
>>> around a long time (long being relative here). Most people either knew UNIX
>>> Shell, C, Python or Perl or a combination of all these. I recall we had a
>>> director a few years ago who asked our Hadoop admin for root password to
>>> log in to the edge node. Later he became head of machine learning
>>> somewhere else and he loved C and Python. So Python was a gift in disguise.
>>> I think Python appeals to those who are very familiar with CLI and shell
>>> programming (Not GUI fan). As some members alluded to there are more people
>>> around with Python knowledge. Most managers choose Python as the unifying
>>> development tool because they feel comfortable with it. Frankly I have not
>>> seen a manager who feels at home with Scala. So in summary it is a bit
>>> disappointing to abandon Scala and switch to Python just for the sake of it.
>>>
>>> Disclaimer: These are opinions and not facts so to speak :)
>>>
>>> Cheers,
>>>
>>>
>>> Mich
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, 9 Oct 2020 at 21:56, Mich Talebzadeh 
>>> wrote:
>>>
>>>> I have come across occasions when the teams use Python with Spark for
>>>> ETL, for example processing data from S3 buckets into Snowflake with Spark.
>>>>
>>>> The only reason I think they are choosing Python as opposed to Scala is
>>>> because they are more familiar with Python. Since Spark is written in
>>>> Scala, itself is an indication of why I think Scala has an edge.
>>>>
>>>> I have not done one to one comparison of Spark with Scala vs Spark with
>>>> Python. I understand for data science purposes most libraries like
>>>> TensorFlow etc. are written in Python but I am at loss to understand the
>>>> validity of using Python with Spark for ETL purposes.
>>>>
>>>> These are my understanding but they are not facts so I would like to
>>>> get some informed views on this if I can?
>>>>
>>>> Many thanks,
>>>>
>>>> Mich
>>>>
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>> --
Best Regards,
Ayan Guha


Re: Scala vs Python for ETL with Spark

2020-10-11 Thread ayan guha
But when you have fairly large volume of data that is where spark comes in
the party. And I assume the requirement of using spark is already
established in the original qs and the discussion is to use python vs
scala/java.

On Sun, 11 Oct 2020 at 10:51 pm, Sasha Kacanski  wrote:

> If org has folks that can do python seriously why then spark in the first
> place. You can do workflow on your own, streaming or batch or what ever you
> want.
> I would not do anything else aside from python, but that is me.
>
> On Sat, Oct 10, 2020, 9:42 PM ayan guha  wrote:
>
>> I have one observation: is "python udf is slow due to deserialization
>> penulty" still relevant? Even after arrow is used as in memory data mgmt
>> and so heavy investment from spark dev community on making pandas first
>> class citizen including Udfs.
>>
>> As I work with multiple clients, my exp is org culture and available
>> people are most imp driver for this choice regardless the use case. Use
>> case is relevant only when there is a feature imparity
>>
>> On Sun, 11 Oct 2020 at 7:39 am, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Not quite sure how meaningful this discussion is, but in case someone is
>>> really faced with this query the question still is 'what is the use case'?
>>> I am just a bit confused with the one size fits all deterministic
>>> approach here thought that those days were over almost 10 years ago.
>>> Regards
>>> Gourav
>>>
>>> On Sat, 10 Oct 2020, 21:24 Stephen Boesch,  wrote:
>>>
>>>> I agree with Wim's assessment of data engineering / ETL vs Data
>>>> Science.I wrote pipelines/frameworks for large companies and scala was
>>>> a much better choice. But for ad-hoc work interfacing directly with data
>>>> science experiments pyspark presents less friction.
>>>>
>>>> On Sat, 10 Oct 2020 at 13:03, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Many thanks everyone for their valuable contribution.
>>>>>
>>>>> We all started with Spark a few years ago where Scala was the talk
>>>>> of the town. I agree with the note that as long as Spark stayed nish and
>>>>> elite, then someone with Scala knowledge was attracting premiums. In
>>>>> fairness in 2014-2015, there was not much talk of Data Science input (I 
>>>>> may
>>>>> be wrong). But the world has moved on so to speak. Python itself has been
>>>>> around a long time (long being relative here). Most people either knew 
>>>>> UNIX
>>>>> Shell, C, Python or Perl or a combination of all these. I recall we had a
>>>>> director a few years ago who asked our Hadoop admin for root password to
>>>>> log in to the edge node. Later he became head of machine learning
>>>>> somewhere else and he loved C and Python. So Python was a gift in 
>>>>> disguise.
>>>>> I think Python appeals to those who are very familiar with CLI and shell
>>>>> programming (Not GUI fan). As some members alluded to there are more 
>>>>> people
>>>>> around with Python knowledge. Most managers choose Python as the unifying
>>>>> development tool because they feel comfortable with it. Frankly I have not
>>>>> seen a manager who feels at home with Scala. So in summary it is a bit
>>>>> disappointing to abandon Scala and switch to Python just for the sake of 
>>>>> it.
>>>>>
>>>>> Disclaimer: These are opinions and not facts so to speak :)
>>>>>
>>>>> Cheers,
>>>>>
>>>>>
>>>>> Mich
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 9 Oct 2020 at 21:56, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> I have come across occasions when the teams use Python with Spark for
>>>>>> ETL, for example processing data from S3 buckets into Snowflake with 
>>>>>> Spark.
>>>>>>
>>>>>> The only reason I think they are choosing Python as opposed to Scala
>>>>>> is because they are more familiar with Python. Since Spark is written in
>>>>>> Scala, itself is an indication of why I think Scala has an edge.
>>>>>>
>>>>>> I have no

Re: Count distinct and driver memory

2020-10-19 Thread ayan guha
Do not do collect. This brings results back to driver. instead do count
distinct and write it out.

On Tue, 20 Oct 2020 at 6:43 am, Nicolas Paris 
wrote:

> > I was caching it because I didn't want to re-execute the DAG when I
> > ran the count query. If you have a spark application with multiple
> > actions, Spark reexecutes the entire DAG for each action unless there
> > is a cache in between. I was trying to avoid reloading 1/2 a terabyte
> > of data.  Also, cache should use up executor memory, not driver
> > memory.
> why not counting the parquet file instead? writing/reading a parquet
> files is more efficients than caching in my experience.
> if you really need caching you could choose a better strategy such
> DISK.
>
> Lalwani, Jayesh  writes:
>
> > I was caching it because I didn't want to re-execute the DAG when I ran
> the count query. If you have a spark application with multiple actions,
> Spark reexecutes the entire DAG for each action unless there is a cache in
> between. I was trying to avoid reloading 1/2 a terabyte of data.  Also,
> cache should use up executor memory, not driver memory.
> >
> > As it turns out cache was the problem. I didn't expect cache to take
> Executor memory and spill over to disk. I don't know why it's taking driver
> memory. The input data has millions of partitions which results in millions
> of tasks. Perhaps the high memory usage is a side effect of caching the
> results of lots of tasks.
> >
> > On 10/19/20, 1:27 PM, "Nicolas Paris"  wrote:
> >
> > CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
> >
> >
> >
> > > Before I write the data frame to parquet, I do df.cache. After
> writing
> > > the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> > if you write the df to parquet, why would you also cache it ?
> caching by
> > default loads the memory. this might affect  later use, such
> > collect. the resulting GC can be explained by both caching and
> collect
> >
> >
> > Lalwani, Jayesh  writes:
> >
> > > I have a Dataframe with around 6 billion rows, and about 20
> columns. First of all, I want to write this dataframe out to parquet. The,
> Out of the 20 columns, I have 3 columns of interest, and I want to find how
> many distinct values of the columns are there in the file. I don’t need the
> actual distinct values. I just need the count. I knoe that there are around
> 10-16million distinct values
> > >
> > > Before I write the data frame to parquet, I do df.cache. After
> writing the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> > >
> > > When I run this, I see that the memory usage on my driver steadily
> increases until it starts getting future time outs. I guess it’s spending
> time in GC. Does countDistinct cause this behavior? Does Spark try to get
> all 10 million distinct values into the driver? Is countDistinct not
> recommended for data frames with large number of distinct values?
> > >
> > > What’s the solution? Should I use approx._count_distinct?
> >
> >
> > --
> > nicolas paris
> >
> > -
> >     To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
> --
> nicolas paris
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: Why spark-submit works with package not with jar

2020-10-20 Thread ayan guha
Hi

One way to think of this is --packages is better when you have third party
dependency and --jars is better when you have custom in-house built jars.

On Wed, 21 Oct 2020 at 3:44 am, Mich Talebzadeh 
wrote:

> Thanks Sean and Russell. Much appreciated.
>
> Just to clarify recently I had issues with different versions of Google
> Guava jar files in building Uber jar file (to evict the unwanted ones).
> These used to work a year and half ago using Google Dataproc compute
> engines (comes with Spark preloaded) and I could create an Uber jar file.
>
> Unfortunately this has become problematic now so tried to use spark-submit
> instead as follows:
>
> ${SPARK_HOME}/bin/spark-submit \
> --master yarn \
> --deploy-mode client \
> --conf spark.executor.memoryOverhead=3000 \
> --class org.apache.spark.repl.Main \
> --name "Spark shell on Yarn" "$@"
> --driver-class-path /home/hduser/jars/ddhybrid.jar \
> --jars /home/hduser/jars/spark-bigquery-latest.jar, \
>/home/hduser/jars/ddhybrid.jar \
> --packages com.github.samelamin:spark-bigquery_2.11:0.2.6
>
> Effectively tailored spark-shell. However, I do not think there is a
> mechanism to resolve jar conflicts without  building an Uber jar file
> through SBT?
>
> Cheers
>
>
>
> On Tue, 20 Oct 2020 at 16:54, Russell Spitzer 
> wrote:
>
>> --jar Adds only that jar
>> --package adds the Jar and a it's dependencies listed in maven
>>
>> On Tue, Oct 20, 2020 at 10:50 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a scenario that I use in Spark submit as follows:
>>>
>>> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
>>> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar,
>>> */home/hduser/jars/spark-bigquery_2.11-0.2.6.jar*
>>>
>>> As you can see the jar files needed are added.
>>>
>>>
>>> This comes back with error message as below
>>>
>>>
>>> Creating model test.weights_MODEL
>>>
>>> java.lang.NoClassDefFoundError:
>>> com/google/api/client/http/HttpRequestInitializer
>>>
>>>   at
>>> com.samelamin.spark.bigquery.BigQuerySQLContext.bq$lzycompute(BigQuerySQLContext.scala:19)
>>>
>>>   at
>>> com.samelamin.spark.bigquery.BigQuerySQLContext.bq(BigQuerySQLContext.scala:19)
>>>
>>>   at
>>> com.samelamin.spark.bigquery.BigQuerySQLContext.runDMLQuery(BigQuerySQLContext.scala:105)
>>>
>>>   ... 76 elided
>>>
>>> Caused by: java.lang.ClassNotFoundException:
>>> com.google.api.client.http.HttpRequestInitializer
>>>
>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>
>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>
>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>
>>>
>>>
>>> So there is an issue with finding the class, although the jar file used
>>>
>>>
>>> /home/hduser/jars/spark-bigquery_2.11-0.2.6.jar
>>>
>>> has it.
>>>
>>>
>>> Now if *I remove the above jar file and replace it with the same
>>> version but package* it works!
>>>
>>>
>>> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
>>> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar
>>> *-**-packages com.github.samelamin:spark-bigquery_2.11:0.2.6*
>>>
>>>
>>> I have read the write-ups about packages searching the maven
>>> libraries etc. Not convinced why using the package should make so much
>>> difference between a failure and success. In other words, when to use a
>>> package rather than a jar.
>>>
>>>
>>> Any ideas will be appreciated.
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>> --
Best Regards,
Ayan Guha


Re: Apache Spark Connector for SQL Server and Azure SQL

2020-10-26 Thread ayan guha
I would suggest to ask microsoft and databricks, this forum is for apache
spark.

if you are interested please drop me a note separately as I m keen to
understand the issue as we use same setup

Ayan

On Mon, 26 Oct 2020 at 11:53 pm,  wrote:

> Hi,
>
>
>
> In a project where I work with Databricks, we use this connector to read /
> write data to Azure SQL Database. Currently with Spark 2.4.5 and Scala 2.11.
>
>
>
> But those setups are getting old. What happens if we update Spark to 3.0.1
> or higher and Scala 2.12.
>
> This connector does not work according to the versions it supports. What
> should we do? Don't use the connector or is there another way to work?
>
>
>
> I appreciate any type of information that helps me.
>
>
>
> Med vänlig hälsning / Best regards
>
> *Alejandra Lemmo*
>
> *Data Engineer *
> Customer Analytic
>
> Address: Evenemangsgatan 13, 169 56 Solna
> <https://www.google.com/maps/search/Evenemangsgatan+13,+169+56+Solna?entry=gmail&source=g>,
> 16956 Solna
>
> D +46735249832
>
> M +46735249832
>
>
> alejandra.le...@vattenfall.com
> www.vattenfall.se
>
> Please consider the environment before printing this e-mail
>
> Confidentiality: C2 - Internal
>
-- 
Best Regards,
Ayan Guha


Re: How to apply ranger policies on Spark

2020-11-24 Thread ayan guha
AFAIK, Ranger secures Hive (JDBC) server only. Unfortunately Spark does not
interact with HS2, but directly interacts with Metastore. Hence, the only
way to use Ranger policies if you use Hive via JDBC. Another option is HDFS
or Storage ACLs, which are coarse grain control over file path etc. You can
use Ranger to manage HDFS ACLs as well. In such scenario spark will be
bound by those policies.

On Tue, Nov 24, 2020 at 5:26 PM Dennis Suhari 
wrote:

> Hi Joyan,
>
> Spark uses its own metastore. Using Ranger you need to use the Hive
> Metastore. For this you need to point to Hive Metastore and use HiveContext
> in your Spark Code.
>
> Br,
>
> Dennis
>
> Von meinem iPhone gesendet
>
> Am 23.11.2020 um 19:04 schrieb joyan sil :
>
> 
>
> Hi,
>
> We have ranger policies defined on the hive table and authorization works
> as expected when we use hive cli and beeline. But when we access those hive
> tables using spark-shell or spark-submit it does not work.
>
>  Any suggestions to make Ranger work with Spark?
>
>
> Regards
>
> Joyan
>
>

-- 
Best Regards,
Ayan Guha


Re: Rdd - zip with index

2021-03-23 Thread ayan guha
Best case is use dataframe and df.columns will automatically give you
column names. Are you sure your file is indeed in csv? maybe it is easier
if you share the code?

On Wed, 24 Mar 2021 at 2:12 pm, Sean Owen  wrote:

> It would split 10GB of CSV into multiple partitions by default, unless
> it's gzipped. Something else is going on here.
>
> ‪On Tue, Mar 23, 2021 at 10:04 PM ‫"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"‬‎
>  wrote:‬
>
>> I’m not Spark core developer and do not want to confuse you but it seems
>> logical to me that just reading from single file (no matter what format of
>> the file is used) gives no parallelism unless you do repartition by some
>> column just after csv load, but the if you’re telling you’ve already tried
>> repartition with no luck...
>>
>>
>> > On 24 Mar 2021, at 03:47, KhajaAsmath Mohammed 
>> wrote:
>> >
>> > So spark by default doesn’t split the large 10gb file when loaded?
>> >
>> > Sent from my iPhone
>> >
>> >> On Mar 23, 2021, at 8:44 PM, Yuri Oleynikov (‫יורי אולייניקוב‬‎) <
>> yur...@gmail.com> wrote:
>> >>
>> >> Hi, Mohammed
>> >> I think that the reason that only one executor is running and have
>> single partition is because you have single file that might be read/loaded
>> into memory.
>> >>
>> >> In order to achieve better parallelism I’d suggest to split the csv
>> file.
>> >>
>>
>> --
Best Regards,
Ayan Guha


Re: Rdd - zip with index

2021-03-24 Thread ayan guha
etorship
>>>> Category (1),Country Incorporated (1),Proprietor (1) Address (1),Proprietor
>>>> (1) Address (2),Proprietor (1) Address (3),Proprietor Name (2),Company
>>>> Registration No. (2),Proprietorship Category (2),Country Incorporated
>>>> (2),Proprietor (2) Address (1),Proprietor (2) Address (2),Proprietor (2)
>>>> Address (3),Proprietor Name (3),Company Registration No. (3),Proprietorship
>>>> Category (3),Country Incorporated (3),Proprietor (3) Address (1),Proprietor
>>>> (3) Address (2),Proprietor (3) Address (3),Proprietor Name (4),Company
>>>> Registration No. (4),Proprietorship Category (4),Country Incorporated
>>>> (4),Proprietor (4) Address (1),Proprietor (4) Address (2),Proprietor (4)
>>>> Address (3),Date Proprietor Added,Additional Proprietor Indicator
>>>>
>>>>
>>>> 10GB is not much of a big CSV file
>>>>
>>>> that will resolve the header anyway.
>>>>
>>>>
>>>> Also how are you running the spark, in a local mode (single jvm) or
>>>> other distributed modes (yarn, standalone) ?
>>>>
>>>>
>>>> HTH
>>>>
>>>

-- 
Best Regards,
Ayan Guha


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
Hi

We are using a haversine distance function for this, and wrapping it in
udf.

from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
from pyspark.sql.types import *

def haversine_distance(long_x, lat_x, long_y, lat_y):
return acos(
sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
cos(toRadians(long_x) - toRadians(long_y))
) * lit(6371.0)

distudf = udf(haversine_distance, FloatType())

in case you just want to use just Spark SQL, you can still utilize the
functions shown above to implement in SQL.

Any reason you do not want to use UDF?

Credit
<https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark>


On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru  wrote:

> Hi All,
>
>
>
> I have a requirement to calculate distance between four
> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the *pysaprk
> dataframe *with the help of from *geopy* import *distance *without using
> *UDF* (user defined function)*,*Please help how to achieve this scenario
> and do the needful.
>
>
>
> Thanks,
>
> Ankamma Rao B
>


-- 
Best Regards,
Ayan Guha


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
Hi Sean - absolutely open to suggestions.

My impression was using spark native functions should provide similar perf
as scala ones because serialization penalty should not be there, unlike
native python udfs.

Is it wrong understanding?



On Fri, 9 Apr 2021 at 10:55 pm, Rao Bandaru  wrote:

> Hi All,
>
>
> yes ,i need to add the below scenario based code to the executing spark
> job,while executing this it took lot of time to complete,please suggest
> best way to get below requirement without using UDF
>
>
> Thanks,
>
> Ankamma Rao B
> --
> *From:* Sean Owen 
> *Sent:* Friday, April 9, 2021 6:11 PM
> *To:* ayan guha 
> *Cc:* Rao Bandaru ; User 
> *Subject:* Re: [Spark SQL]:to calculate distance between four
> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk
> dataframe
>
> This can be significantly faster with a pandas UDF, note, because you can
> vectorize the operations.
>
> On Fri, Apr 9, 2021, 7:32 AM ayan guha  wrote:
>
> Hi
>
> We are using a haversine distance function for this, and wrapping it in
> udf.
>
> from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
> from pyspark.sql.types import *
>
> def haversine_distance(long_x, lat_x, long_y, lat_y):
> return acos(
> sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
> cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
> cos(toRadians(long_x) - toRadians(long_y))
> ) * lit(6371.0)
>
> distudf = udf(haversine_distance, FloatType())
>
> in case you just want to use just Spark SQL, you can still utilize the
> functions shown above to implement in SQL.
>
> Any reason you do not want to use UDF?
>
> Credit
> <https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark>
>
>
> On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru  wrote:
>
> Hi All,
>
>
>
> I have a requirement to calculate distance between four
> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the *pysaprk
> dataframe *with the help of from *geopy* import *distance *without using
> *UDF* (user defined function)*,*Please help how to achieve this scenario
> and do the needful.
>
>
>
> Thanks,
>
> Ankamma Rao B
>
>
>
> --
> Best Regards,
> Ayan Guha
>
> --
Best Regards,
Ayan Guha


Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
Hi - interesting stuff. My stand always was to use spark native functions,
pandas and python native - in this order.

To OP - did you try the code? What kind of perf are you seeing? Just
curious, why do you think UDFs are bad?

On Sat, 10 Apr 2021 at 2:36 am, Sean Owen  wrote:

> Actually, good question, I'm not sure. I don't think that Spark would
> vectorize these operations over rows.
> Whereas in a pandas UDF, given a DataFrame, you can apply operations like
> sin to 1000s of values at once in native code via numpy. It's trivially
> 'vectorizable' and I've seen good wins over, at least, a single-row UDF.
>
> On Fri, Apr 9, 2021 at 9:14 AM ayan guha  wrote:
>
>> Hi Sean - absolutely open to suggestions.
>>
>> My impression was using spark native functions should provide similar
>> perf as scala ones because serialization penalty should not be there,
>> unlike native python udfs.
>>
>> Is it wrong understanding?
>>
>>
>>
>> On Fri, 9 Apr 2021 at 10:55 pm, Rao Bandaru  wrote:
>>
>>> Hi All,
>>>
>>>
>>> yes ,i need to add the below scenario based code to the executing spark
>>> job,while executing this it took lot of time to complete,please suggest
>>> best way to get below requirement without using UDF
>>>
>>>
>>> Thanks,
>>>
>>> Ankamma Rao B
>>> --
>>> *From:* Sean Owen 
>>> *Sent:* Friday, April 9, 2021 6:11 PM
>>> *To:* ayan guha 
>>> *Cc:* Rao Bandaru ; User 
>>> *Subject:* Re: [Spark SQL]:to calculate distance between four
>>> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk
>>> dataframe
>>>
>>> This can be significantly faster with a pandas UDF, note, because you
>>> can vectorize the operations.
>>>
>>> On Fri, Apr 9, 2021, 7:32 AM ayan guha  wrote:
>>>
>>> Hi
>>>
>>> We are using a haversine distance function for this, and wrapping it in
>>> udf.
>>>
>>> from pyspark.sql.functions import acos, cos, sin, lit, toRadians, udf
>>> from pyspark.sql.types import *
>>>
>>> def haversine_distance(long_x, lat_x, long_y, lat_y):
>>> return acos(
>>> sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
>>> cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
>>> cos(toRadians(long_x) - toRadians(long_y))
>>> ) * lit(6371.0)
>>>
>>> distudf = udf(haversine_distance, FloatType())
>>>
>>> in case you just want to use just Spark SQL, you can still utilize the
>>> functions shown above to implement in SQL.
>>>
>>> Any reason you do not want to use UDF?
>>>
>>> Credit
>>> <https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark>
>>>
>>>
>>> On Fri, Apr 9, 2021 at 10:19 PM Rao Bandaru 
>>> wrote:
>>>
>>> Hi All,
>>>
>>>
>>>
>>> I have a requirement to calculate distance between four
>>> coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the *pysaprk
>>> dataframe *with the help of from *geopy* import *distance *without
>>> using *UDF* (user defined function)*,*Please help how to achieve this
>>> scenario and do the needful.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Ankamma Rao B
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
> --
Best Regards,
Ayan Guha


Re: Python level of knowledge for Spark and PySpark

2021-04-14 Thread ayan guha
The answer always is "it depends". At the outset it seems you are in pretty
good shape and have all the key skills you need. All I can suggest is try
to take inherent benefits of the language and hone your coding practices


On Thu, 15 Apr 2021 at 2:25 am, ashok34...@yahoo.com.INVALID
 wrote:

> Hi gurus,
>
> I have knowledge of Java, Scala and good enough knowledge of Spark, Spark
> SQL and Spark Functional programing with Scala.
>
> I have started using Python with Spark PySpark.
>
> Wondering, in order to be proficient in PySpark, how much good knowledge
> of Python programing is needed? I know the answer may be very good
> knowledge, but in practice how much is good enough. I can write Python in
> IDE like PyCharm similar to the way Scala works and can run the programs.
> Does expert knowledge of Python is prerequisite for PySpark? I also know
> Pandas and am also familiar with plotting routines like matplotlib.
>
> Warmest
>
> Ashok
>
-- 
Best Regards,
Ayan Guha


Spark Streaming with Files

2021-04-23 Thread ayan guha
Hi

In one of the spark summit demo, it is been alluded that we should think
batch jobs in streaming pattern, using "run once" in a schedule.
I find this idea very interesting and I understand how this can be achieved
for sources like kafka, kinesis or similar. in fact we have implemented
this model for cosmos changefeed.

My question is: can this model extend to file based sources? I understand
it can be for append only file  streams. The use case I have is: A CDC tool
like aws dms or shareplex or similar writing changes to a stream of files,
in date based folders. So it just goes on like T1, T2 etc folders. Also,
lets assume files are written every 10 mins, but I want to process them
every 4 hours.
Can I use streaming method so that it can manage checkpoints on its own?

Best - Ayan
-- 
Best Regards,
Ayan Guha


Re: Graceful shutdown SPARK Structured Streaming

2021-05-06 Thread ayan guha
n your writestream add the following line to be able to identify topic
>> name
>>
>> trigger(processingTime='30 seconds'). \
>> *queryName('md'). *\
>>
>> Next the controlling topic (called newtopic)  has the following
>>
>> foreachBatch(*sendToControl*). \
>> trigger(processingTime='2 seconds'). \
>> queryName('newtopic'). \
>>
>> That method sendToControl does what is needed
>>
>> def sendToControl(dfnewtopic, batchId):
>> if(len(dfnewtopic.take(1))) > 0:
>> #print(f"""newtopic batchId is {batchId}""")
>> #dfnewtopic.show(10,False)
>> queue = dfnewtopic.select(col("queue")).collect()[0][0]
>> status = dfnewtopic.select(col("status")).collect()[0][0]
>>
>> if((queue == 'md')) & (status == 'false')):
>>   spark_session = s.spark_session(config['common']['appName'])
>>   active = spark_session.streams.active
>>   for e in active:
>>  #print(e)
>>  name = e.name
>>  if(name == 'md'):
>> print(f"""Terminating streaming process {name}""")
>> e.stop()
>> else:
>> print("DataFrame newtopic is empty")
>>
>> This seems to work as I checked it to ensure that in this case data was
>> written and saved to the target sink (BigQuery table). It will wait until
>> data is written completely meaning the current streaming message is
>> processed and there is a latency there (meaning waiting for graceful
>> completion)
>>
>> This is the output
>>
>> Terminating streaming process md
>> wrote to DB  ## this is the flag  I added to ensure the current
>> micro-bath was completed
>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md [id
>> = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>
>> The various termination processes are described in
>>
>> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
>> (apache.org)
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>
>> This is the idea I came up with which allows ending the streaming process
>> with least cost.
>>
>> HTH
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>> gourav.sengupta.develo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> just thought of reaching out once again and seeking out your kind help
>>> to find out what is the best way to stop SPARK streaming gracefully. Do we
>>> still use the methods of creating a file as in SPARK 2.4.x which is several
>>> years old method or do we have a better approach in SPARK 3.1?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> -- Forwarded message -
>>> From: Gourav Sengupta 
>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>> Subject: Graceful shutdown SPARK Structured Streaming
>>> To: 
>>>
>>>
>>> Dear friends,
>>>
>>> is there any documentation available for gracefully stopping SPARK
>>> Structured Streaming in 3.1.x?
>>>
>>> I am referring to articles which are 4 to 5 years old and was wondering
>>> whether there is a better way available today to gracefully shutdown a
>>> SPARK streaming job.
>>>
>>> Thanks a ton in advance for all your kind help.
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>> --
Best Regards,
Ayan Guha


Re: [Spark Catalog API] Support for metadata Backup/Restore

2021-05-07 Thread ayan guha
Just a consideration:

Is there a value in backup/restore metadata within spark? I would strongly
argue if the metadata is valuable enough and persistent enough, why dont
just use external metastore? It is fairly straightforward process. Also
regardless you are in cloud or not, database bkp is a routine and
established pattern in most organizations.
You can also enhance HA and DR by having replicas across zones and regions
etc etc

Thoughts?




On Sat, 8 May 2021 at 7:02 am, Tianchen Zhang 
wrote:

> For now we are thinking about adding two methods in Catalog API, not SQL
> commands:
> 1. spark.catalog.backup, which backs up the current catalog.
> 2. spark.catalog.restore(file), which reads the DFS file and recreates the
> entities described in that file.
>
> Can you please give an example of exposing client APIs to the end users in
> this approach? The users can only call backup or restore, right?
>
> Thanks,
> Tianchen
>
> On Fri, May 7, 2021 at 12:27 PM Wenchen Fan  wrote:
>
>> If a catalog implements backup/restore, it can easily expose some client
>> APIs to the end-users (e.g. REST API), I don't see a strong reason to
>> expose the APIs to Spark. Do you plan to add new SQL commands in Spark to
>> backup/restore a catalog?
>>
>> On Tue, May 4, 2021 at 2:39 AM Tianchen Zhang 
>> wrote:
>>
>>> Hi all,
>>>
>>> Currently the user-facing Catalog API doesn't support backup/restore
>>> metadata. Our customers are asking for such functionalities. Here is a
>>> usage example:
>>> 1. Read all metadata of one Spark cluster
>>> 2. Save them into a Parquet file on DFS
>>> 3. Read the Parquet file and restore all metadata in another Spark
>>> cluster
>>>
>>> From the current implementation, Catalog API has the list methods
>>> (listDatabases, listFunctions, etc.) but they don't return enough
>>> information in order to restore an entity (for example, listDatabases lose
>>> "properties" of the database and we need "describe database extended" to
>>> get them). And it only supports createTable (not any other entity
>>> creations). The only way we can backup/restore an entity is using Spark SQL.
>>>
>>> We want to introduce the backup and restore from an API level. We are
>>> thinking of doing this simply by adding backup() and restore() in
>>> CatalogImpl, as ExternalCatalog already includes all the methods we need to
>>> retrieve and recreate entities. We are wondering if there is any concern or
>>> drawback of this approach. Please advise.
>>>
>>> Thank you in advance,
>>> Tianchen
>>>
>> --
Best Regards,
Ayan Guha


Re: Merge two dataframes

2021-05-19 Thread ayan guha
Hi Kushagra

 I still think this is a bad idea. By definition data in a dataframe or rdd
is unordered, you are imposing an order where there is none, and if it
works it will be by chance. For example a simple repartition may disrupt
the row ordering. It is just too unpredictable.

I would suggest you fix upstream and add correct identifier to each of the
streams. It will for sure a much better solution.

On Wed, 19 May 2021 at 7:21 pm, Mich Talebzadeh 
wrote:

> That generation of row_number() has to be performed through a window call
> and I don't think there is any way around it without orderBy()
>
> df1 =
> df1.select(F.row_number().over(Window.partitionBy().orderBy(df1['amount_6m'])).alias("row_num"),"amount_6m")
>
> The problem is that without partitionBy() clause data will be skewed
> towards one executor.
>
> WARN window.WindowExec: No Partition Defined for Window operation! Moving
> all data to a single partition, this can cause serious performance
> degradation.
>
> Cheers
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 12 May 2021 at 17:33, Andrew Melo  wrote:
>
>> Hi,
>>
>> In the case where the left and right hand side share a common parent like:
>>
>> df = spark.read.someDataframe().withColumn('rownum', row_number())
>> df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
>> df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
>> df_joined = df1.join(df2, 'rownum', 'inner')
>>
>> (or maybe replacing row_number() with monotonically_increasing_id())
>>
>> Is there some hint/optimization that can be done to let Spark know
>> that the left and right hand-sides of the join share the same
>> ordering, and a sort/hash merge doesn't need to be done?
>>
>> Thanks
>> Andrew
>>
>> On Wed, May 12, 2021 at 11:07 AM Sean Owen  wrote:
>> >
>> > Yeah I don't think that's going to work - you aren't guaranteed to get
>> 1, 2, 3, etc. I think row_number() might be what you need to generate a
>> join ID.
>> >
>> > RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not.
>> You could .zip two RDDs you get from DataFrames and manually convert the
>> Rows back to a single Row and back to DataFrame.
>> >
>> >
>> > On Wed, May 12, 2021 at 10:47 AM kushagra deep <
>> kushagra94d...@gmail.com> wrote:
>> >>
>> >> Thanks Raghvendra
>> >>
>> >> Will the ids for corresponding columns  be same always ? Since
>> monotonic_increasing_id() returns a number based on partitionId and the row
>> number of the partition  ,will it be same for corresponding columns? Also
>> is it guaranteed that the two dataframes will be divided into logical spark
>> partitions with the same cardinality for each partition ?
>> >>
>> >> Reg,
>> >> Kushagra Deep
>> >>
>> >> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh <
>> raghavendr...@gmail.com> wrote:
>> >>>
>> >>> You can add an extra id column and perform an inner join.
>> >>>
>> >>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
>> >>>
>> >>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
>> >>>
>> >>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
>> >>>
>> >>> +-+-+
>> >>>
>> >>> |amount_6m|amount_9m|
>> >>>
>> >>> +-+-+
>> >>>
>> >>> |  100|  500|
>> >>>
>> >>> |  200|  600|
>> >>>
>> >>> |  300|  700|
>> >>>
>> >>> |  400|  800|
>> >>>
>> >>> |  500|  900|
>> >>>
>> >>> +-+-+
>> >>>
>> >>>
>> >>> --
>> >>> Raghavendra
>
>
>> >>>
>> >>>
>> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep <
>> kushagra94d...@gmail.com> wrote:
>> >>>>
>> >>>> Hi All,
>> >>>>
>> >>>> I have two dataframes
>> >>>>
>> >>>> df1
>> >>>>
>> >>>> amount_6m
>> >>>>  100
>> >>>>  200
>> >>>>  300
>> >>>>  400
>> >>>>  500
>> >>>>
>> >>>> And a second data df2 below
>> >>>>
>> >>>>  amount_9m
>> >>>>   500
>> >>>>   600
>> >>>>   700
>> >>>>   800
>> >>>>   900
>> >>>>
>> >>>> The number of rows is same in both dataframes.
>> >>>>
>> >>>> Can I merge the two dataframes to achieve below df
>> >>>>
>> >>>> df3
>> >>>>
>> >>>> amount_6m | amount_9m
>> >>>> 100   500
>> >>>>  200  600
>> >>>>  300  700
>> >>>>  400  800
>> >>>>  500  900
>> >>>>
>> >>>> Thanks in advance
>> >>>>
>> >>>> Reg,
>> >>>> Kushagra Deep
>> >>>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
Best Regards,
Ayan Guha


Re: PySpark Write File Container exited with a non-zero exit code 143

2021-05-19 Thread ayan guha
Hi -- Notice the additional "y" in red (as Mich mentioned)

pyspark --conf queue=default --conf executory-memory=24G

On Thu, May 20, 2021 at 12:02 PM Clay McDonald <
stuart.mcdon...@bateswhite.com> wrote:

> How so?
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, May 19, 2021 5:45 PM
> *To:* Clay McDonald 
> *Cc:* user@spark.apache.org
> *Subject:* Re: PySpark Write File Container exited with a non-zero exit
> code 143
>
>
>
> *  *** EXTERNAL EMAIL ***   *
>
>
>
>
>
> Hi Clay,
>
>
>
> Those parameters you are passing are not valid
>
>
>
> pyspark --conf queue=default --conf executory-memory=24G
>
>
>
> Python 3.7.3 (default, Apr  3 2021, 20:42:31)
>
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
>
> Type "help", "copyright", "credits" or "license" for more information.
>
> Warning: Ignoring non-Spark config property: executory-memory
>
> Warning: Ignoring non-Spark config property: queue
>
> 2021-05-19 22:28:20,521 WARN util.NativeCodeLoader: Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
>
> Setting default log level to "WARN".
>
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
>
> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/__ / .__/\_,_/_/ /_/\_\   version 3.1.1
>
>   /_/
>
>
>
> Using Python version 3.7.3 (default, Apr  3 2021 20:42:31)
>
> Spark context Web UI available at http://rhes75:4040
>
> Spark context available as 'sc' (master = local[*], app id =
> local-1621459701490).
>
> SparkSession available as 'spark'.
>
>
>
> Also
>
>
>
> pyspark dynamic_ARRAY_generator_parquet.py
>
>
>
> Running python applications through 'pyspark' is not supported as of Spark
> 2.0.
>
> Use ./bin/spark-submit 
>
>
>
>
>
> This works
>
>
>
> $SPARK_HOME/bin/spark-submit --master local[4]
> dynamic_ARRAY_generator_parquet.py
>
>
>
>
>
> See
>
>
>
>  https://spark.apache.org/docs/latest/submitting-applications.html
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Wed, 19 May 2021 at 20:10, Clay McDonald <
> stuart.mcdon...@bateswhite.com> wrote:
>
> Hello all,
>
>
>
> I’m hoping someone can give me some direction for troubleshooting this
> issue, I’m trying to write from Spark on an HortonWorks(Cloudera) HDP
> cluster. I ssh directly to the first datanode and run PySpark with the
> following command; however, it is always failing no matter what size I set
> memory in Yarn Containers and Yarn Queues. Any suggestions?
>
>
>
>
>
>
>
> pyspark --conf queue=default --conf executory-memory=24G
>
>
>
> --
>
>
>
> HDFS_RAW="/HDFS/Data/Test/Original/MyData_data/"
>
> #HDFS_OUT="/ HDFS/Data/Test/Processed/Convert_parquet/Output"
>
> HDFS_OUT="/tmp"
>
> ENCODING="utf-16"
>
>
>
> fileList1=[
>
> 'Test _2003.txt'
>
> ]
>
> from  pyspark.sql.functions import regexp_replace,col
>
> for f in fileList1:
>
> fname=f
>
> fname_noext=fname.split('.')[0]
>
> df =
> spark.read.option("delimiter","|").option("encoding",ENCODING).option("multiLine",True).option('wholeFile',"true").csv('{}/{}'.format(HDFS_RAW,fname),
> header=True)
>
> lastcol=df.columns[-1]
>
> print('showing {}'.format(fname))
>
> if ('\r' in lastcol):
>
> lastcol=lastcol.replace('\r','')
>
> df=df.withColumn(lastcol,
> regexp_replace(col("{}\r".format(lastcol)), "[\r]",
> "")).drop('{}\r'.format(lastcol))
>
>
> df.write.format('parquet').mode('overwrite').save("{}/{}".format(HDFS_OUT,fname_noext))
>
>
>
>
>
>
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task
> 0.3 in stage 1.0 (TID 4, DataNode01.mydomain.com, executor 5):
> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
> Reason: Container marked as failed:
> container_e331_1621375512548_0021_01_06 on host:
> DataNode01.mydomain.com. Exit status: 143. Diagnostics: [2021-05-19
> 18:09:06.392]Container killed on request. Exit code is 143
> [2021-05-19 18:09:06.413]Container exited with a non-zero exit code 143.
> [2021-05-19 18:09:06.414]Killed by external signal
>
>
>
>
>
> THANKS! CLAY
>
>
>
>

-- 
Best Regards,
Ayan Guha


Re: DF blank value fill

2021-05-21 Thread ayan guha
Hi

You can do something like this:

SELECT MainKey, Subkey,
  case when val1 is null then newval1 else val1 end val1,
  case when val2 is null then newval2 else val1 end val2,
  case when val3 is null then newval3 else val1 end val3
 from (select mainkey,subkey, val1,val2, val3,
 first_value() over (partitionby mainkey, subkey order
by val1 nulls last) newval1,
 first_value() over (partitionby mainkey, subkey order
by val2 nulls last) newval2,
 first_value() over (partitionby mainkey, subkey order
by val3 nulls last) newval3
from table) x

On Fri, May 21, 2021 at 9:29 PM Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi all,
>
>
>
> My df looks like follows:
>
>
>
> Situation:
>
> MainKey, SubKey, Val1, Val2, Val3, …
>
> 1, 2, a, null, c
>
> 1, 2, null, null, c
>
> 1, 3, null, b, null
>
> 1, 3, a, null, c
>
>
>
>
>
> Desired outcome:
>
> 1, 2, a, b, c
>
> 1, 2, a, b, c
>
> 1, 3, a, b, c
>
> 1, 3, a, b, c
>
>
>
>
>
> How could I populate/synchronize empty cells of all records with the same
> combination of MainKey and SubKey with the respective value of other rows
> with the same key combination?
>
> A certain value, if not null, of a col is guaranteed to be unique within
> the df. If a col exists then there is at least one row with a not-null
> value.
>
>
>
> I am using pyspark.
>
>
>
> Thanks for any hint,
>
> Best
>
> Meikel
>


-- 
Best Regards,
Ayan Guha


Re: Spark-sql can replace Hive ?

2021-06-10 Thread ayan guha
Would you mind expanding the ask? Spark Sql can use hive by itaelf

On Thu, 10 Jun 2021 at 8:58 pm, Battula, Brahma Reddy
 wrote:

> Hi
>
>
>
> Would like know any refences/docs to replace hive with spark-sql
> completely like how migrate the existing data in hive.?
>
>
>
> thanks
>
>
>
>
>
-- 
Best Regards,
Ayan Guha


Re: Insert into table with one the value is derived from DB function using spark

2021-06-19 Thread ayan guha
t;
>>> spark.driver.extraClassPath
>>>  /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
>>>
>>> HTH
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 18 Jun 2021 at 20:49, Anshul Kala  wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am using spark to ingest data from file to database Oracle table .
>>>> For one of the fields , the value to be populated is generated from a
>>>> function that is written in database .
>>>>
>>>> The input to the function is one of the fields of data frame
>>>>
>>>> I wanted to use spark.dbc.write to perform the operation, which
>>>> generates the insert query at back end .
>>>>
>>>> For example : It can generate the insert query as :
>>>>
>>>> Insert into table values (?,?, getfunctionvalue(?) )
>>>>
>>>> Please advise if it is possible in spark and if yes , how can it be
>>>> done
>>>>
>>>> This is little urgent for me . So any help is appreciated
>>>>
>>>> Thanks
>>>> Anshul
>>>>
>>> --
Best Regards,
Ayan Guha


Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread ayan guha
s dataframes
>>>>>>>>>
>>>>>>>>> newtopicResult = streamingNewtopic.select( \
>>>>>>>>>  col("newtopic_value.uuid").alias("uuid") \
>>>>>>>>>,
>>>>>>>>> col("newtopic_value.timeissued").alias("timeissued") \
>>>>>>>>>, col("newtopic_value.queue").alias("queue") \
>>>>>>>>>,
>>>>>>>>> col("newtopic_value.status").alias("status")). \
>>>>>>>>>  writeStream. \
>>>>>>>>>  outputMode('append'). \
>>>>>>>>>  option("truncate", "false"). \
>>>>>>>>>   *   foreachBatch(sendToControl). \*
>>>>>>>>>  trigger(processingTime='2 seconds'). \
>>>>>>>>>  queryName(config['MDVariables']['newtopic']).
>>>>>>>>> \
>>>>>>>>>  start()
>>>>>>>>>
>>>>>>>>> result = streamingDataFrame.select( \
>>>>>>>>>  col("parsed_value.rowkey").alias("rowkey") \
>>>>>>>>>, col("parsed_value.ticker").alias("ticker") \
>>>>>>>>>,
>>>>>>>>> col("parsed_value.timeissued").alias("timeissued") \
>>>>>>>>>, col("parsed_value.price").alias("price")). \
>>>>>>>>>  writeStream. \
>>>>>>>>>  outputMode('append'). \
>>>>>>>>>  option("truncate", "false"). \
>>>>>>>>>  *foreachBatch(sendToSink). \*
>>>>>>>>>  trigger(processingTime='30 seconds'). \
>>>>>>>>>  option('checkpointLocation',
>>>>>>>>> checkpoint_path). \
>>>>>>>>>  queryName(config['MDVariables']['topic']). \
>>>>>>>>>  start()
>>>>>>>>> print(result)
>>>>>>>>>
>>>>>>>>> except Exception as e:
>>>>>>>>> print(f"""{e}, quitting""")
>>>>>>>>> sys.exit(1)
>>>>>>>>>
>>>>>>>>> Inside that function say *sendToSink *you can get the df and
>>>>>>>>> batchId
>>>>>>>>>
>>>>>>>>> def sendToSink(df, batchId):
>>>>>>>>> if(len(df.take(1))) > 0:
>>>>>>>>> print(f"""md batchId is {batchId}""")
>>>>>>>>> df.show(100,False)
>>>>>>>>> df. persist()
>>>>>>>>> # write to BigQuery batch table
>>>>>>>>> s.writeTableToBQ(df, "append",
>>>>>>>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>>>>>>>>> df.unpersist()
>>>>>>>>> print(f"""wrote to DB""")
>>>>>>>>> else:
>>>>>>>>> print("DataFrame md is empty")
>>>>>>>>>
>>>>>>>>> And you have created DF from the other topic newtopic
>>>>>>>>>
>>>>>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>>>>> if(len(dfnewtopic.take(1))) > 0:
>>>>>>>>> ..
>>>>>>>>>
>>>>>>>>> Now you have  two dataframe* df* and *dfnewtopic* in the same
>>>>>>>>> session. Will you be able to join these two dataframes through common 
>>>>>>>>> key
>>>>>>>>> value?
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>view my Linkedin profile
>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other property 
>>>>>>>>> which may
>>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>>>> damages
>>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello! Sure thing!
>>>>>>>>>>
>>>>>>>>>> I'm reading them *separately*, both are apps written with Scala
>>>>>>>>>> + Spark Structured Streaming.
>>>>>>>>>>
>>>>>>>>>> I feel like I missed some details on my original thread (sorry it
>>>>>>>>>> was past 4 AM) and it was getting frustrating
>>>>>>>>>> Please let me try to clarify some points:
>>>>>>>>>>
>>>>>>>>>> *Transactions Created Consumer*
>>>>>>>>>> ---
>>>>>>>>>> | Kafka trx-created-topic   |   <--- (Scala + SparkStructured
>>>>>>>>>> Streaming) ConsumerApp --->  Sinks to ---> Postgres DB Table
>>>>>>>>>> (Transactions)
>>>>>>>>>> ---
>>>>>>>>>>
>>>>>>>>>> *Transactions Processed Consumer*
>>>>>>>>>> -
>>>>>>>>>> | Kafka trx-processed-topic |  <---   1) (Scala + SparkStructured
>>>>>>>>>> Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a")
>>>>>>>>>> -   2) Selects the
>>>>>>>>>> Ids
>>>>>>>>>> -
>>>>>>>>>> |   Postgres / Trx table |. <--- 3) Fetches the rows w/
>>>>>>>>>> the matching ids that have status 'created (let's call it "b")
>>>>>>>>>> - 4)  Performs an
>>>>>>>>>> intersection between "a" and "b" resulting in a 
>>>>>>>>>> "b_that_needs_sinking" (but
>>>>>>>>>> now there's some "b_leftovers" that were out of the intersection)
>>>>>>>>>>  5)  Sinks
>>>>>>>>>> "b_that_needs_sinking" to DB, but that leaves the "b_leftovers" as
>>>>>>>>>> unprocessed (not persisted)
>>>>>>>>>>  6) However,
>>>>>>>>>> those "b_leftovers" would, ultimately, be processed at some point 
>>>>>>>>>> (even if
>>>>>>>>>> it takes like 1-3 days) - when their corresponding transaction_id are
>>>>>>>>>>  pushed
>>>>>>>>>> to the "trx-created-topic" Kafka topic, and are then processed by 
>>>>>>>>>> that
>>>>>>>>>> first consumer
>>>>>>>>>>
>>>>>>>>>> So, what I'm trying to accomplish is find a way to reprocess
>>>>>>>>>> those "b_leftovers" *without *having to restart the app
>>>>>>>>>> Does that make sense?
>>>>>>>>>>
>>>>>>>>>> PS: It doesn't necessarily have to be real streaming, if
>>>>>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing, it 
>>>>>>>>>> would
>>>>>>>>>> technically work (although I keep hearing it's not advisable)
>>>>>>>>>>
>>>>>>>>>> Thank you so much!
>>>>>>>>>>
>>>>>>>>>> Kind regards
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you please clarify if you are reading these two topics
>>>>>>>>>>> separately or within the same scala or python script in Spark 
>>>>>>>>>>> Structured
>>>>>>>>>>> Streaming?
>>>>>>>>>>>
>>>>>>>>>>> HTH
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>view my Linkedin profile
>>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any 
>>>>>>>>>>> other
>>>>>>>>>>> property which may arise from relying on this email's technical 
>>>>>>>>>>> content is
>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira <
>>>>>>>>>>> bruno.ar...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello guys,
>>>>>>>>>>>>
>>>>>>>>>>>> I've been struggling with this for some days now, without
>>>>>>>>>>>> success, so I would highly appreciate any enlightenment. The 
>>>>>>>>>>>> simplified
>>>>>>>>>>>> scenario is the following:
>>>>>>>>>>>>
>>>>>>>>>>>>- I've got 2 topics in Kafka (it's already like that in
>>>>>>>>>>>>production, can't change it)
>>>>>>>>>>>>   - transactions-created,
>>>>>>>>>>>>   - transaction-processed
>>>>>>>>>>>>- Even though the schema is not exactly the same, they all
>>>>>>>>>>>>share a correlation_id, which is their "transaction_id"
>>>>>>>>>>>>
>>>>>>>>>>>> So, long story short, I've got 2 consumers, one for each topic,
>>>>>>>>>>>> and all I wanna do is sink them in a chain order. I'm writing them 
>>>>>>>>>>>> w/ Spark
>>>>>>>>>>>> Structured Streaming, btw
>>>>>>>>>>>>
>>>>>>>>>>>> So far so good, the caveat here is:
>>>>>>>>>>>>
>>>>>>>>>>>> - I cannot write a given "*processed" *transaction unless
>>>>>>>>>>>> there is an entry of that same transaction with the status "
>>>>>>>>>>>> *created*".
>>>>>>>>>>>>
>>>>>>>>>>>> - There is *no* guarantee that any transactions in the topic
>>>>>>>>>>>> "transaction-*processed*" have a match (same transaction_id)
>>>>>>>>>>>> in the "transaction-*created*" at the moment the messages are
>>>>>>>>>>>> fetched.
>>>>>>>>>>>>
>>>>>>>>>>>> So the workflow so far is:
>>>>>>>>>>>> - Msgs from the "transaction-created" just get synced to
>>>>>>>>>>>> postgres, no questions asked
>>>>>>>>>>>>
>>>>>>>>>>>> - As for the "transaction-processed", it goes as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>- a) Messages are fetched from the Kafka topic
>>>>>>>>>>>>- b) Select the transaction_id of those...
>>>>>>>>>>>>- c) Fetch all the rows w/ the corresponding id from a
>>>>>>>>>>>>Postgres table AND that have the status "CREATED"
>>>>>>>>>>>>- d) Then, a pretty much do a intersection between the two
>>>>>>>>>>>>datasets, and sink only on "processed" ones that have with step 
>>>>>>>>>>>> c
>>>>>>>>>>>>- e) Persist the resulting dataset
>>>>>>>>>>>>
>>>>>>>>>>>> But the rows (from the 'processed') that were not part of the
>>>>>>>>>>>> intersection get lost afterwards...
>>>>>>>>>>>>
>>>>>>>>>>>> So my question is:
>>>>>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT
>>>>>>>>>>>> restarting the app?
>>>>>>>>>>>> - For this scenario, should I fall back to Spark Streaming,
>>>>>>>>>>>> instead of Structured Streaming?
>>>>>>>>>>>>
>>>>>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and
>>>>>>>>>>>> managed to commit only the ones in the microbatches that were fully
>>>>>>>>>>>> successful (still failed to find a way to "poll" for the 
>>>>>>>>>>>> uncommitted ones
>>>>>>>>>>>> without restarting, though).
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you very much in advance!
>>>>>>>>>>>>
>>>>>>>>>>>> --
Best Regards,
Ayan Guha


Re: Naming files while saving a Dataframe

2021-07-16 Thread ayan guha
IMHO - this is a bad idea esp in failure scenarios.

How about creating a subfolder each for the jobs?

On Sat, 17 Jul 2021 at 9:11 am, Eric Beabes 
wrote:

> We've two (or more) jobs that write data into the same directory via a
> Dataframe.save method. We need to be able to figure out which job wrote
> which file. Maybe provide a 'prefix' to the file names. I was wondering if
> there's any 'option' that allows us to do this. Googling didn't come up
> with any solution so thought of asking the Spark experts on this mailing
> list.
>
> Thanks in advance.
>
-- 
Best Regards,
Ayan Guha


Re: Naming files while saving a Dataframe

2021-07-17 Thread ayan guha
Hi Eric - yes that maybe the best way to resolve this. I have not seen any
specific way to define names of the actual files written by spark. Finally,
make sure you optimize number of files written.

On Sun, Jul 18, 2021 at 2:39 AM Eric Beabes 
wrote:

> Reason we've two jobs writing to the same directory is that the data is
> partitioned by 'day' (mmdd) but the job runs hourly. Maybe the only way
> to do this is to create an hourly partition (/mmdd/hh). Is that the
> only way to solve this?
>
> On Fri, Jul 16, 2021 at 5:45 PM ayan guha  wrote:
>
>> IMHO - this is a bad idea esp in failure scenarios.
>>
>> How about creating a subfolder each for the jobs?
>>
>> On Sat, 17 Jul 2021 at 9:11 am, Eric Beabes 
>> wrote:
>>
>>> We've two (or more) jobs that write data into the same directory via a
>>> Dataframe.save method. We need to be able to figure out which job wrote
>>> which file. Maybe provide a 'prefix' to the file names. I was wondering if
>>> there's any 'option' that allows us to do this. Googling didn't come up
>>> with any solution so thought of asking the Spark experts on this mailing
>>> list.
>>>
>>> Thanks in advance.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>

-- 
Best Regards,
Ayan Guha


Re: What happens when a partition that holds data under a task fails

2022-01-23 Thread ayan guha
> Okay, so suppose I have 10 records distributed across 5 nodes and the
>>>>> partition of the first node holding 2 records failed. I understand that it
>>>>> will re-process this partition but how will it come to know that XYZ
>>>>> partition was holding XYZ data so that it will pick again only those
>>>>> records and reprocess it? In case of failure of a partition, is there a
>>>>> data loss? or is it stored somewhere?
>>>>>
>>>>> Maybe my question is very naive but I am trying to understand it in a
>>>>> better way.
>>>>>
>>>>> On Fri, Jan 21, 2022 at 11:32 PM Sean Owen  wrote:
>>>>>
>>>>>> In that case, the file exists in parts across machines. No, tasks
>>>>>> won't re-read the whole file; no task does or can do that. Failed
>>>>>> partitions are reprocessed, but as in the first pass, the same partition 
>>>>>> is
>>>>>> processed.
>>>>>>
>>>>>> On Fri, Jan 21, 2022 at 12:00 PM Siddhesh Kalgaonkar <
>>>>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello team,
>>>>>>>
>>>>>>> I am aware that in case of memory issues when a task fails, it will
>>>>>>> try to restart 4 times since it is a default number and if it still 
>>>>>>> fails
>>>>>>> then it will cause the entire job to fail.
>>>>>>>
>>>>>>> But suppose if I am reading a file that is distributed across nodes
>>>>>>> in partitions. So, what will happen if a partition fails that holds some
>>>>>>> data? Will it re-read the entire file and get that specific subset of 
>>>>>>> data
>>>>>>> since the driver has the complete information? or will it copy the data 
>>>>>>> to
>>>>>>> the other working nodes or tasks and try to run it?
>>>>>>>
>>>>>>

-- 
Best Regards,
Ayan Guha


Re: How to delete the record

2022-01-27 Thread ayan guha
; Hi Sean,
>>>>>
>>>>> So you mean if I use those file formats it will do the work of CDC
>>>>> automatically or I would have to handle it via code ?
>>>>>
>>>>> Hi Mich,
>>>>>
>>>>> Not sure if I understood you. Let me try to explain my scenario.
>>>>> Suppose there is a Id "1" which is inserted today, so I transformed and
>>>>> ingested it. Now suppose if this user id is deleted from the source 
>>>>> itself.
>>>>> Then how can I delete it in my transformed db
>>>>> ?
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 27 Jan 2022, 22:44 Sean Owen,  wrote:
>>>>>
>>>>>> This is what storage engines like Delta, Hudi, Iceberg are for. No
>>>>>> need to manage it manually or use a DBMS. These formats allow deletes,
>>>>>> upserts, etc of data, using Spark, on cloud storage.
>>>>>>
>>>>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Where ETL data is stored?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *But now the main problem is when the record at the source is
>>>>>>> deleted, it should be deleted in my final transformed record too.*
>>>>>>>
>>>>>>>
>>>>>>> If your final sync (storage) is data warehouse, it should be soft
>>>>>>> flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>>
>>>>>>>view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I am using Spark incremental approach for bringing the latest data
>>>>>>>> everyday. Everything works fine.
>>>>>>>>
>>>>>>>> But now the main problem is when the record at the source is
>>>>>>>> deleted, it should be deleted in my final transformed record too.
>>>>>>>>
>>>>>>>> How do I capture such changes and change my table too ?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Sid
>>>>>>>>
>>>>>>>> --
Best Regards,
Ayan Guha


Re: add an auto_increment column

2022-02-06 Thread ayan guha
Try this:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html



On Mon, 7 Feb 2022 at 12:27 pm,  wrote:

> For a dataframe object, how to add a column who is auto_increment like
> mysql's behavior?
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: add an auto_increment column

2022-02-07 Thread ayan guha
For this req you can rank or dense rank.

On Tue, 8 Feb 2022 at 1:12 pm,  wrote:

> Hello,
>
> For this query:
>
> >>> df.select("*").orderBy("amount",ascending=False).show()
> +--+--+
> | fruit|amount|
> +--+--+
> |tomato| 9|
> | apple| 6|
> |cherry| 5|
> |orange| 3|
> +--+--+
>
>
> I want to add a column "top", in which the value is: 1,2,3... meaning
> top1, top2, top3...
>
> How can I do it?
>
> Thanks.
>
>
>
>
> On 07/02/2022 21:18, Gourav Sengupta wrote:
> > Hi,
> >
> > can we understand the requirement first?
> >
> > What is that you are trying to achieve by auto increment id? Do you
> > just want different ID's for rows, or you may want to keep track of
> > the record count of a table as well, or do you want to do use them for
> > surrogate keys?
> >
> > If you are going to insert records multiple times in a table, and
> > still have different values?
> >
> > I think without knowing the requirements all the above responses, like
> > everything else where solutions are reached before understanding the
> > problem, has high chances of being wrong.
> >
> > Regards,
> > Gourav Sengupta
> >
> > On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj 
> > wrote:
> >
> >> Monotonically_increasing_id() will give the same functionality
> >>
> >> On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
> >>
> >>> For a dataframe object, how to add a column who is auto_increment
> >>> like
> >>> mysql's behavior?
> >>>
> >>> Thank you.
> >>>
> >>>
> >>
> > -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: Does spark support something like the bind function in R?

2022-02-08 Thread ayan guha
Hi

In python, or in general in spark, you can just "read" the files and select
the column. I am assuming you are reading each file individually in
separate dataframes and joining them. Instead, you can read all the files
in single dataframe and select 1 column.

On Wed, Feb 9, 2022 at 2:55 AM Andrew Davidson 
wrote:

> I need to create a single table by selecting one column from thousands of
> files. The columns are all of the same type, have the same number of rows
> and rows names. I am currently using join. I get OOM on mega-mem cluster
> with 2.8 TB.
>
>
>
> Does spark have something like cbind() “Take a sequence of vector, matrix
> or data-frame arguments and combine by *c*olumns or *r*ows,
> respectively. “
>
>
>
> https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/cbind
>
>
>
> Digging through the spark documentation I found a udf example
>
> https://spark.apache.org/docs/latest/sparkr.html#dapply
>
>
>
> ```
>
> *# Convert waiting time from hours to seconds.*
>
> *# Note that we can apply UDF to DataFrame.*
>
> schema <- structType(structField("eruptions", "double"), structField(
> "waiting", "double"),
>
>  structField("waiting_secs", "double"))
>
> df1 <- dapply(df, *function*(x) { x <- cbind(x, x$waiting * 60) }, schema)
>
> head(collect(df1))
>
> *##  eruptions waiting waiting_secs*
>
> *##1 3.600  79 4740*
>
> *##2 1.800  54 3240*
>
> *##3 3.333  74 4440*
>
> *##4 2.283  62 3720*
>
> *##5 4.533  85 5100*
>
> *##6 2.883  55 3300*
>
> ```
>
>
>
> I wonder if this is just a wrapper around join? If so it is probably not
> going to help me out.
>
>
>
> Also I would prefer to work in python
>
>
>
> Any thoughts?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Cast int to string not possible?

2022-02-17 Thread ayan guha
Can you try to cast any other Int field which is NOT a partition column?

On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta 
wrote:

> Hi,
>
> This appears interesting, casting INT to STRING has never been an issue
> for me.
>
> Can you just help us with the output of : df.printSchema()  ?
>
> I prefer to use SQL, and the method I use for casting is: CAST(< name>> AS STRING) <>.
>
> Regards,
> Gourav
>
>
>
>
>
>
> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann 
> wrote:
>
>> Here is the code snippet:
>>
>> var df = session.read().parquet(basepath);
>> for(Column partition : partitionColumnsList){
>>   df = df.withColumn(partition.getName(),
>> df.col(partition.getName()).cast(partition.getType()));
>> }
>>
>> Column is a class containing Schema Information, like for example the
>> name of the column and the data type of the column.
>>
>> Best, Rico.
>>
>> > Am 17.02.2022 um 03:17 schrieb Morven Huang :
>> >
>> > Hi Rico, you have any code snippet? I have no problem casting int to
>> string.
>> >
>> >> 2022年2月17日 上午12:26,Rico Bergmann  写道:
>> >>
>> >> Hi!
>> >>
>> >> I am reading a partitioned dataFrame into spark using automatic type
>> inference for the partition columns. For one partition column the data
>> contains an integer, therefor Spark uses IntegerType for this column. In
>> general this is supposed to be a StringType column. So I tried to cast this
>> column to StringType. But this fails with AnalysisException “cannot cast
>> int to string”.
>> >>
>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>> >>
>> >> I’m using Spark 3.1.1
>> >>
>> >> Best regards
>> >>
>> >> Rico.
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
Best Regards,
Ayan Guha


Re: Question about spark.sql min_by

2022-02-21 Thread ayan guha
Why this can not be done by window function? Or is min by is just a short
hand?

On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:

> From the source code, looks like this function was added to pyspark in
> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
> SQL with `spark.sql(...)` in Python though, not hard.
>
> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
> wrote:
>
>> Hello all,
>>
>> I'm trying to use the spark.sql min_by aggregation function with pyspark.
>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>>
>> I have a dataframe made of these columns:
>> - productId : int
>> - sellerId : int
>> - price : double
>>
>> For each product, I want to get the seller who sells the product for the
>> cheapest price.
>>
>> Naive approach would be to do this, but I would expect two shuffles:
>>
>> import spark.sql.functions as F
>> cheapest_prices_df  =
>> df.groupby('productId').agg(F.min('price').alias('price'))
>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
>> 'price'])
>>
>> I would had loved to do this instead :
>>
>> import spark.sql.functions as F
>> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
>> F.min_by('sellerId', 'price'))
>>
>> Unfortunately min_by does not seem available in pyspark sql functions,
>> whereas I can see it in the doc :
>> https://spark.apache.org/docs/latest/api/sql/index.html
>>
>> I have managed to use min_by with this approach but it looks slow (maybe
>> because of temp table creation ?):
>>
>> df.createOrReplaceTempView("table")
>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
>> min(price) from table group by productId")
>>
>> Is there a way I can rely on min_by directly in groupby ?
>> Is there some code missing in pyspark wrapper to make min_by visible
>> somehow ?
>>
>> Thank you in advance for your help.
>>
>> Cheers
>> David
>>
> --
Best Regards,
Ayan Guha


Re: Decompress Gzip files from EventHub with Structured Streaming

2022-03-08 Thread ayan guha
Hi

IMHO this is not the best use of spark. I would suggest to use simple azure
function to unzip.

Is there any specific reason to use gzip over event hub?

If you can wait 10-20 sec to process, you can use eventhub capture to write
data to storage and  then process it.

It all depends on compute you are willing to pay, every 3 sec of scheduled
job should not give you any benefit over streaming.

Best
Ayan

On Wed, 9 Mar 2022 at 5:42 am, Data Guy  wrote:

> Hi everyone,
>
> **
>
> Context: I have events coming into Databricks from an Azure Event Hub in a
> Gzip compressed format. Currently, I extract the files with a UDF and send
> the unzipped data into the silver layer in my Delta Lake with .write. Note
> that even though data comes in continuously I do not use .writeStream as of
> now.
>
> I have a few design-related questions that I hope someone with experience
> could help me with!
>
>1. Is there a better way to extract Gzip files than a UDF?
>2. Is Spark Structured Streaming or Batch with Databricks Jobs better?
>(Pipeline runs every 3 hours once, but the data is continuously coming from
>Event Hub)
>3. Should I use Autoloader or just simply stream data into Databricks
>using Event Hubs?
>
> I am especially curious about the trade-offs and the best way forward. I
> don't have massive amounts of data.
>
> Thank you very much in advance!
>
> Best wishes,
> Maurizio Vancho Argall
>
> --
Best Regards,
Ayan Guha


Re: pivoting panda dataframe

2022-03-16 Thread ayan guha
 = pf01.to_spark()
>>>>>
>>>>>
>>>>> Note that I have changed pd to ps here.
>>>>>
>>>>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>>>>>
>>>>> df.transform(lambda x: x + 1)
>>>>>
>>>>> You will now see that all numbers are +1
>>>>>
>>>>> You can find more information about pandas API on spark transform
>>>>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
>>>>> or in yours notbook
>>>>> df.transform?
>>>>>
>>>>> Signature:
>>>>> df.transform(
>>>>> func: Callable[..., ForwardRef('Series')],
>>>>> axis: Union[int, str] = 0,
>>>>> *args: Any,
>>>>> **kwargs: Any,) -> 'DataFrame'Docstring:
>>>>> Call ``func`` on self producing a Series with transformed values
>>>>> and that has the same length as its input.
>>>>>
>>>>> See also `Transform and apply a function
>>>>> <https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.
>>>>>
>>>>> .. note:: this API executes the function once to infer the type which is
>>>>>  potentially expensive, for instance, when the dataset is created 
>>>>> after
>>>>>  aggregations or sorting.
>>>>>
>>>>>  To avoid this, specify return type in ``func``, for instance, as 
>>>>> below:
>>>>>
>>>>>  >>> def square(x) -> ps.Series[np.int32]:
>>>>>  ... return x ** 2
>>>>>
>>>>>  pandas-on-Spark uses return type hint and does not try to infer the 
>>>>> type.
>>>>>
>>>>> .. note:: the series within ``func`` is actually multiple pandas series 
>>>>> as the
>>>>> segments of the whole pandas-on-Spark series; therefore, the length 
>>>>> of each series
>>>>> is not guaranteed. As an example, an aggregation against each series
>>>>> does work as a global aggregation but an aggregation of each segment. 
>>>>> See
>>>>> below:
>>>>>
>>>>> >>> def func(x) -> ps.Series[np.int32]:
>>>>> ... return x + sum(x)
>>>>>
>>>>> Parameters
>>>>> --
>>>>> func : function
>>>>> Function to use for transforming the data. It must work when pandas 
>>>>> Series
>>>>> is passed.
>>>>> axis : int, default 0 or 'index'
>>>>> Can only be set to 0 at the moment.
>>>>> *args
>>>>> Positional arguments to pass to func.
>>>>> **kwargs
>>>>> Keyword arguments to pass to func.
>>>>>
>>>>> Returns
>>>>> ---
>>>>> DataFrame
>>>>> A DataFrame that must have the same length as self.
>>>>>
>>>>> Raises
>>>>> --
>>>>> Exception : If the returned DataFrame has a different length than self.
>>>>>
>>>>> See Also
>>>>> 
>>>>> DataFrame.aggregate : Only perform aggregating type operations.
>>>>> DataFrame.apply : Invoke function on DataFrame.
>>>>> Series.transform : The equivalent function for Series.
>>>>>
>>>>> Examples
>>>>> 
>>>>> >>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 
>>>>> >>> 'B'])
>>>>> >>> df
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  2
>>>>> 2  2  3
>>>>>
>>>>> >>> def square(x) -> ps.Series[np.int32]:
>>>>> ... return x ** 2
>>>>> >>> df.transform(square)
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  4
>>>>> 2  4  9
>>>>>
>>>>> You can omit the type hint and let pandas-on-Spark infer its type.
>>>>>
>>>>> >>> df.transform(lambda x: x ** 2)
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  4
>>>>> 2  4  9
>>>>>
>>>>> For multi-index columns:
>>>>>
>>>>> >>> df.columns = [('X', 'A'), ('X', 'B')]
>>>>> >>> df.transform(square)  # doctest: +NORMALIZE_WHITESPACE
>>>>>X
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  4
>>>>> 2  4  9
>>>>>
>>>>> >>> (df * -1).transform(abs)  # doctest: +NORMALIZE_WHITESPACE
>>>>>X
>>>>>A  B
>>>>> 0  0  1
>>>>> 1  1  2
>>>>> 2  2  3
>>>>>
>>>>> You can also specify extra arguments.
>>>>>
>>>>> >>> def calculation(x, y, z) -> ps.Series[int]:
>>>>> ... return x ** y + z
>>>>> >>> df.transform(calculation, y=10, z=20)  # doctest: 
>>>>> >>> +NORMALIZE_WHITESPACE
>>>>>   X
>>>>>   A  B
>>>>> 020 21
>>>>> 121   1044
>>>>> 2  1044  59069File:  /opt/spark/python/pyspark/pandas/frame.pyType:   
>>>>>method
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> tir. 15. mar. 2022 kl. 19:33 skrev Andrew Davidson >>>> >:
>>>>>
>>>>>> Hi Bjorn
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have been looking for spark transform for a while. Can you send me
>>>>>> a link to the pyspark function?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I assume pandas transform is not really an option. I think it will
>>>>>> try to pull the entire dataframe into the drivers memory.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Kind regards
>>>>>>
>>>>>>
>>>>>>
>>>>>> Andy
>>>>>>
>>>>>>
>>>>>>
>>>>>> p.s. My real problem is that spark does not allow you to bind
>>>>>> columns. You can use union() to bind rows. I could get the equivalent of
>>>>>> cbind() using union().transform()
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Bjørn Jørgensen 
>>>>>> *Date: *Tuesday, March 15, 2022 at 10:37 AM
>>>>>> *To: *Mich Talebzadeh 
>>>>>> *Cc: *"user @spark" 
>>>>>> *Subject: *Re: pivoting panda dataframe
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html
>>>>>>  we
>>>>>> have that transpose in pandas api for spark to.
>>>>>>
>>>>>>
>>>>>>
>>>>>> You also have stack() and multilevel
>>>>>> https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> tir. 15. mar. 2022 kl. 17:50 skrev Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com>:
>>>>>>
>>>>>>
>>>>>> hi,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Is it possible to pivot a panda dataframe by making the row column
>>>>>> heading?
>>>>>>
>>>>>>
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>  [image: Image removed by sender.]  view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Bjørn Jørgensen
>>>>>> Vestre Aspehaug 4
>>>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail&source=g>,
>>>>>> 6010 Ålesund
>>>>>> Norge
>>>>>>
>>>>>> +47 480 94 297
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Bjørn Jørgensen
>>>>> Vestre Aspehaug 4
>>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail&source=g>,
>>>>> 6010 Ålesund
>>>>> Norge
>>>>>
>>>>> +47 480 94 297
>>>>>
>>>> --
Best Regards,
Ayan Guha


Re: how to change data type for columns of dataframe

2022-04-01 Thread ayan guha
Please use cast. Also I would strongly recommend to go through spark doco,
its pretty good.

On Sat, 2 Apr 2022 at 12:43 pm,  wrote:

> Hi
>
> I got a dataframe object from other application, it means this obj is
> not generated by me.
> How can I change the data types for some columns in this dataframe?
>
> For example, change the column type from Int to Float.
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: Question about bucketing and custom partitioners

2022-04-11 Thread ayan guha
IMHO you should ask this to dev email for better response and suggestions

On Tue, 12 Apr 2022 at 1:47 am, David Diebold 
wrote:

> Hello,
>
> I have a few questions related to bucketing and custom partitioning in
> dataframe api.
>
> I am considering bucketing to perform one-side free shuffle join in
> incremental jobs, but there is one thing that I'm not happy with.
> Data is likely to grow/skew over time. At some point, i would need to
> change amount of buckets which would provoke shuffle.
>
> Instead of this, I would like to use a custom partitioner, that would
> replace shuffle by narrow transformation.
> That is something that was feasible with RDD developer api. For example, I
> could use such partitioning scheme:
> partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) /
> (Int.maxValue - Int.minValue)
> When I multiply amount of partitions by 2 each new partition depends only
> on one partition from parent (=> narrow transformation)
>
> So, here are my questions :
>
> 1/ Is it possible to use custom partitioner when saving a dataframe with
> bucketing ?
> 2/ Still with the API dataframe, is it possible to apply custom
> partitioner to a dataframe ?
> Is it possible to repartition the dataframe with a narrow
> transformation like what could be done with RDD ?
> Is there some sort of dataframe developer API ? Do you have any
> pointers on this ?
>
> Thanks !
>
> David
>
-- 
Best Regards,
Ayan Guha


Re: reading each JSON file from dataframe...

2022-07-12 Thread ayan guha
path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>
>> +-+--+---+
>>
>> Cheers,
>> Enrico
>>
>>
>>
>>
>> Am 10.07.22 um 09:11 schrieb Muthu Jayakumar:
>>
>> Hello there,
>>
>> I have a dataframe with the following...
>>
>>
>> +-+---+---+
>> |entity_id|file_path
>>  |other_useful_id|
>>
>> +-+---+---+
>>
>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>
>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>
>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>
>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>
>> +-+---+---+
>>
>> I would like to read each row from `file_path` and write the result to
>> another dataframe containing `entity_id`, `other_useful_id`,
>> `json_content`, `file_path`.
>> Assume that I already have the required HDFS url libraries in my
>> classpath.
>>
>> Please advice,
>> Muthu
>>
>>
>>
>>
>

-- 
Best Regards,
Ayan Guha


Re: very simple UI on webpage to display x/y plots+histogram of data stored in hive

2022-07-18 Thread ayan guha
If possible, start with a Jupyter or databricks notebook

On Tue, 19 Jul 2022 at 7:35 am, Joris Billen 
wrote:

> Thank you - looks like it COULD do it.
> Have to try if I can have a simple UI, user selects one out of 100
> options, and receives the correct x/y plot and correct histogram of data
> stored in hive and retrieved with spark into pandas…
>
> Many thanks for your suggestion!
>
>
> On 18 Jul 2022, at 15:08, Sean Owen  wrote:
>
> You pull your data via Spark to a pandas DF and do whatever you want
>
>
> --
Best Regards,
Ayan Guha


Re: Salting technique doubt

2022-07-31 Thread ayan guha
One option is create a separate column in table A with salting. Use it as
partition key. Use original column for joining.

Ayan

On Sun, 31 Jul 2022 at 6:45 pm, Jacob Lynn  wrote:

> The key is this line from Amit's email (emphasis added):
>
> > Change the join_col to *all possible values* of the sale.
>
> The two tables are treated asymmetrically:
>
> 1. The skewed table gets random salts appended to the join key.
> 2. The other table gets all possible salts appended to the join key (e.g.
> using a range array literal + explode).
>
> Thus guarantees that every row in the skewed table will match a row in the
> other table. This StackOverflow answer
> <https://stackoverflow.com/a/57951114/1892435> gives an example.
>
> Op zo 31 jul. 2022 om 10:41 schreef Amit Joshi  >:
>
>> Hi Sid,
>>
>> I am not sure I understood your question.
>> But the keys cannot be different post salting in both the tables, this is
>> what i have shown in the explanation.
>> You salt Table A and then explode Table B to create all possible values.
>>
>> In your case, I do not understand, what Table B has x_8/9. It should be
>> all possible values which you used to create salt.
>>
>> I hope you understand.
>>
>> Thanks
>>
>>
>>
>> On Sun, Jul 31, 2022 at 10:02 AM Sid  wrote:
>>
>>> Hi Amit,
>>>
>>> Thanks for your reply. However, your answer doesn't seem different from
>>> what I have explained.
>>>
>>> My question is after salting if the keys are different like in my
>>> example then post join there would be no results assuming the join type as
>>> inner join because even though the keys are segregated in different
>>> partitions based on unique keys they are not matching because x_1/x_2
>>> !=x_8/x_9
>>>
>>> How do you ensure that the results are matched?
>>>
>>> Best,
>>> Sid
>>>
>>> On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi 
>>> wrote:
>>>
>>>> Hi Sid,
>>>>
>>>> Salting is normally a technique to add random characters to existing
>>>> values.
>>>> In big data we can use salting to deal with the skewness.
>>>> Salting in join cas be used as :
>>>> * Table A-*
>>>> Col1, join_col , where join_col values are {x1, x2, x3}
>>>> x1
>>>> x1
>>>> x1
>>>> x2
>>>> x2
>>>> x3
>>>>
>>>> *Table B-*
>>>> join_col, Col3 , where join_col  value are {x1, x2}
>>>> x1
>>>> x2
>>>>
>>>> *Problem: *Let say for table A, data is skewed on x1
>>>> Now salting goes like this.  *Salt value =2*
>>>> For
>>>> *table A, *create a new col with values by salting join col
>>>> *New_Join_Col*
>>>> x1_1
>>>> x1_2
>>>> x1_1
>>>> x2_1
>>>> x2_2
>>>> x3_1
>>>>
>>>> For *Table B,*
>>>> Change the join_col to all possible values of the sale.
>>>> join_col
>>>> x1_1
>>>> x1_2
>>>> x2_1
>>>> x2_2
>>>>
>>>> And then join it like
>>>> table1.join(table2, where tableA.new_join_col == tableB. join_col)
>>>>
>>>> Let me know if you have any questions.
>>>>
>>>> Regards
>>>> Amit Joshi
>>>>
>>>>
>>>> On Sat, Jul 30, 2022 at 7:16 PM Sid  wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I was trying to understand the Salting technique for the column where
>>>>> there would be a huge load on a single partition because of the same keys.
>>>>>
>>>>> I referred to one youtube video with the below understanding:
>>>>>
>>>>> So, using the salting technique we can actually change the joining
>>>>> column values by appending some random number in a specified range.
>>>>>
>>>>> So, suppose I have these two values in a partition of two different
>>>>> tables:
>>>>>
>>>>> Table A:
>>>>> Partition1:
>>>>> x
>>>>> .
>>>>> .
>>>>> .
>>>>> x
>>>>>
>>>>> Table B:
>>>>> Partition1:
>>>>> x
>>>>> .
>>>>> .
>>>>> .
>>>>> x
>>>>>
>>>>> After Salting it would be something like the below:
>>>>>
>>>>> Table A:
>>>>> Partition1:
>>>>> x_1
>>>>>
>>>>> Partition 2:
>>>>> x_2
>>>>>
>>>>> Table B:
>>>>> Partition1:
>>>>> x_3
>>>>>
>>>>> Partition 2:
>>>>> x_8
>>>>>
>>>>> Now, when I inner join these two tables after salting in order to
>>>>> avoid data skewness problems, I won't get a match since the keys are
>>>>> different after applying salting techniques.
>>>>>
>>>>> So how does this resolves the data skewness issue or if there is some
>>>>> understanding gap?
>>>>>
>>>>> Could anyone help me in layman's terms?
>>>>>
>>>>> TIA,
>>>>> Sid
>>>>>
>>>> --
Best Regards,
Ayan Guha


Re: [pyspark delta] [delta][Spark SQL]: Getting an Analysis Exception. The associated location (path) is not empty

2022-08-02 Thread ayan guha
; 'TERMINATED', 'THEN', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 
> 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 'UNARCHIVE', 
> 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 
> 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 
> 'WINDOW', 'WITH', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 23)
>
> == SQL ==
> CREATE OR REPLACE TABLE
>
>
> On Mon, Aug 1, 2022 at 8:32 PM Sean Owen  wrote:
>
>> Pretty much what it says? you are creating a table over a path that
>> already has data in it. You can't do that without mode=overwrite at least,
>> if that's what you intend.
>>
>> On Mon, Aug 1, 2022 at 7:29 PM Kumba Janga  wrote:
>>
>>>
>>>
>>>- Component: Spark Delta, Spark SQL
>>>- Level: Beginner
>>>- Scenario: Debug, How-to
>>>
>>> *Python in Jupyter:*
>>>
>>> import pyspark
>>> import pyspark.sql.functions
>>>
>>> from pyspark.sql import SparkSession
>>> spark = (
>>> SparkSession
>>> .builder
>>> .appName("programming")
>>> .master("local")
>>> .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
>>> .config("spark.sql.extensions", 
>>> "io.delta.sql.DeltaSparkSessionExtension")
>>> .config("spark.sql.catalog.spark_catalog", 
>>> "org.apache.spark.sql.delta.catalog.DeltaCatalog")
>>> .config('spark.ui.port', '4050')
>>> .getOrCreate()
>>>
>>> )
>>> from delta import *
>>>
>>> string_20210609 = '''worked_date,worker_id,delete_flag,hours_worked
>>> 2021-06-09,1001,Y,7
>>> 2021-06-09,1002,Y,3.75
>>> 2021-06-09,1003,Y,7.5
>>> 2021-06-09,1004,Y,6.25'''
>>>
>>> rdd_20210609 = spark.sparkContext.parallelize(string_20210609.split('\n'))
>>>
>>> # FILES WILL SHOW UP ON THE LEFT UNDER THE FOLDER ICON IF YOU WANT TO 
>>> BROWSE THEM
>>> OUTPUT_DELTA_PATH = './output/delta/'
>>>
>>> spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')
>>>
>>> spark.sql('''
>>> CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(
>>> worked_date date
>>> , worker_id int
>>> , delete_flag string
>>> , hours_worked double
>>> ) USING DELTA
>>>
>>>
>>> PARTITIONED BY (worked_date)
>>> LOCATION "{0}"
>>> '''.format(OUTPUT_DELTA_PATH)
>>> )
>>>
>>> *Error Message:*
>>>
>>> AnalysisException Traceback (most recent call 
>>> last) in   4 spark.sql('CREATE 
>>> DATABASE IF NOT EXISTS EXERCISE')  5 > 6 spark.sql('''  7 
>>> CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(  8 
>>> worked_date date
>>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\session.py in 
>>> sql(self, sqlQuery)647 [Row(f1=1, f2=u'row1'), Row(f1=2, 
>>> f2=u'row2'), Row(f1=3, f2=u'row3')]648 """--> 649 
>>> return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)650   
>>>   651 @since(2.0)
>>> \Users\kyjan\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py
>>>  in __call__(self, *args)   13021303 answer = 
>>> self.gateway_client.send_command(command)-> 1304 return_value = 
>>> get_return_value(   1305 answer, self.gateway_client, 
>>> self.target_id, self.name)   1306
>>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in 
>>> deco(*a, **kw)132 # Hide where the exception came from 
>>> that shows a non-Pythonic133 # JVM exception 
>>> message.--> 134 raise_from(converted)135 
>>> else:136 raise
>>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in 
>>> raise_from(e)
>>> AnalysisException: Cannot create table ('`EXERCISE`.`WORKED_HOURS`'). The 
>>> associated location ('output/delta') is not empty.;
>>>
>>>
>>> --
>>> Best Wishes,
>>> Kumba Janga
>>>
>>> "The only way of finding the limits of the possible is by going beyond
>>> them into the impossible"
>>> -Arthur C. Clarke
>>>
>>
>
> --
> Best Wishes,
> Kumba Janga
>
> "The only way of finding the limits of the possible is by going beyond
> them into the impossible"
> -Arthur C. Clarke
>
-- 
Best Regards,
Ayan Guha


Re: log transfering into hadoop/spark

2022-08-02 Thread ayan guha
ELK or Splunk agents typically.

Or if you are in cloud then there are cloud native solutions which can
forward logs to object store, which can then be read like hdfs.

On Tue, 2 Aug 2022 at 4:43 pm, pengyh  wrote:

> since flume is not continued to develop.
> what's the current opensource tool to transfer webserver logs into
> hdfs/spark?
>
> thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-06 Thread ayan guha
 🙂
>
> Maybe I'll be more specific:
> Suppose I start the application with maxExecutors=500, executors.cores=2,
> because that's the amount of resources needed for the ETL part. But for the
> DL part I only need 20 GPUs. SLS API only allows to set the resources per
> executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I
> configure the profile with 1 GPU per executor.
> So, the question is how do I limit the stage resources to 20 GPUs total?
>
> Thanks again,
> Shay
>
> --
> *From:* Artemis User 
> *Sent:* Thursday, November 3, 2022 5:23 PM
>
> *To:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Re: Stage level scheduling - lower the number
> of executors when using GPUs
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>   Shay,  You may find this video helpful (with some API code samples that
> you are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc&t=171s.
> The issue here isn't how to limit the number of executors but to request
> for the right GPU-enabled executors dynamically.  Those executors used in
> pre-GPU stages should be returned back to resource managers with dynamic
> resource allocation enabled (and with the right DRA policies).  Hope this
> helps..
>
> Unfortunately there isn't a lot of detailed docs for this topic since GPU
> acceleration is kind of new in Spark (not straightforward like in TF).   I
> wish the Spark doc team could provide more details in the next release...
>
> On 11/3/22 2:37 AM, Shay Elbaz wrote:
>
> Thanks Artemis. We are *not* using Rapids, but rather using GPUs through
> the Stage Level Scheduling feature with ResourceProfile. In Kubernetes
> you have to turn on shuffle tracking for dynamic allocation, anyhow.
> The question is how we can limit the *number of executors *when building
> a new ResourceProfile, directly (API) or indirectly (some advanced
> workaround).
>
> Thanks,
> Shay
>
>
> --
> *From:* Artemis User  
> *Sent:* Thursday, November 3, 2022 1:16 AM
> *To:* user@spark.apache.org 
> 
> *Subject:* [EXTERNAL] Re: Stage level scheduling - lower the number of
> executors when using GPUs
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>   Are you using Rapids for GPU support in Spark?  Couple of options you
> may want to try:
>
>1. In addition to dynamic allocation turned on, you may also need to
>turn on external shuffling service.
>2. Sounds like you are using Kubernetes.  In that case, you may also
>need to turn on shuffle tracking.
>3. The "stages" are controlled by the APIs.  The APIs for dynamic
>resource request (change of stage) do exist, but only for RDDs (e.g.
>TaskResourceRequest and ExecutorResourceRequest).
>
>
> On 11/2/22 11:30 AM, Shay Elbaz wrote:
>
> Hi,
>
> Our typical applications need less *executors* for a GPU stage than for a
> CPU stage. We are using dynamic allocation with stage level scheduling, and
> Spark tries to maximize the number of executors also during the GPU stage,
> causing a bit of resources chaos in the cluster. This forces us to use a
> lower value for 'maxExecutors' in the first place, at the cost of the CPU
> stages performance. Or try to solve this in the Kubernets scheduler level,
> which is not straightforward and doesn't feel like the right way to go.
>
> Is there a way to effectively use less executors in Stage Level
> Scheduling? The API does not seem to include such an option, but maybe
> there is some more advanced workaround?
>
> Thanks,
> Shay
>
>
>
>
>
>
>
> --
Best Regards,
Ayan Guha


Re: Profiling data quality with Spark

2022-12-27 Thread ayan guha
The way I would approach is to evaluate GE, Deequ (there is a python
binding called pydeequ) and others like Delta Live tables with expectations
from Data Quality feature perspective. All these tools have their pros and
cons, and all of them are compatible with spark as a compute engine.

Also, you may want to look at dbt based DQ toolsets if sql is your thing.

On Wed, 28 Dec 2022 at 3:14 pm, Sean Owen  wrote:

> I think this is kind of mixed up. Data warehouses are simple SQL
> creatures; Spark is (also) a distributed compute framework. Kind of like
> comparing maybe a web server to Java.
> Are you thinking of Spark SQL? then I dunno sure you may well find it more
> complicated, but it's also just a data warehousey SQL surface.
>
> But none of that relates to the question of data quality tools. You could
> use GE with Redshift, or indeed with Spark - are you familiar with it? It's
> probably one of the most common tools people use with Spark for this in
> fact. It's just a Python lib at heart and you can apply it with Spark, but
> _not_ with a data warehouse, so I'm not sure what you're getting at.
>
> Deequ is also commonly seen. It's actually built on Spark, so again,
> confused about this "use Redshift or Snowflake not Spark".
>
> On Tue, Dec 27, 2022 at 9:55 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> SPARK is just another querying engine with a lot of hype.
>>
>> I would highly suggest using Redshift (storage and compute decoupled
>> mode) or Snowflake without all this super complicated understanding of
>> containers/ disk-space, mind numbing variables, rocket science tuning, hair
>> splitting failure scenarios, etc. After that try to choose solutions like
>> Athena, or Trino/ Presto, and then come to SPARK.
>>
>> Try out solutions like  "great expectations" if you are looking for data
>> quality and not entirely sucked into the world of SPARK and want to keep
>> your options open.
>>
>> Dont get me wrong, SPARK used to be great in 2016-2017, but there are
>> superb alternatives now and the industry, in this recession, should focus
>> on getting more value for every single dollar they spend.
>>
>> Best of luck.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Dec 27, 2022 at 7:30 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well, you need to qualify your statement on data quality. Are you
>>> talking about data lineage here?
>>>
>>> HTH
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 27 Dec 2022 at 19:25, rajat kumar 
>>> wrote:
>>>
>>>> Hi Folks
>>>> Hoping you are doing well, I want to implement data quality to detect
>>>> issues in data in advance. I have heard about few frameworks like GE/Deequ.
>>>> Can anyone pls suggest which one is good and how do I get started on it?
>>>>
>>>> Regards
>>>> Rajat
>>>>
>>> --
Best Regards,
Ayan Guha


Re: Got Error Creating permanent view in Postgresql through Pyspark code

2023-01-05 Thread ayan guha
Hi

What you are trying to do does not make sense. I suggest you to understand
how Views work in SQL. IMHO you are better off creating a table.

Ayan

On Fri, 6 Jan 2023 at 12:20 am, Stelios Philippou 
wrote:

> Vajiha,
>
> I dont see your query working as you hope it will.
>
> spark.sql will execute a query on a database level
>
> to retrieve the temp view you need to go from the sessions.
> i.e
>
> session.sql("SELECT * FROM TEP_VIEW")
>
> You might need to retrieve the data in a collection and iterate over them
> to do batch insertion using spark.sql("INSERt ...");
>
> Hope this helps
>
> Stelios
>
>
> --
> Hi Stelios Philippou,
> I need to create a view table in Postgresql DB using pyspark code. But I'm
> unable to create a view table, I can able to create table through pyspark
> code.
> I need to know Whether through Pyspark code can I create view table in
> postgresql database or not. Thanks for you reply
>
> Pyspark Code:
> df.createOrReplaceTempView("TEMP_VIEW")
> spark.sql("CREATE VIEW TEMP1 AS SELECT * FROM TEMP_VIEW")
>
> On Wed, 4 Jan 2023 at 15:10, Vajiha Begum S A <
> vajihabegu...@maestrowiz.com> wrote:
>
>>
>> I have tried to Create a permanent view in Postgresql DB through Pyspark
>> code, but I have received the below error message. Kindly help me to create
>> a permanent view table in the database.How shall create permanent view
>> using Pyspark code. Please do reply.
>>
>> *Error Message::*
>> *Exception has occurred: Analysis Exception*
>> Not allowed to create a permanent view `default`.`TEMP1` by referencing a
>> temporary view TEMP_VIEW. Please create a temp view instead by CREATE TEMP
>> VIEW
>>
>>
>> Regards,
>> Vajiha
>> Research Analyst
>> MW Solutions
>>
> --
Best Regards,
Ayan Guha


Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread ayan guha
t; Provide a csv file of 5 rows for the users table. Each row has a
>>>>> unique user_id and one or two other columns like fictitious email etc.
>>>>>
>>>>> Also for each user_id, provide 10 rows of orders table, meaning that
>>>>> orders table has 5 x 10 rows for each user_id.
>>>>>
>>>>> both as comma separated csv file
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies Limited
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>
>>>>>> Thanks Mich,
>>>>>> I have not but I will certainly read up on this today.
>>>>>>
>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>> table; I agree! That distills the problem nicely. Yet, I still have some
>>>>>> questions on which someone may be able to shed some light.
>>>>>>
>>>>>> 1) If my 'orders' table is very large, and will need to be aggregated
>>>>>> by 'user_id', how will Spark intelligently optimize on that constraint
>>>>>> (only read data for relevent 'user_id's). Is that something I have to
>>>>>> instruct Spark to do?
>>>>>>
>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>> search too much?
>>>>>>
>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>> Spark works under the hood for these operations, I would appreciate it if
>>>>>> you give them. Spark has become a pillar on my team and knowing it in 
>>>>>> more
>>>>>> detail is warranted.
>>>>>>
>>>>>> Slightly pivoting the subject here; I have tried something. It was a
>>>>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>>>> script I now have the line:
>>>>>>
>>>>>> ```
>>>>>> grouped_orders_df =
>>>>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>>>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>>>>> ```
>>>>>> (json is ultimately needed)
>>>>>>
>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>> single Array column. Now my worry is, will this column become too large 
>>>>>> if
>>>>>> there are a great many orders. Is there a limit? I have search for
>>>>>> documentation on such a limit but could not find any.
>>>>>>
>>>>>> I truly appreciate your help Mich and team,
>>>>>> Marco.
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Have you thought of using  windowing function
>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>>>> achieve this?
>>>>>>>
>>>>>>> Effectively all your information is in the orders table.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>> Palantir Technologies Limited
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>>
>>>>>>>view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>>> marco.costant...@rocketfncl.com> wrote:
>>>>>>>
>>>>>>>> I have two tables: {users, orders}. In this example, let's say that
>>>>>>>> for each 1 User in the users table, there are 10 Orders in the 
>>>>>>>> orders
>>>>>>>> table.
>>>>>>>>
>>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>>> User. So, a single user will need his/her own list of Orders. 
>>>>>>>> Additionally,
>>>>>>>> I need to send this statement to the real-world user via email (for
>>>>>>>> example).
>>>>>>>>
>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>>>>> DataFrame. This way, I can rely on the spark workers to handle the 
>>>>>>>> email
>>>>>>>> sending individually. However, I now do not know the best way to get 
>>>>>>>> each
>>>>>>>> User's Orders.
>>>>>>>>
>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> users_df = 
>>>>>>>> orders_df = 
>>>>>>>>
>>>>>>>> #this is poorly named for max understandability in this context
>>>>>>>> def foreach_function(row):
>>>>>>>>   user_id = row.user_id
>>>>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>>>>
>>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>>
>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>> ```
>>>>>>>>
>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>> foreach function, I will capitalize on Spark's scalability when doing 
>>>>>>>> that
>>>>>>>> work. However, I am worried of two things:
>>>>>>>>
>>>>>>>> If I take all Orders up front...
>>>>>>>>
>>>>>>>> Will that work?
>>>>>>>> Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>> who won't handle them (different User).
>>>>>>>>
>>>>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>>>>
>>>>>>>> Will it work?
>>>>>>>> Will that be too much IO to DB?
>>>>>>>>
>>>>>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>>>>>
>>>>>>>> I have not yet tried anything here. I am doing so as we speak, but
>>>>>>>> am suffering from choice-paralysis.
>>>>>>>>
>>>>>>>> Please and thank you.
>>>>>>>>
>>>>>>> --
Best Regards,
Ayan Guha


[no subject]

2023-08-23 Thread ayan guha
Unsubscribe--
Best Regards,
Ayan Guha


Re: Spark SQL on large number of columns

2015-05-19 Thread ayan guha
One option is batch up columns and do the batches in sequence.
On 20 May 2015 00:20, "madhu phatak"  wrote:

> Hi,
> Another update, when run on more that 1000 columns I am getting
>
> Could not write class
> __wrapper$1$40255d281a0d4eacab06bcad6cf89b0d/__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d$$anonfun$wrapper$1$$anon$1
> because it exceeds JVM code size limits. Method apply's code too large!
>
>
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Tue, May 19, 2015 at 6:23 PM, madhu phatak 
> wrote:
>
>> Hi,
>> Tested with HiveContext also. It also take similar amount of time.
>>
>> To make the things clear, the following is select clause for a given
>> column
>>
>>
>> *aggregateStats( "$columnName" , max( cast($columnName as double)),   
>> |min(cast($columnName as double)), avg(cast($columnName as double)), 
>> count(*) )*
>>
>> aggregateStats is UDF generating case class to hold the values.
>>
>>
>>
>>
>>
>>
>>
>>
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>> On Tue, May 19, 2015 at 5:57 PM, madhu phatak 
>> wrote:
>>
>>> Hi,
>>> Tested for calculating values for 300 columns. Analyser takes around 4
>>> minutes to generate the plan. Is this normal?
>>>
>>>
>>>
>>>
>>> Regards,
>>> Madhukara Phatak
>>> http://datamantra.io/
>>>
>>> On Tue, May 19, 2015 at 4:35 PM, madhu phatak 
>>> wrote:
>>>
>>>> Hi,
>>>> I am using spark 1.3.1
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Madhukara Phatak
>>>> http://datamantra.io/
>>>>
>>>> On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) 
>>>> wrote:
>>>>
>>>>>  And which version are you using
>>>>>
>>>>> 发自我的 iPhone
>>>>>
>>>>> 在 2015年5月19日,18:29,"ayan guha"  写道:
>>>>>
>>>>>   can you kindly share your code?
>>>>>
>>>>> On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I  am trying run spark sql aggregation on a file with 26k columns. No
>>>>>> of rows is very small. I am running into issue that spark is taking huge
>>>>>> amount of time to parse the sql and create a logical plan. Even if i have
>>>>>> just one row, it's taking more than 1 hour just to get pass the parsing.
>>>>>> Any idea how to optimize in these kind of scenarios?
>>>>>>
>>>>>>
>>>>>>  Regards,
>>>>>>  Madhukara Phatak
>>>>>> http://datamantra.io/
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>  --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark Job not using all nodes in cluster

2015-05-19 Thread ayan guha
What is your spark env file says? Are you setting number of executors in
spark context?
On 20 May 2015 13:16, "Shailesh Birari"  wrote:

> Hi,
>
> I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB
> of RAM.
> I have around 600,000+ Json files on HDFS. Each file is small around 1KB in
> size. Total data is around 16GB. Hadoop block size is 256MB.
> My application reads these files with sc.textFile() (or sc.jsonFile()
> tried
> both) API. But all the files are getting read by only one node (4
> executors). Spark UI shows all 600K+ tasks on one node and 0 on other
> nodes.
>
> I confirmed that all files are accessible from all nodes. Some other
> application which uses big files uses all nodes on same cluster.
>
> Can you please let me know why it is behaving in such way ?
>
> Thanks,
>   Shailesh
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Hive on Spark VS Spark SQL

2015-05-20 Thread ayan guha
And if I am not wrong, spark SQL api is intended to move closer to SQL
standards. I feel its a clever decision on spark's part to keep both APIs
operational. These short term confusions worth the long term benefits.
On 20 May 2015 17:19, "Sean Owen"  wrote:

> I don't think that's quite the difference. Any SQL  engine has a query
> planner and an execution engine. Both of these Spark for execution. HoS
> uses Hive for query planning. Although it's not optimized for execution on
> Spark per se, it's got a lot of language support and is stable/mature.
> Spark SQL's query planner is less developed at this point but purpose-built
> for Spark as an execution engine. Spark SQL is also how you put SQL-like
> operations in a Spark program -- programmatic SQL if you will -- which
> isn't what Hive or therefore HoS does. HoS is good if you're already using
> Hive and need its language features and need it as it works today, and want
> a faster batch execution version of it.
>
> On Wed, May 20, 2015 at 7:18 AM, Debasish Das 
> wrote:
>
>> SparkSQL was built to improve upon Hive on Spark runtime further...
>>
>> On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk <
>> guoqing0...@yahoo.com.hk> wrote:
>>
>>> Hive on Spark and SparkSQL which should be better , and what are the key
>>> characteristics and the advantages and the disadvantages between ?
>>>
>>> --
>>> guoqing0...@yahoo.com.hk
>>>
>>
>>
>


Re: Spark 1.3.1 - SQL Issues

2015-05-20 Thread ayan guha
Thanks a bunch
On 21 May 2015 07:11, "Davies Liu"  wrote:

> The docs had been updated.
>
> You should convert the DataFrame to RDD by `df.rdd`
>
> On Mon, Apr 20, 2015 at 5:23 AM, ayan guha  wrote:
> > Hi
> > Just upgraded to Spark 1.3.1.
> >
> > I am getting an warning
> >
> > Warning (from warnings module):
> >   File
> >
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py",
> > line 191
> > warnings.warn("inferSchema is deprecated, please use createDataFrame
> > instead")
> > UserWarning: inferSchema is deprecated, please use createDataFrame
> instead
> >
> > However, documentation still says to use inferSchema.
> > Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
> > section
> >
> > Also, I am getting an error in mlib.ALS.train function when passing
> > dataframe (do I need to convert the DF to RDD?)
> >
> > Code:
> > training = ssc.sql("select userId,movieId,rating from ratings where
> > partitionKey < 6").cache()
> > print type(training)
> > model = ALS.train(training,rank,numIter,lmbda)
> >
> > Error:
> > 
> > Rank:8 Lmbda:1.0 iteration:10
> >
> > Traceback (most recent call last):
> >   File "D:\Project\Spark\code\movie_sql.py", line 109, in 
> > bestConf =
> getBestModel(sc,ssc,training,validation,validationNoRating)
> >   File "D:\Project\Spark\code\movie_sql.py", line 54, in getBestModel
> > model = ALS.train(trainingRDD,rank,numIter,lmbda)
> >   File
> >
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py",
> > line 139, in train
> > model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank,
> > iterations,
> >   File
> >
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py",
> > line 127, in _prepare
> > assert isinstance(ratings, RDD), "ratings should be RDD"
> > AssertionError: ratings should be RDD
> >
> > --
> > Best Regards,
> > Ayan Guha
>


Re: Partitioning of Dataframes

2015-05-22 Thread ayan guha
DataFrame is an abstraction of rdd. So you should be able to do
df.rdd.partitioyBy. however as far as I know, equijoines already optimizes
partitioning. You may want to look explain plans more carefully and
materialise interim joins.
 On 22 May 2015 19:03, "Karlson"  wrote:

> Hi,
>
> is there any way to control how Dataframes are partitioned? I'm doing lots
> of joins and am seeing very large shuffle reads and writes in the Spark UI.
> With PairRDDs you can control how the data is partitioned across nodes with
> partitionBy. There is no such method on Dataframes however. Can I somehow
> partition the underlying the RDD manually? I am currently using the Python
> API.
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: partitioning after extracting from a hive table?

2015-05-22 Thread ayan guha
I guess not. Spark partitions correspond to number of splits.
On 23 May 2015 00:02, "Cesar Flores"  wrote:

>
> I have a table in a Hive database partitioning by date. I notice that when
> I query this table using HiveContext the created data frame has an specific
> number of partitions.
>
>
> Do this partitioning corresponds to my original table partitioning in Hive?
>
>
> Thanks
> --
> Cesar Flores
>


Re: SparkSQL query plan to Stage wise breakdown

2015-05-23 Thread ayan guha
I think you are looking for Df.explain
On 23 May 2015 12:51, "Pramod Biligiri"  wrote:

> Hi,
> Is there an easy way to see how a SparkSQL query plan maps to different
> stages of the generated Spark job? The WebUI is entirely in terms of RDD
> stages and I'm having a hard time mapping it back to my query.
>
> Pramod
>


Re: DataFrame groupBy vs RDD groupBy

2015-05-23 Thread ayan guha
Hi Michael

This is great info. I am currently using repartitionandsort function to
achieve the same. Is this the recommended way till 1.3 or is there any
better way?
On 23 May 2015 07:38, "Michael Armbrust"  wrote:

> DataFrames have a lot more information about the data, so there is a whole
> class of optimizations that are possible there that we cannot do in RDDs.
> This is why we are focusing a lot of effort on this part of the project.
> In Spark 1.4 you can accomplish what you want using the new window function
> feature.  This can be done with SQL as you described or directly on a
> DataFrame:
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.expressions._
>
> val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
> df.select('x, 'y,
> rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show
>
> +-+-+--+
> |x|y|number|
> +-+-+--+
> |a|1| 1|
> |b|1| 2|
> |c|2| 1|
> |d|2| 2|
> +-+-+--+
>
> On Fri, May 22, 2015 at 3:35 AM, gtanguy 
> wrote:
>
>> Hello everybody,
>>
>> I have two questions in one. I upgrade from Spark 1.1 to 1.3 and some part
>> of my code using groupBy became really slow.
>>
>> *1/ *Why does the groupBy of rdd is really slow in comparison to the
>> groupBy
>> of dataFrame?
>>
>> // DataFrame : running in few seconds
>> val result = table.groupBy("col1").count
>>
>> // RDD : taking hours with a lot of /spilling in-memory/
>> val schemaOriginel = table.schema
>> val result = table.rdd.groupBy { r =>
>>  val rs = RowSchema(r, schemaOriginel)
>>  val col1 = rs.getValueByName("col1")
>>  col1
>>   }.map(l => (l._1,l._2.size) ).count()
>>
>>
>> *2/* My goal is to groupBy on a key, then to order each group over a
>> column
>> and finally to add the row number in each group. I had this code running
>> before changing to Spark 1.3 and it worked fine, but since I have changed
>> to
>> DataFrame it is really slow.
>>
>>  val schemaOriginel = table.schema
>>  val result = table.rdd.groupBy { r =>
>> val rs = RowSchema(r, schemaOriginel)
>> val col1 = rs.getValueByName("col1")
>>  col1
>> }.flatMap {
>>  l =>
>>l._2.toList
>>  .sortBy {
>>   u =>
>> val rs = RowSchema(u, schemaOriginel)
>> val col1 = rs.getValueByName("col1")
>> val col2 = rs.getValueByName("col2")
>> (col1, col2)
>> } .zipWithIndex
>> }
>>
>> /I think the SQL equivalent of what I try to do : /
>>
>> SELECT a,
>>ROW_NUMBER() OVER (PARTITION BY a) AS num
>> FROM table.
>>
>>
>>  I don't think I can do this with a GroupedData (result of df.groupby).
>> Any
>> ideas on how I can speed up this?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-groupBy-vs-RDD-groupBy-tp22995.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Using Spark like a search engine

2015-05-24 Thread ayan guha
Yes, spark will be useful for following areas of your application:
1. Running same function on every CV in parallel and score
2. Improve scoring function by better access to classification and
clustering algorithms, within and beyond mllib.

These are first benefits you can start with and then think of further
improvement s specific to various use case, like pre annotated CV s etc

Best
Ayan
On 25 May 2015 15:59, "Сергей Мелехин"  wrote:

> HI!
> We are developing scoring system for recruitment. Recruiter enters vacancy
> requirements, and we score tens of thousands of CVs to this requirements,
> and return e.g. top 10 matches.
> We do not use fulltext search and sometimes even dont filter input CVs
> prior to scoring (some vacancies do not have mandatory requirements that
> can be used as a filter effectively).
>
> So we have scoring function F(CV,VACANCY) that is currently inplemented in
> SQL and runs on Postgresql cluster. In worst case F is executed once on
> every CV in database. VACANCY part is fixed for one query, but changes
> between queries and there's very little we can process in advance.
>
> We expect to have about 100 000 000 CVs in next year, and do not expect
> our current implementation to offer desired low latency responce (<1 s) on
> 100M CVs. So we look for a horizontaly scaleable and fault-tolerant
> in-memory solution.
>
> Will Spark be usefull for our task? All tutorials I could find describe
> stream processing, or ML applications. What Spark extensions/backends can
> be useful?
>
>
> With best regards, Segey Melekhin
>


Re: DataFrame. Conditional aggregation

2015-05-25 Thread ayan guha
Case when col2>100 then 1 else col2 end
On 26 May 2015 00:25, "Masf"  wrote:

> Hi.
>
> In a dataframe, How can I execution a conditional sentence in a
> aggregation. For example, Can I translate this SQL statement to DataFrame?:
>
> SELECT name, SUM(IF table.col2 > 100 THEN 1 ELSE table.col1)
> FROM table
> GROUP BY name
>
> Thanks
> --
> Regards.
> Miguel
>


Re: Running Javascript from scala spark

2015-05-26 Thread ayan guha
Yes you are in right mailing list, for sure :)

Regarding your question, I am sure you are well versed with how spark
works. Essentially you can run any arbitrary function with map call and it
will run in remote nodes. Hence you need to install any needed dependency
in all nodes. You can also pass on any additional custom code through jar
files which get shipped to cluster when your function is run.

This is of course a general idea. In your case, if you can kindly show what
you are doing and any errors then experts here will definitely help.

Best
Ayan
On 27 May 2015 05:08, "andy petrella"  wrote:

> Yop, why not using like you said a js engine le rhino? But then I would
> suggest using mapPartition instead si only one engine per partition.
> Probably broadcasting the script is also a good thing to do.
>
> I guess it's for add hoc transformations passed by a remote client,
> otherwise you could simply convert the js into Scala, right?
>
> HTH
> Andy
>
> Le mar. 26 mai 2015 21:03, marcos rebelo  a écrit :
>
>> Hi all
>>
>> Let me be clear, I'm speaking of Spark (big data, map/reduce, hadoop, ...
>> related). I have multiple map/flatMap/groupBy and one of the steps needs to
>> be a map passing the item inside a JavaScript code.
>>
>> 2 Questions:
>>  - Is this question related to this list?
>>  - Did someone do something similar?
>>
>> Best Regards
>> Marcos Rebelo
>>
>>
>>
>> On Tue, May 26, 2015 at 8:03 PM, Marcelo Vanzin 
>> wrote:
>>
>>> Is it just me or does that look completely unrelated to
>>> Spark-the-Apache-project?
>>>
>>> On Tue, May 26, 2015 at 10:55 AM, Ted Yu  wrote:
>>>
 Have you looked at https://github.com/spark/sparkjs ?

 Cheers

 On Tue, May 26, 2015 at 10:17 AM, marcos rebelo 
 wrote:

> Hi all,
>
> My first message on this mailing list:
>
> I need to run JavaScript on Spark. Somehow I would like to use the
> ScriptEngineManager or any other way that makes Rhino do the work for me.
>
> Consider that I have a Structure that needs to be changed by a
> JavaScript. I will have a set of Javascript and depending on the structure
> I will do some calculation.
>
> Did someone make it work and can get me a simple snippet that works?
>
> Thanks for any support
>
> Best Regards
> Marcos Rebelo
>
>

>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>


Re: DataFrame. Conditional aggregation

2015-05-26 Thread ayan guha
For this, I can give you a SQL solution:

joined data.registerTempTable('j')

Res=ssc.sql('select col1,col2, count(1) counter, min(col3) minimum,
sum(case when endrscp>100 then 1 else 0 end test from j'

Let me know if this works.
On 26 May 2015 23:47, "Masf"  wrote:

> Hi
> I don't know how it works. For example:
>
> val result = joinedData.groupBy("col1","col2").agg(
>   count(lit(1)).as("counter"),
>   min(col3).as("minimum"),
>   sum("case when endrscp> 100 then 1 else 0 end").as("test")
> )
>
> How can I do it?
>
> Thanks
> Regards.
> Miguel.
>
> On Tue, May 26, 2015 at 12:35 AM, ayan guha  wrote:
>
>> Case when col2>100 then 1 else col2 end
>> On 26 May 2015 00:25, "Masf"  wrote:
>>
>>> Hi.
>>>
>>> In a dataframe, How can I execution a conditional sentence in a
>>> aggregation. For example, Can I translate this SQL statement to DataFrame?:
>>>
>>> SELECT name, SUM(IF table.col2 > 100 THEN 1 ELSE table.col1)
>>> FROM table
>>> GROUP BY name
>>>
>>> Thanks
>>> --
>>> Regards.
>>> Miguel
>>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>


Re: How to give multiple directories as input ?

2015-05-27 Thread ayan guha
What about /blah/*/blah/out*.avro?
On 27 May 2015 18:08, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:

> I am doing that now.
> Is there no other way ?
>
> On Wed, May 27, 2015 at 12:40 PM, Akhil Das 
> wrote:
>
>> How about creating two and union [ sc.union(first, second) ] them?
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
>> wrote:
>>
>>> I have this piece
>>>
>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>> AvroKeyInputFormat[GenericRecord]](
>>> "/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")
>>>
>>> that takes ("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro") this as
>>> input.
>>>
>>> I want to give a second directory as input but this is a invalid syntax
>>>
>>>
>>> that takes ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro",
>>> "/a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>>
>>> OR
>>>
>>> ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
>>> /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>>
>>>
>>> Please suggest.
>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


Re: How many executors can I acquire in standalone mode ?

2015-05-27 Thread ayan guha
You can request number of cores and amount of memory for each executor.
On 27 May 2015 18:25, "canan chen"  wrote:

> Thanks Arush.
> My scenario is that In standalone mode, if I have one worker, when I start
> spark-shell, there will be one executor launched. But if I have 2 workers,
> there will be 2 executors launched, so I am wondering the mechanism of
> executor allocation.
> Is it possible to specify how many executors I want in the code ?
>
> On Tue, May 26, 2015 at 5:57 PM, Arush Kharbanda <
> ar...@sigmoidanalytics.com> wrote:
>
>> I believe you would be restricted by the number of cores you have in your
>> cluster. Having a worker running without a core is useless.
>>
>> On Tue, May 26, 2015 at 3:04 PM, canan chen  wrote:
>>
>>> In spark standalone mode, there will be one executor per worker. I am
>>> wondering how many executor can I acquire when I submit app ? Is it greedy
>>> mode (as many as I can acquire )?
>>>
>>
>>
>>
>> --
>>
>> [image: Sigmoid Analytics] 
>>
>> *Arush Kharbanda* || Technical Teamlead
>>
>> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>>
>
>


Re: DataFrame. Conditional aggregation

2015-05-27 Thread ayan guha
Till 1.3, you have to prepare the DF appropriately

def setupCondition(t):
if t[1] > 100:
v = 1
else:
v = 0
return Row(col1=t[0],col2=t[1],col3=t[2],col4=v)


 d1=[[1001,100,50],[1001,200,100],[1002,100,99]]
d1RDD = sc.parallelize(d1).map(setupCondition)
d1DF = ssc.createDataFrame(d1RDD)
d1DF.printSchema()
d1DF.show()
res = d1DF.groupBy("col1").agg({'col3':'min','col4':'sum'})
print "\n\n"
res.show()

root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)
 |-- col4: long (nullable = true)

col1 col2 col3 col4
1001 100  50   0
1001 200  100  1
1002 100  99   0



col1 SUM(col4) MIN(col3)
1001 1 50
1002 0 99

Good news is since 1.4, DF will have methods like when,otherwise (and a LOT
more)cant wait to get my hands on 1.4 :)




On Wed, May 27, 2015 at 5:12 PM, Masf  wrote:

> Yes. I think that your sql solution will work but I was looking for a
> solution with DataFrame API. I had thought to use UDF such as:
>
> val myFunc = udf {(input: Int) => {if (input > 100) 1 else 0}}
>
> Although I'd like to know if it's possible to do it directly in the
> aggregation inserting a lambda function or something else.
>
> Thanks
>
> Regards.
> Miguel.
>
>
> On Wed, May 27, 2015 at 1:06 AM, ayan guha  wrote:
>
>> For this, I can give you a SQL solution:
>>
>> joined data.registerTempTable('j')
>>
>> Res=ssc.sql('select col1,col2, count(1) counter, min(col3) minimum,
>> sum(case when endrscp>100 then 1 else 0 end test from j'
>>
>> Let me know if this works.
>> On 26 May 2015 23:47, "Masf"  wrote:
>>
>>> Hi
>>> I don't know how it works. For example:
>>>
>>> val result = joinedData.groupBy("col1","col2").agg(
>>>   count(lit(1)).as("counter"),
>>>   min(col3).as("minimum"),
>>>   sum("case when endrscp> 100 then 1 else 0 end").as("test")
>>> )
>>>
>>> How can I do it?
>>>
>>> Thanks
>>> Regards.
>>> Miguel.
>>>
>>> On Tue, May 26, 2015 at 12:35 AM, ayan guha  wrote:
>>>
>>>> Case when col2>100 then 1 else col2 end
>>>> On 26 May 2015 00:25, "Masf"  wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> In a dataframe, How can I execution a conditional sentence in a
>>>>> aggregation. For example, Can I translate this SQL statement to 
>>>>> DataFrame?:
>>>>>
>>>>> SELECT name, SUM(IF table.col2 > 100 THEN 1 ELSE table.col1)
>>>>> FROM table
>>>>> GROUP BY name
>>>>>
>>>>> Thanks
>>>>> --
>>>>> Regards.
>>>>> Miguel
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>>
>>> Saludos.
>>> Miguel Ángel
>>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>



-- 
Best Regards,
Ayan Guha


Re: Where does partitioning and data loading happen?

2015-05-27 Thread ayan guha
I hate to say this, but your friend is right. Spark slaves (executors)
really pull the data. In fact, it is a standard practice in distributed
world, eg Hadoop. It is not practical to pass large amount of data through
master nor it gives a way to parallely read the data.

You can either use spark's own way of splitting the data, or you can use
hadoop style input formats which gives a set of splits, or you can use self
describing formats like parquet. But essentially your api to data has to
have a concept of cutting data to chunks, and spark just uses that to
decide which "slave" to pull what.

(mesos has nothing to do with this. its just a cluster manager as far as
spark is concerned)

On Thu, May 28, 2015 at 12:22 AM, Stephen Carman 
wrote:

> A colleague and I were having a discussion and we were disagreeing about
> something in Spark/Mesos that perhaps someone can shed some light into.
>
> We have a mesos cluster that runs spark via a sparkHome, rather than
> downloading an executable and such.
>
> My colleague says that say we have parquet files in S3, that slaves should
> know what data is in their partition and only pull from the S3 the
> partitions of parquet data they need, but this seems inherinitly wrong to
> me.
> as I have no idea how it’s possible for Spark or Mesos to know what
> partitions to know what to pull on the slave. It makes much more sense to
> me for the partitioning to be done on the driver and then distributed to the
> slaves so the slaves don’t have to necessarily worry about these details.
> If this were the case there is some data loading that is done on the
> driver, correct? Or does spark/mesos do some magic to pass a reference so
> the slaves
> know what to pull per say?
>
> So I guess in summation, where does partitioning and data loading happen?
> On the driver or on the executor?
>
> Thanks,
> Steve
> This e-mail is intended solely for the above-mentioned recipient and it
> may contain confidential or privileged information. If you have received it
> in error, please notify us immediately and delete the e-mail. You must not
> copy, distribute, disclose or take any action in reliance on it. In
> addition, the contents of an attachment to this e-mail may contain software
> viruses which could damage your own computer system. While ColdLight
> Solutions, LLC has taken every reasonable precaution to minimize this risk,
> we cannot accept liability for any damage which you sustain as a result of
> software viruses. You should perform your own virus checks before opening
> the attachment.
>



-- 
Best Regards,
Ayan Guha


Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-27 Thread ayan guha
Yes, you are at right path. Only thing to remember is placing hive site XML
to correct path so spark can talk to hive metastore.

Best
Ayan
On 28 May 2015 10:53, "Sanjay Subramanian"
 wrote:

> hey guys
>
> On the Hive/Hadoop ecosystem we have using Cloudera distribution CDH 5.2.x
> , there are about 300+ hive tables.
> The data is stored an text (moving slowly to Parquet) on HDFS.
> I want to use SparkSQL and point to the Hive metadata and be able to
> define JOINS etc using a programming structure like this
>
> import org.apache.spark.sql.hive.HiveContext
> val sqlContext = new HiveContext(sc)
> val schemaRdd = sqlContext.sql("some complex SQL")
>
>
> Is that the way to go ? Some guidance will be great.
>
> thanks
>
> sanjay
>
>
>
>


Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-28 Thread ayan guha
Probably a naive question: can you try the same in hive CLI and see if your
SQL is working? Looks like hive thing to me as spark is faithfully
delegating the query to hive.
On 29 May 2015 03:22, "Abhishek Tripathi"  wrote:

> Hi ,
> I'm using CDH5.4.0  quick start VM and tried to build Spark with Hive
> compatibility so that I can run Spark sql and access temp table remotely.
>
> I used below command to build  Spark, it was build successful but when I
> tried to access Hive data from Spark sql, I get error.
>
> Thanks,
> Abhi
>
> ---
> *mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive
> -Phive-thriftserver -DskipTests clean package*
>
> [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/
> [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
> SET spark.sql.hive.version=0.13.1
> spark-sql> show tables;
> sample_07 false
> t1 false
> Time taken: 3.901 seconds, Fetched 2 row(s)
> spark-sql> select * from t1;
> 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
> java.lang.VerifyError: class
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto
> overrides final method *getUnknownFields*
> .()Lcom/google/protobuf/UnknownFieldSet;
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>
>
>
>


Re: Batch aggregation by sliding window + join

2015-05-28 Thread ayan guha
Which version of spark? In 1.4 window queries will show up for these kind
of scenarios.

1 thing I can suggest is keep daily aggregates materialised and partioned
by key and sorted by key-day combination using repartitionandsort method.
It allows you to use custom partitioner and custom sorter.

Best
Ayan
On 29 May 2015 03:31, "igor.berman"  wrote:

> Hi,
> I have a batch daily job that computes daily aggregate of several counters
> represented by some object.
> After daily aggregation is done, I want to compute block of 3 days
> aggregation(3,7,30 etc)
> To do so I need to add new daily aggregation to the current block and then
> subtract from current block the daily aggregation of the last day within
> the
> current block(sliding window...)
> I've implemented it with something like:
>
> baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
> All rdds are keyed by unique id(long). Each rdd is saved in avro files
> after
> the job finishes and loaded when job starts(on next day). baseBlockRdd is
> much larger than lastDay and newDay rdds(depends on the size of the block)
>
> Unfortunately the performance is not satisfactory due to many shuffles(I
> have parallelism etc) I was looking for the way to improve performance
> somehow, to make sure that one task "joins" same local keys without
> reshuffling baseBlockRdd(which is big) each time the job starts(see
> https://spark-project.atlassian.net/browse/SPARK-1061 as related issue)
> so bottom line - how to join big rdd with smaller rdd without reshuffling
> big rdd over and over again?
> As soon as I've saved this big rdd and reloaded it from disk I want that
> every other rdd will be partitioned and collocated by the same
> "partitioner"(which is absent for hadooprdd) ... somehow, so that only
> small
> rdds will be sent over network.
>
> Another idea I had  - somehow split baseBlock into 2 parts with filter by
> keys of small rdds and then join, however I'm not sure it's possible to
> implement this filter without join.
>
> any ideas would be appreciated,
> thanks in advance
> Igor
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Batch aggregation by sliding window + join

2015-05-29 Thread ayan guha
My point is if you keep daily aggregates already computed then you do not
reprocess raw data. But yuh you may decide to recompute last 3 days
everyday.
On 29 May 2015 23:52, "Igor Berman"  wrote:

> Hi Ayan,
> thanks for the response
> I'm using 1.3.1. I'll check window queries(I dont use spark-sql...only
> core, might be I should?)
> What do you mean by materialized? I can repartitionAndSort by key
> daily-aggregation, however I'm not quite understand how it will help with
> yesterdays block which needs to be loaded from file and it has no
> connection to this repartition of daily block.
>
>
> On 29 May 2015 at 01:51, ayan guha  wrote:
>
>> Which version of spark? In 1.4 window queries will show up for these kind
>> of scenarios.
>>
>> 1 thing I can suggest is keep daily aggregates materialised and partioned
>> by key and sorted by key-day combination using repartitionandsort method.
>> It allows you to use custom partitioner and custom sorter.
>>
>> Best
>> Ayan
>> On 29 May 2015 03:31, "igor.berman"  wrote:
>>
>>> Hi,
>>> I have a batch daily job that computes daily aggregate of several
>>> counters
>>> represented by some object.
>>> After daily aggregation is done, I want to compute block of 3 days
>>> aggregation(3,7,30 etc)
>>> To do so I need to add new daily aggregation to the current block and
>>> then
>>> subtract from current block the daily aggregation of the last day within
>>> the
>>> current block(sliding window...)
>>> I've implemented it with something like:
>>>
>>> baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
>>> All rdds are keyed by unique id(long). Each rdd is saved in avro files
>>> after
>>> the job finishes and loaded when job starts(on next day). baseBlockRdd is
>>> much larger than lastDay and newDay rdds(depends on the size of the
>>> block)
>>>
>>> Unfortunately the performance is not satisfactory due to many shuffles(I
>>> have parallelism etc) I was looking for the way to improve performance
>>> somehow, to make sure that one task "joins" same local keys without
>>> reshuffling baseBlockRdd(which is big) each time the job starts(see
>>> https://spark-project.atlassian.net/browse/SPARK-1061 as related issue)
>>> so bottom line - how to join big rdd with smaller rdd without reshuffling
>>> big rdd over and over again?
>>> As soon as I've saved this big rdd and reloaded it from disk I want that
>>> every other rdd will be partitioned and collocated by the same
>>> "partitioner"(which is absent for hadooprdd) ... somehow, so that only
>>> small
>>> rdds will be sent over network.
>>>
>>> Another idea I had  - somehow split baseBlock into 2 parts with filter by
>>> keys of small rdds and then join, however I'm not sure it's possible to
>>> implement this filter without join.
>>>
>>> any ideas would be appreciated,
>>> thanks in advance
>>> Igor
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: Format RDD/SchemaRDD contents to screen?

2015-05-29 Thread ayan guha
Depending on your spark version, you can convert schemaRDD to a dataframe
and then use .show()
On 30 May 2015 10:33, "Minnow Noir"  wrote:

> I"m trying to debug query results inside spark-shell, but finding it
> cumbersome to save to file and then use file system utils to explore the
> results, and .foreach(print) tends to interleave the results among the
> myriad log messages.  Take() and collect() truncate.
>
> Is there a simple way to present the contents of an RDD/SchemaRDD on the
> screen in a formatted way?   For example, say I want to take() the first 30
> lines/rows in an *RDD and present them in a readable way on the screen so
> that I can see what's missing or invalid.  Obviously, I'm just trying to
> sample the results in a readable way, not download everything to the driver.
>
>
> Thank you
>


Re: How Broadcast variable works

2015-05-29 Thread ayan guha
1. No. thats the purpose of broadcast variable, ie not to be shipped with
every task. From Documentation

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable
cached on each machine rather than shipping a copy of it with tasks. They
can be used, for example, to give every node a copy of a large input
dataset in an efficient manner. Spark also attempts to distribute broadcast
variables using efficient broadcast algorithms to reduce communication cost.


After the broadcast variable is created, it should be used instead of the
value v in any functions run on the cluster so that v is not shipped to the
nodes more than once. In addition, the object v should not be modified
after it is broadcast in order to ensure that all nodes get the same value
of the broadcast variable (e.g. if the variable is shipped to a new node
later).


2. See above :)

If you need what you are asking for, you are looking for closures.


Best

Ayan

On Sat, May 30, 2015 at 4:11 PM, bit1...@163.com  wrote:

>
> Can someone help take a look at my questions? Thanks.
>
> --
> bit1...@163.com
>
>
> *From:* bit1...@163.com
> *Date:* 2015-05-29 18:57
> *To:* user 
> *Subject:* How Broadcast variable works
> Hi,
> I have a spark streaming application.  SparkContext uses broadcast
> vriables to broadcast Configuration information that each task will be used
>
> I have following two questions:
> 1. Will the broadcast variable be broadcasted every time when driver sends
> tasks to workers in each batch interval?
> 2. If the above question is true, then if the broadcast variable is
> modified between the batch interval(The configuration information is
> updated over time) and Spark Context broadcasts it again,  will tasks see
> the updated variable?
>
> Thanks.
>
>
>
> --
> bit1...@163.com
>
>


-- 
Best Regards,
Ayan Guha


Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-30 Thread ayan guha
I hope they will come up with1.4 before spark summit in mid June
On 31 May 2015 10:07, "Joseph Bradley"  wrote:

> Spark 1.4 should be available next month, but I'm not sure about the exact
> date.
> Your interpretation of high lambda is reasonable.  "High" lambda is really
> data-dependent.
> "lambda" is the same as the "regParam" in Spark, available in all recent
> Spark versions.
>
> On Fri, May 29, 2015 at 5:35 AM, mélanie gallois <
> melanie.galloi...@gmail.com> wrote:
>
>> When will Spark 1.4 be available exactly?
>> To answer to "Model selection can be achieved through high
>> lambda resulting lots of zero in the coefficients" : Do you mean that
>> putting a high lambda as a parameter of the logistic regression keeps only
>> a few significant variables and "deletes" the others with a zero in the
>> coefficients? What is a high lambda for you?
>> Is the lambda a parameter available in Spark 1.4 only or can I see it in
>> Spark 1.3?
>>
>> 2015-05-23 0:04 GMT+02:00 Joseph Bradley :
>>
>>> If you want to select specific variable combinations by hand, then you
>>> will need to modify the dataset before passing it to the ML algorithm.  The
>>> DataFrame API should make that easy to do.
>>>
>>> If you want to have an ML algorithm select variables automatically, then
>>> I would recommend using L1 regularization for now and possibly elastic net
>>> after 1.4 is release, per DB's suggestion.
>>>
>>> If you want detailed model statistics similar to what R provides, I've
>>> created a JIRA for discussing how we should add that functionality to
>>> MLlib.  Those types of stats will be added incrementally, but feedback
>>> would be great for prioritization:
>>> https://issues.apache.org/jira/browse/SPARK-7674
>>>
>>> To answer your question: "How are the weights calculated: is there a
>>> correlation calculation with the variable of interest?"
>>> --> Weights are calculated as with all logistic regression algorithms,
>>> by using convex optimization to minimize a regularized log loss.
>>>
>>> Good luck!
>>> Joseph
>>>
>>> On Fri, May 22, 2015 at 1:07 PM, DB Tsai  wrote:
>>>
 In Spark 1.4, Logistic Regression with elasticNet is implemented in ML
 pipeline framework. Model selection can be achieved through high
 lambda resulting lots of zero in the coefficients.

 Sincerely,

 DB Tsai
 ---
 Blog: https://www.dbtsai.com


 On Fri, May 22, 2015 at 1:19 AM, SparknewUser
  wrote:
 > I am new in MLlib and in Spark.(I use Scala)
 >
 > I'm trying to understand how LogisticRegressionWithLBFGS and
 > LogisticRegressionWithSGD work.
 > I usually use R to do logistic regressions but now I do it on Spark
 > to be able to analyze Big Data.
 >
 > The model only returns weights and intercept. My problem is that I
 have no
 > information about which variable is significant and which variable I
 had
 > better
 > to delete to improve my model. I only have the confusion matrix and
 the AUC
 > to evaluate the performance.
 >
 > Is there any way to have information about the variables I put in my
 model?
 > How can I try different variable combinations, do I have to modify the
 > dataset
 > of origin (e.g. delete one or several columns?)
 > How are the weights calculated: is there a correlation calculation
 with the
 > variable
 > of interest?
 >
 >
 >
 > --
 > View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html
 > Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 >
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>>
>> --
>> *Mélanie*
>>
>
>


Re: Adding an indexed column

2015-05-31 Thread ayan guha
If you are on spark 1.3, use repartitionandSort followed by mappartition.
In 1.4, window functions will be supported, it seems
On 1 Jun 2015 04:10, "Ricardo Almeida"  wrote:

> That's great and how would you create an ordered index by partition (by
> product in this example)?
>
> Assuming now a dataframe like:
>
> flag | product | price
> --
> 1|   a |47.808764653746
> 1|   b |47.808764653746
> 1|   a |31.9869279512204
> 1|   b |47.7907893713564
> 1|   a |16.7599200038239
> 1|   b |16.7599200038239
> 1|   b |20.3916014172137
>
>
> get a new dataframe such as:
>
> flag | product | price | index
> --
> 1|   a |47.808764653746  | 0
> 1|   a |31.9869279512204 | 1
> 1|   a |16.7599200038239 | 2
> 1|   b |47.808764653746  | 0
> 1|   b |47.7907893713564 | 1
> 1|   b |20.3916014172137 | 2
> 1|   b |16.7599200038239 | 3
>
>
>
>
>
>
>
>
> On 29 May 2015 at 12:25, Wesley Miao  wrote:
>
>> One way I can see is to -
>>
>> 1. get rdd from your df
>> 2. call rdd.zipWithIndex to get a new rdd
>> 3. turn your new rdd to a new df
>>
>> On Fri, May 29, 2015 at 5:43 AM, Cesar Flores  wrote:
>>
>>>
>>> Assuming that I have the next data frame:
>>>
>>> flag | price
>>> --
>>> 1|47.808764653746
>>> 1|47.808764653746
>>> 1|31.9869279512204
>>> 1|47.7907893713564
>>> 1|16.7599200038239
>>> 1|16.7599200038239
>>> 1|20.3916014172137
>>>
>>> How can I create a data frame with an extra indexed column as the next
>>> one:
>>>
>>> flag | price  | index
>>> --|---
>>> 1|47.808764653746 | 0
>>> 1|47.808764653746 | 1
>>> 1|31.9869279512204| 2
>>> 1|47.7907893713564| 3
>>> 1|16.7599200038239| 4
>>> 1|16.7599200038239| 5
>>> 1|20.3916014172137| 6
>>>
>>> --
>>> Cesar Flores
>>>
>>
>>
>


Re: Spark stages very slow to complete

2015-06-01 Thread ayan guha
Would you mind posting the code?
On 2 Jun 2015 00:53, "Karlson"  wrote:

> Hi,
>
> In all (pyspark) Spark jobs, that become somewhat more involved, I am
> experiencing the issue that some stages take a very long time to complete
> and sometimes don't at all. This clearly correlates with the size of my
> input data. Looking at the stage details for one such stage, I am wondering
> where Spark spends all this time. Take this table of the stages task
> metrics for example:
>
> Metric  Min 25th
> percentile  Median  75th percentile Max
> Duration1.4 min 1.5 min 1.7 min
>  1.9 min 2.3 min
> Scheduler Delay 1 ms3 ms4 ms
>   5 ms23 ms
> Task Deserialization Time   1 ms2 ms3 ms
>   8 ms22 ms
> GC Time 0 ms0 ms0 ms
>   0 ms0 ms
> Result Serialization Time   0 ms0 ms0 ms
>   0 ms1 ms
> Getting Result Time 0 ms0 ms0 ms
>   0 ms0 ms
> Input Size / Records23.9 KB / 1 24.0 KB / 1 24.1 KB /
> 1 24.1 KB / 1 24.3 KB / 1
>
> Why is the overall duration almost 2min? Where is all this time spent,
> when no progress of the stages is visible? The progress bar simply displays
> 0 succeeded tasks for a very long time before sometimes slowly progressing.
>
> Also, the name of the stage displayed above is `javaToPython at null:-1`,
> which I find very uninformative. I don't even know which action exactly is
> responsible for this stage. Does anyone experience similar issues or have
> any advice for me?
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark sql - reading data from sql tables having space in column names

2015-06-02 Thread ayan guha
I would think the easiest way would be to create a view in DB with column
names with no space.

In fact, you can "pass" a sql in place of a real table.

>From documentation: "The JDBC table that should be read. Note that anything
that is valid in a `FROM` clause of a SQL query can be used. For example,
instead of a full table you could also use a subquery in parentheses."

Kindly let the community know if this works

On Tue, Jun 2, 2015 at 6:43 PM, Sachin Goyal 
wrote:

> Hi,
>
> We are using spark sql (1.3.1) to load data from Microsoft sql server
> using jdbc (as described in
> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
> ).
>
> It is working fine except when there is a space in column names (we can't
> modify the schemas to remove space as it is a legacy database).
>
> Sqoop is able to handle such scenarios by enclosing column names in '[ ]'
> - the recommended method from microsoft sql server. (
> https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java
> - line no 319)
>
> Is there a way to handle this in spark sql?
>
> Thanks,
> sachin
>



-- 
Best Regards,
Ayan Guha


Re: Filter operation to return two RDDs at once.

2015-06-02 Thread ayan guha
Why do you need to do that if filter and content of the resulting rdd are
exactly same? You may as well declare them as 1 RDD.
On 3 Jun 2015 15:28, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:

> I want to do this
>
> val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId
> != NULL_VALUE)
>
> val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId
> == NULL_VALUE)
>
> This will run two different stages can this be done in one stage ?
>
> val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.
> *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE)
>
>
>
>
> --
> Deepak
>
>


Re: Application is "always" in process when I check out logs of completed application

2015-06-02 Thread ayan guha
Have you done sc.stop() ? :)
On 3 Jun 2015 14:05, "amghost"  wrote:

> I run spark application in spark standalone cluster with client deploy
> mode.
> I want to check out the logs of my finished application, but I always get
> a
> page telling me "Application history not found - Application xxx is still
> in
> process".
> I am pretty sure that the application has indeed completed because I can
> see
> it in the Completed Applications list show by Spark WebUI, and I have also
> found the log file with suffix ".inprocess"in the directory set by
> "spark.eventLog.dir" in my spark-default.conf
>
> Oh, BTW, I am using spark 1.3.0
>
> So, is there anything I missed?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Python Image Library and Spark

2015-06-03 Thread ayan guha
Try with large number of partition in parallelize.
On 4 Jun 2015 06:28, "Justin Spargur"  wrote:

> Hi all,
>
>  I'm playing around with manipulating images via Python and want to
> utilize Spark for scalability. That said, I'm just learing Spark and my
> Python is a bit rusty (been doing PHP coding for the last few years). I
> think I have most of the process figured out. However, the script fails on
> larger images and Spark is sending out the following warning for smaller
> images:
>
> Stage 0 contains a task of very large size (1151 KB). The maximum
> recommended task size is 100 KB.
>
> My code is as follows:
>
> import Image
> from pyspark import SparkContext
>
> if __name__ == "__main__":
>
> imageFile = "sample.jpg"
> outFile   = "sample.gray.jpg"
>
> sc = SparkContext(appName="Grayscale")
> im = Image.open(imageFile)
>
> # Create an RDD for the data from the image file
> img_data = sc.parallelize( list(im.getdata()) )
>
> # Create an RDD for the grayscale value
> gValue = img_data.map( lambda x: int(x[0]*0.21 + x[1]*0.72 +
> x[2]*0.07) )
>
> # Put our grayscale value into the RGR channels
> grayscale = gValue.map( lambda x: (x,x,x)  )
>
> # Save the output in a new image.
> im.putdata( grayscale.collect() )
>
> im.save(outFile)
>
> Obviously, something is amiss. However, I can't figure out where I'm off
> track with this. Any help is appreciated! Thanks in advance!!!
>


Re: Saving calculation to single local file

2015-06-05 Thread ayan guha
Just repartition to 1 partition before writing.

On Fri, Jun 5, 2015 at 3:46 PM, marcos rebelo  wrote:

> Hi all
>
> I'm running spark in a single local machine, no hadoop, just reading and
> writing in local disk.
>
> I need to have a single file as output of my calculation.
>
> if I do "rdd.saveAsTextFile(...)" all runs ok but I get allot of files.
> Since I need a single file I was considering to do something like:
>
>   Try {new FileWriter(outputPath)} match {
> case Success(writer) =>
>   try {
> rdd.toLocalIterator.foreach({line =>
>   val str = line.toString
>   writer.write(str)
> }
>   }
> }
> ...
>   }
>
>
> I get:
>
> [error] o.a.s.e.Executor - Exception in task 0.0 in stage 41.0 (TID 32)
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> ~[na:1.8.0_45]
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> ~[na:1.8.0_45]
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> ~[na:1.8.0_45]
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> ~[na:1.8.0_45]
> [error] o.a.s.u.SparkUncaughtExceptionHandler - Uncaught exception in
> thread Thread[Executor task launch worker-1,5,main]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> ~[na:1.8.0_45]
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> ~[na:1.8.0_45]
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> ~[na:1.8.0_45]
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> ~[na:1.8.0_45]
> [error] o.a.s.s.TaskSetManager - Task 0 in stage 41.0 failed 1 times;
> aborting job
> [warn] application - Can't write to /tmp/err1433498283479.csv: {}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 41.0 (TID 32, localhost): java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> ~[scala-library-2.10.5.jar:na]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> ~[scala-library-2.10.5.jar:na]
>
>
> if this rdd.toLocalIterator.foreach(...) doesn't work, what is the better
> solution?
>
> Best Regards
> Marcos
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Saving calculation to single local file

2015-06-05 Thread ayan guha
Another option is merge partfiles after your app ends.
On 5 Jun 2015 20:37, "Akhil Das"  wrote:

> you can simply do rdd.repartition(1).saveAsTextFile(...), it might not be
> efficient if your output data is huge since one task will be doing the
> whole writing.
>
> Thanks
> Best Regards
>
> On Fri, Jun 5, 2015 at 3:46 PM, marcos rebelo  wrote:
>
>> Hi all
>>
>> I'm running spark in a single local machine, no hadoop, just reading and
>> writing in local disk.
>>
>> I need to have a single file as output of my calculation.
>>
>> if I do "rdd.saveAsTextFile(...)" all runs ok but I get allot of files.
>> Since I need a single file I was considering to do something like:
>>
>>   Try {new FileWriter(outputPath)} match {
>> case Success(writer) =>
>>   try {
>> rdd.toLocalIterator.foreach({line =>
>>   val str = line.toString
>>   writer.write(str)
>> }
>>   }
>> }
>> ...
>>   }
>>
>>
>> I get:
>>
>> [error] o.a.s.e.Executor - Exception in task 0.0 in stage 41.0 (TID 32)
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>> ~[na:1.8.0_45]
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> ~[na:1.8.0_45]
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> ~[na:1.8.0_45]
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> ~[na:1.8.0_45]
>> [error] o.a.s.u.SparkUncaughtExceptionHandler - Uncaught exception in
>> thread Thread[Executor task launch worker-1,5,main]
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>> ~[na:1.8.0_45]
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> ~[na:1.8.0_45]
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> ~[na:1.8.0_45]
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> ~[na:1.8.0_45]
>> [error] o.a.s.s.TaskSetManager - Task 0 in stage 41.0 failed 1 times;
>> aborting job
>> [warn] application - Can't write to /tmp/err1433498283479.csv: {}
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>> 41.0 (TID 32, localhost): java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236)
>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> ~[scala-library-2.10.5.jar:na]
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> ~[scala-library-2.10.5.jar:na]
>>
>>
>> if this rdd.toLocalIterator.foreach(...) doesn't work, what is the better
>> solution?
>>
>> Best Regards
>> Marcos
>>
>>
>>
>


Re: Managing spark processes via supervisord

2015-06-05 Thread ayan guha
I use a simple python to launch cluster. I just did itfor fun, so of course
not the best and lot ofmodifications can be done.But I think you arelooking
for something similar?

import subprocess as s
from time import sleep
cmd =
"D:\\spark\\spark-1.3.1-bin-hadoop2.6\\spark-1.3.1-bin-hadoop2.6\\spark-1.3.1-bin-hadoop2.6\\bin\\spark-class.cmd"

master = "org.apache.spark.deploy.master.Master"
worker = "org.apache.spark.deploy.worker.Worker"
masterUrl="spark://BigData:7077"
cmds={"masters":1,"workers":3}

masterProcess=[cmd,master]
workerProcess=[cmd,worker,masterUrl]

noWorker = 3

pMaster = s.Popen(masterProcess)
sleep(3)

pWorkers = []
for i in range(noWorker):
pw = s.Popen(workerProcess)
pWorkers.append(pw)



On Sat, Jun 6, 2015 at 8:19 AM, Mike Trienis 
wrote:

> Thanks Ignor,
>
> I managed to find a fairly simple solution. It seems that the shell
> scripts (e.g. .start-master.sh, start-slave.sh) end up executing
> /bin/spark-class which is always run in the foreground.
>
> Here is a solution I provided on stackoverflow:
>
>-
>
> http://stackoverflow.com/questions/30672648/how-to-autostart-an-apache-spark-cluster-using-supervisord/30676844#30676844
>
>
> Cheers Mike
>
>
> On Wed, Jun 3, 2015 at 12:29 PM, Igor Berman 
> wrote:
>
>> assuming you are talking about standalone cluster
>> imho, with workers you won't get any problems and it's straightforward
>> since they are usually foreground processes
>> with master it's a bit more complicated, ./sbin/start-master.sh goes
>> background which is not good for supervisor, but anyway I think it's
>> doable(going to setup it too in a few days)
>>
>> On 3 June 2015 at 21:46, Mike Trienis  wrote:
>>
>>> Hi All,
>>>
>>> I am curious to know if anyone has successfully deployed a spark cluster
>>> using supervisord?
>>>
>>>- http://supervisord.org/
>>>
>>> Currently I am using the cluster launch scripts which are working
>>> greater, however, every time I reboot my VM or development environment I
>>> need to re-launch the cluster.
>>>
>>> I am considering using supervisord to control all the processes (worker,
>>> master, ect.. ) in order to have the cluster up an running after boot-up;
>>> although I'd like to understand if it will cause more issues than it
>>> solves.
>>>
>>> Thanks, Mike.
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: columnar structure of RDDs from Parquet or ORC files

2015-06-08 Thread ayan guha
I would think DF=RDD+Schema+some additional methods. In fact, a DF object
has a DF.rdd in it so you can (if needed) convert DF<=>RDD really easily.

On Mon, Jun 8, 2015 at 5:41 PM, kiran lonikar  wrote:

> Thanks. Can you point me to a place in the documentation of SQL
> programming guide or DataFrame scaladoc where this transformation and
> actions are grouped like in the case of RDD?
>
> Also if you can tell me if sqlContext.load and unionAll are
> transformations or actions...
>
> I answered a question on the forum assuming unionAll is a blocking call
> and said execution of multiple load and df.unionAll in different threads
> would benefit performance :)
>
> Kiran
> On 08-Jun-2015 4:37 pm, "Cheng Lian"  wrote:
>
>>  For DataFrame, there are also transformations and actions. And
>> transformations are also lazily evaluated. However, DataFrame
>> transformations like filter(), select(), agg() return a DataFrame rather
>> than an RDD. Other methods like show() and collect() are actions.
>>
>> Cheng
>>
>> On 6/8/15 1:33 PM, kiran lonikar wrote:
>>
>> Thanks for replying twice :) I think I sent this question by email and
>> somehow thought I did not sent it, hence created the other one on the web
>> interface. Lets retain this thread since you have provided more details
>> here.
>>
>>  Great, it confirms my intuition about DataFrame. It's similar to Shark
>> columnar layout, with the addition of compression. There it used java nio's
>> ByteBuffer to hold actual data. I will go through the code you pointed.
>>
>>  I have another question about DataFrame: The RDD operations are divided
>> in two groups: *transformations *which are lazily evaluated and return a
>> new RDD and *actions *which evaluate lineage defined by transformations,
>> invoke actions and return results. What about DataFrame operations like
>> join, groupBy, agg, unionAll etc which are all transformations in RDD? Are
>> they lazily evaluated or immediately executed?
>>
>>
>>
>>


-- 
Best Regards,
Ayan Guha


Re: Running SparkSql against Hive tables

2015-06-08 Thread ayan guha
I am afraid you are going other way around :) If you want to use Hive in
spark, you'd need a HiveContext with  hive config files in spark cluster
(eveery node). This was spark can talk to hive metastore. Then you can
write queries on hive table using hiveContext's sql method and spark will
run it (either by reading from hive and creating RDD or lettinghive run the
query using MR). Final result will be a spark dataFrame.

What you currently doing is using beeline to connect to hive, which should
work even without spark.

Best
Ayan

On Tue, Jun 9, 2015 at 10:42 AM, James Pirz  wrote:

> Thanks for the help!
> I am actually trying Spark SQL to run queries against tables that I've
> defined in Hive.
>
> I follow theses steps:
> - I start hiveserver2 and in Spark, I start Spark's Thrift server by:
> $SPARK_HOME/sbin/start-thriftserver.sh --master
> spark://spark-master-node-ip:7077
>
> - and I start beeline:
> $SPARK_HOME/bin/beeline
>
> - In my beeline session, I connect to my running hiveserver2
> !connect jdbc:hive2://hive-node-ip:1
>
> and I can run queries successfully. But based on hiveserver2 logs, It
> seems it actually uses "Hadoop's MR" to run queries,  *not* Spark's
> workers. My goals is to access Hive's tables' data, but run queries through
> Spark SQL using Spark workers (not Hadoop).
>
> Is it possible to do that via Spark SQL (its CLI) or through its thrift
> server ? (I tried to find some basic examples in the documentation, but I
> was not able to) - Any suggestion or hint on how I can do that would be
> highly appreciated.
>
> Thnx
>
> On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian  wrote:
>
>>
>>
>> On 6/6/15 9:06 AM, James Pirz wrote:
>>
>> I am pretty new to Spark, and using Spark 1.3.1, I am trying to use
>> 'Spark SQL' to run some SQL scripts, on the cluster. I realized that for a
>> better performance, it is a good idea to use Parquet files. I have 2
>> questions regarding that:
>>
>>  1) If I wanna use Spark SQL against  *partitioned & bucketed* tables
>> with Parquet format in Hive, does the provided spark binary on the apache
>> website support that or do I need to build a new spark binary with some
>> additional flags ? (I found a note
>> <https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables>
>>  in
>> the documentation about enabling Hive support, but I could not fully get it
>> as what the correct way of building is, if I need to build)
>>
>> Yes, Hive support is enabled by default now for the binaries on the
>> website. However, currently Spark SQL doesn't support buckets yet.
>>
>>
>>  2) Does running Spark SQL against tables in Hive downgrade the
>> performance, and it is better that I load parquet files directly to HDFS or
>> having Hive in the picture is harmless ?
>>
>> If you're using Parquet, then it should be fine since by default Spark
>> SQL uses its own native Parquet support to read Parquet Hive tables.
>>
>>
>>  Thnx
>>
>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-09 Thread ayan guha
Hi

I am little confused here. If I am writing to HDFS,shouldn't HDFS
replication factor will automatically kick in? In other words, how spark
writer is different than a hdfs -put commnd (from perspective of HDFS, of
course)?

Best
Ayan

On Tue, Jun 9, 2015 at 5:17 PM, Haopu Wang  wrote:

> Cheng,
>
> yes, it works, I set the property in SparkConf before initiating
> SparkContext.
> The property name is "spark.hadoop.dfs.replication"
> Thanks fro the help!
>
> -Original Message-
> From: Cheng Lian [mailto:lian.cs@gmail.com]
> Sent: Monday, June 08, 2015 6:41 PM
> To: Haopu Wang; user
> Subject: Re: SparkSQL: How to specify replication factor on the
> persisted parquet files?
>
> Then one possible workaround is to set "dfs.replication" in
> "sc.hadoopConfiguration".
>
> However, this configuration is shared by all Spark jobs issued within
> the same application. Since different Spark jobs can be issued from
> different threads, you need to pay attention to synchronization.
>
> Cheng
>
> On 6/8/15 2:46 PM, Haopu Wang wrote:
> > Cheng, thanks for the response.
> >
> > Yes, I was using HiveContext.setConf() to set "dfs.replication".
> > However, I cannot change the value in Hadoop core-site.xml because
> that
> > will change every HDFS file.
> > I only want to change the replication factor of some specific files.
> >
> > -Original Message-
> > From: Cheng Lian [mailto:lian.cs@gmail.com]
> > Sent: Sunday, June 07, 2015 10:17 PM
> > To: Haopu Wang; user
> > Subject: Re: SparkSQL: How to specify replication factor on the
> > persisted parquet files?
> >
> > Were you using HiveContext.setConf()?
> >
> > "dfs.replication" is a Hadoop configuration, but setConf() is only
> used
> > to set Spark SQL specific configurations. You may either set it in
> your
> > Hadoop core-site.xml.
> >
> > Cheng
> >
> >
> > On 6/2/15 2:28 PM, Haopu Wang wrote:
> >> Hi,
> >>
> >> I'm trying to save SparkSQL DataFrame to a persistent Hive table
> using
> >> the default parquet data source.
> >>
> >> I don't know how to change the replication factor of the generated
> >> parquet files on HDFS.
> >>
> >> I tried to set "dfs.replication" on HiveContext but that didn't work.
> >> Any suggestions are appreciated very much!
> >>
> >>
> >> -----
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >>
> >
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Reading Really Big File Stream from HDFS

2015-06-11 Thread ayan guha
Why do you need to use stream in this use case? 50g need not to be in
memory. Give it a try with high number of partitions.
On 11 Jun 2015 23:09, "SLiZn Liu"  wrote:

> Hi Spark Users,
>
> I'm trying to load a literally big file (50GB when compressed as gzip
> file, stored in HDFS) by receiving a DStream using `ssc.textFileStream`, as
> this file cannot be fitted in my memory. However, it looks like no RDD will
> be received until I copy this big file to a prior-specified location on
> HDFS. Ideally, I'd like read this file by a small number of lines at a
> time, but receiving a file stream requires additional writing to HDFS. Any
> idea to achieve this?
>
> BEST REGARDS,
> Todd Leo
>


Re: Deleting HDFS files from Pyspark

2015-06-11 Thread ayan guha
Simplest way would be issuing a os.system with HDFS rm command from driver,
assuming it has hdfs connectivity, like a gateway node. Executors will have
nothing to do with it.
On 12 Jun 2015 08:57, "Siegfried Bilstein"  wrote:

> I've seen plenty of examples for creating HDFS files from pyspark but
> haven't been able to figure out how to delete files from pyspark. Is there
> an API I am missing for filesystem management? Or should I be including the
> HDFS python modules?
>
> Thanks,
> Siegfried
>


Spark 1.4 release date

2015-06-12 Thread ayan guha
Hi

When is official spark 1.4 release date?
Best
Ayan


Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
Thanks a lot.
On 12 Jun 2015 19:46, "Todd Nist"  wrote:

> It was released yesterday.
>
> On Friday, June 12, 2015, ayan guha  wrote:
>
>> Hi
>>
>> When is official spark 1.4 release date?
>> Best
>> Ayan
>>
>


Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
Thanks guys, my question must look like a stupid one today :) Looking
forward to test out 1.4.0, just downloaded it.

Congrats to the team for this much anticipate release.

On Fri, Jun 12, 2015 at 10:12 PM, Guru Medasani  wrote:

> Here is a spark 1.4 release blog by data bricks.
>
> https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html
>
>
> Guru Medasani
> gdm...@gmail.com
>
>
>
> On Jun 12, 2015, at 7:08 AM, ayan guha  wrote:
>
> Thanks a lot.
> On 12 Jun 2015 19:46, "Todd Nist"  wrote:
>
>> It was released yesterday.
>>
>> On Friday, June 12, 2015, ayan guha  wrote:
>>
>>> Hi
>>>
>>> When is official spark 1.4 release date?
>>> Best
>>> Ayan
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: What is most efficient to do a large union and remove duplicates?

2015-06-14 Thread ayan guha
Can you do dedupe process locally for each file first and then globally?
Also I did not fully get the logic of the part inside reducebykey. Can you
kindly explain?
On 14 Jun 2015 13:58, "Gavin Yue"  wrote:

> I have 10 folder, each with 6000 files. Each folder is roughly 500GB.  So
> totally 5TB data.
>
> The data is  formatted as  key t/ value.  After union,  I want to remove
> the duplicates among keys. So each key should be unique and  has only one
> value.
>
> Here is what I am doing.
>
> folders = Array("folder1","folder2""folder10" )
>
> var rawData = sc.textFile(folders(0)).map(x => (x.split("\t")(0),
> x.split("\t")(1)))
>
> for (a <- 1 to sud_paths.length - 1) {
>   rawData = rawData.union(sc.textFile(folders (a)).map(x =>
> (x.split("\t")(0), x.split("\t")(1
> }
>
> val nodups = rawData.reduceByKey((a,b)=>
> {
>   if(a.length > b.length)
>   {a}
>   else
>   {b}
>   }
> )
> nodups.saveAsTextFile("/nodups")
>
> Anything I could do to make this process faster?   Right now my process
> dies when output the data to the HDFS.
>
>
> Thank you !
>


Re: Union of two Diff. types of DStreams

2015-06-14 Thread ayan guha
That's by design. You can make up the value in 2nd rdd by putting a default
Int value.
On 15 Jun 2015 15:30, "anshu shukla"  wrote:

> How to take union of JavaPairDStream  and  
> JavaDStream .
>
> *>a.union(b)  is  working  only with  Dstreams of same type.*
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: How to set up a Spark Client node?

2015-06-15 Thread ayan guha
I feel he wanted to ask about workers. In that case, pplease launch workers
on Node 3,4,5 (and/or Node 8,9,10 etc).
You need to go to each worker and start worker daemon with master URL:Port
(typically7077) as parameter (so workers can talk to master).

You shoud be able to see 1 masterr and N workers in UI which typically
starts on Master URL:8080.

Once you do that,you follow Akhil's instruction above to get a sqlContexxt
and set master property properly and runyour app.
HTH

On Mon, Jun 15, 2015 at 7:02 PM, Akhil Das 
wrote:

> I'm assuming by spark-client you mean the spark driver program. In that
> case you can pick any machine (say Node 7), create your driver program in
> it and use spark-submit to submit it to the cluster or if you create the
> SparkContext within your driver program (specifying all the properties)
> then you may simply run it with sbt run.
>
> Thanks
> Best Regards
>
> On Sun, Jun 14, 2015 at 6:17 AM, MrAsanjar .  wrote:
>
>> I have following hadoop & spark cluster nodes configuration:
>> Nodes 1 & 2 are resourceManager and nameNode respectivly
>> Nodes 3, 4, and 5 each includes nodeManager & dataNode
>> Node 7 is Spark-master configured to run yarn-client or yarn-master modes
>> I have tested it and it works fine.
>> Is there any instuctions on how to setup spark client in a cluster mode?
>> I am not sure if I am doing it right.
>> Thanks in advance
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark on EMR

2015-06-16 Thread ayan guha
That's great news. Can I assume spark on EMR supports kinesis to hbase
pipeline?
On 17 Jun 2015 05:29, "kamatsuoka"  wrote:

> Spark is now officially supported on Amazon Elastic Map Reduce:
> http://aws.amazon.com/elasticmapreduce/details/spark/
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-EMR-tp23343.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark or Storm

2015-06-16 Thread ayan guha
I have a similar scenario where we need to bring data from kinesis to
hbase. Data volecity is 20k per 10 mins. Little manipulation of data will
be required but that's regardless of the tool so we will be writing that
piece in Java pojo.

All env is on aws. Hbase is on a long running EMR and kinesis on a separate
cluster.

TIA.
Best
Ayan
On 17 Jun 2015 12:13, "Will Briggs"  wrote:

> The programming models for the two frameworks are conceptually rather
> different; I haven't worked with Storm for quite some time, but based on my
> old experience with it, I would equate Spark Streaming more with Storm's
> Trident API, rather than with the raw Bolt API. Even then, there are
> significant differences, but it's a bit closer.
>
> If you can share your use case, we might be able to provide better
> guidance.
>
> Regards,
> Will
>
> On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:
>
> Hi All,
>
> I am evaluating spark VS storm ( spark streaming  ) and i am not able to
> see what is equivalent of Bolt in storm inside spark.
>
> Any help will be appreciated on this ?
>
> Thanks ,
> Ashish
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark or Storm

2015-06-17 Thread ayan guha
o do is
>>> straight forward transformation and you are reading from Kinesis to begin
>>> with, it might be an easier option to just do the transformation in Kinesis.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
>>> Whatever you write in bolts would be the logic you want to apply on your
>>> events. In Spark, that logic would be coded in map() or similar such
>>> transformations and/or actions. Spark doesn't enforce a structure for
>>> capturing your processing logic like Storm does.
>>> Regards
>>> Sab
>>> Probably overloading the question a bit.
>>>
>>> In Storm, Bolts have the functionality of getting triggered on events.
>>> Is that kind of functionality possible with Spark streaming? During each
>>> phase of the data processing, the transformed data is stored to the
>>> database and this transformed data should then be sent to a new pipeline
>>> for further processing
>>>
>>> How can this be achieved using Spark?
>>>
>>>
>>>
>>> On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast <
>>> sparkenthusi...@yahoo.in> wrote:
>>>
>>> I have a use-case where a stream of Incoming events have to be
>>> aggregated and joined to create Complex events. The aggregation will have
>>> to happen at an interval of 1 minute (or less).
>>>
>>> The pipeline is :
>>>   send events
>>>enrich event
>>> Upstream services ---> KAFKA -> event Stream
>>> Processor > Complex Event Processor > Elastic
>>> Search.
>>>
>>> From what I understand, Storm will make a very good ESP and Spark
>>> Streaming will make a good CEP.
>>>
>>> But, we are also evaluating Storm with Trident.
>>>
>>> How does Spark Streaming compare with Storm with Trident?
>>>
>>> Sridhar Chellappa
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>   On Wednesday, 17 June 2015 10:02 AM, ayan guha 
>>> wrote:
>>>
>>>
>>> I have a similar scenario where we need to bring data from kinesis to
>>> hbase. Data volecity is 20k per 10 mins. Little manipulation of data will
>>> be required but that's regardless of the tool so we will be writing that
>>> piece in Java pojo.
>>> All env is on aws. Hbase is on a long running EMR and kinesis on a
>>> separate cluster.
>>> TIA.
>>> Best
>>> Ayan
>>> On 17 Jun 2015 12:13, "Will Briggs"  wrote:
>>>
>>> The programming models for the two frameworks are conceptually rather
>>> different; I haven't worked with Storm for quite some time, but based on my
>>> old experience with it, I would equate Spark Streaming more with Storm's
>>> Trident API, rather than with the raw Bolt API. Even then, there are
>>> significant differences, but it's a bit closer.
>>>
>>> If you can share your use case, we might be able to provide better
>>> guidance.
>>>
>>> Regards,
>>> Will
>>>
>>> On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:
>>>
>>> Hi All,
>>>
>>> I am evaluating spark VS storm ( spark streaming  ) and i am not able to
>>> see what is equivalent of Bolt in storm inside spark.
>>>
>>> Any help will be appreciated on this ?
>>>
>>> Thanks ,
>>> Ashish
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Intermedate stage will be cached automatically ?

2015-06-17 Thread ayan guha
Its not cached per se. For example, you will not see this in Storage tab in
UI. However, spark has read the data and its in memory right now. So, the
next count call should be very fast.


Best
Ayan

On Wed, Jun 17, 2015 at 10:21 PM, Mark Tse  wrote:

>  I think
> https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
> might shed some light on the behaviour you’re seeing.
>
>
>
> Mark
>
>
>
> *From:* canan chen [mailto:ccn...@gmail.com]
> *Sent:* June-17-15 5:57 AM
> *To:* spark users
> *Subject:* Intermedate stage will be cached automatically ?
>
>
>
> Here's one simple spark example that I call RDD#count 2 times. The first
> time it would invoke 2 stages, but the second one only need 1 stage. Seems
> the first stage is cached. Is that true ? Any flag can I control whether
> the cache the intermediate stage
>
>
> *val *data = sc.parallelize(1 to 10, 2).map(e=>(e%2,2)).reduceByKey(_ + 
> _, 2)
>     *println*(data.count())
> *println*(data.count())
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark or Storm

2015-06-17 Thread ayan guha
Thanks for this. It's kcl based kinesis application. But because its just a
Java application we are thinking to use spark on EMR or storm for fault
tolerance and load balancing. Is it a correct approach?
On 17 Jun 2015 23:07, "Enno Shioji"  wrote:

> Hi Ayan,
>
> Admittedly I haven't done much with Kinesis, but if I'm not mistaken you
> should be able to use their "processor" interface for that. In this
> example, it's incrementing a counter:
> https://github.com/awslabs/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/samples/datavis/kcl/CountingRecordProcessor.java
>
> Instead of incrementing a counter, you could do your transformation and
> send it to HBase.
>
>
>
>
>
>
> On Wed, Jun 17, 2015 at 1:40 PM, ayan guha  wrote:
>
>> Great discussion!!
>>
>> One qs about some comment: Also, you can do some processing with Kinesis.
>> If all you need to do is straight forward transformation and you are
>> reading from Kinesis to begin with, it might be an easier option to just do
>> the transformation in Kinesis
>>
>> - Do you mean KCL application? Or some kind of processing withinKineis?
>>
>> Can you kindly share a link? I would definitely pursue this route as our
>> transformations are really simple.
>>
>> Best
>>
>> On Wed, Jun 17, 2015 at 10:26 PM, Ashish Soni 
>> wrote:
>>
>>> My Use case is below
>>>
>>> We are going to receive lot of event as stream ( basically Kafka Stream
>>> ) and then we need to process and compute
>>>
>>> Consider you have a phone contract with ATT and every call / sms / data
>>> useage you do is an event and then it needs  to calculate your bill on real
>>> time basis so when you login to your account you can see all those variable
>>> as how much you used and how much is left and what is your bill till date
>>> ,Also there are different rules which need to be considered when you
>>> calculate the total bill one simple rule will be 0-500 min it is free but
>>> above it is $1 a min.
>>>
>>> How do i maintain a shared state  ( total amount , total min , total
>>> data etc ) so that i know how much i accumulated at any given point as
>>> events for same phone can go to any node / executor.
>>>
>>> Can some one please tell me how can i achieve this is spark as in storm
>>> i can have a bolt which can do this ?
>>>
>>> Thanks,
>>>
>>>
>>>
>>> On Wed, Jun 17, 2015 at 4:52 AM, Enno Shioji  wrote:
>>>
>>>> I guess both. In terms of syntax, I was comparing it with Trident.
>>>>
>>>> If you are joining, Spark Streaming actually does offer windowed join
>>>> out of the box. We couldn't use this though as our event stream can grow
>>>> "out-of-sync", so we had to implement something on top of Storm. If your
>>>> event streams don't become out of sync, you may find the built-in join in
>>>> Spark Streaming useful. Storm also has a join keyword but its semantics are
>>>> different.
>>>>
>>>>
>>>> > Also, what do you mean by "No Back Pressure" ?
>>>>
>>>> So when a topology is overloaded, Storm is designed so that it will
>>>> stop reading from the source. Spark on the other hand, will keep reading
>>>> from the source and spilling it internally. This maybe fine, in fairness,
>>>> but it does mean you have to worry about the persistent store usage in the
>>>> processing cluster, whereas with Storm you don't have to worry because the
>>>> messages just remain in the data store.
>>>>
>>>> Spark came up with the idea of rate limiting, but I don't feel this is
>>>> as nice as back pressure because it's very difficult to tune it such that
>>>> you don't cap the cluster's processing power but yet so that it will
>>>> prevent the persistent storage to get used up.
>>>>
>>>>
>>>> On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast <
>>>> sparkenthusi...@yahoo.in> wrote:
>>>>
>>>>> When you say Storm, did you mean Storm with Trident or Storm?
>>>>>
>>>>> My use case does not have simple transformation. There are complex
>>>>> events that need to be generated by joining the incoming event stream.
>>>>>
>>>>> Also, what do you mean by "No Back PRessure

Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry

2015-06-17 Thread ayan guha
Try to grant read execute access through sentry.
On 18 Jun 2015 05:47, "Nitin kak"  wrote:

> I am trying to run a hive query from Spark code using HiveContext object.
> It was running fine earlier but since the Apache Sentry has been set
> installed the process is failing with this exception :
>
> *org.apache.hadoop.security.AccessControlException: Permission denied:
> user=kakn, access=READ_EXECUTE,
> inode="/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in":hive:hive:drwxrwx--t*
>
> I have pasted the full stack trace at the end of this post. My username
> "kakn" is a registered user with Sentry. I know that Spark takes all the
> configurations from hive-site.xml to execute the hql, so I added a few
> Sentry specific properties but seem to have no effect. I have attached the
> hive-site.xml
>
> **
> *hive.security.authorization.task.factory*
> *
> org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl*
> *  *
> *  *
> *hive.metastore.pre.event.listeners*
> *
> org.apache.sentry.binding.metastore.MetastoreAuthzBinding*
> *list of comma seperated listeners for metastore
> events.*
> *  *
> *  *
> *hive.warehouse.subdir.inherit.perms*
> *true*
> * *
>
>
>
>
> *org.apache.hadoop.security.AccessControlException: Permission denied:
> user=kakn, access=READ_EXECUTE,
> inode="/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in":hive:hive:drwxrwx--t*
> * at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)*
> * at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)*
> * at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)*
> * at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)*
> * at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)*
> * at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)*
> * at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)*
> * at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)*
> * at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)*
> * at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)*
> * at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)*
> * at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)*
> * at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)*
> * at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)*
> * at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)*
> * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)*
> * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)*
> * at java.security.AccessController.doPrivileged(Native Method)*
> * at javax.security.auth.Subject.doAs(Subject.java:415)*
> * at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)*
> * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)*
>
> * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
> * at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
> * at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
> * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
> * at
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)*
> * at
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)*
> * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)*
> * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)*
> * at
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)*
> * at
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104)*
> * at
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:716)*
> * at
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)*
> * at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)*
> * at
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:712)*
> * at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.s

Re: Pyspark RDD search

2015-06-18 Thread ayan guha
You can do cross join and filter.
On 18 Jun 2015 14:17, "bhavyateja"  wrote:

> I have an RDD which is list of list
> And another RDD which is list of pairs
> No duplicates in inner list of first RDD and
> No duplicates in the pairs from second rdd
> I am trying to check if any pair of second RDD is present in the any list
> of
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-RDD-search-tp23386.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Best way to randomly distribute elements

2015-06-18 Thread ayan guha
how about generating the key using some 1-way hashing like md5?

On Thu, Jun 18, 2015 at 9:59 PM, Guillaume Pitel  wrote:

>
> I think you can randomly reshuffle your elements just by emitting a random
> key (mapping a PairRdd's key triggers a reshuffle IIRC)
>
> yourrdd.map{ x => (rand(), x)}
>
> There is obiously a risk that rand() will give same sequence of numbers in
> each partition, so you may need to use mapPartitionsWithIndex first and
> seed your rand with the partition id (or compute your rand from a seed
> based on x).
>
> Guillaume
>
> Hello,
>
> In the context of a machine learning algorithm, I need to be able to
> randomly distribute the elements of a large RDD across partitions (i.e.,
> essentially assign each element to a random partition). How could I achieve
> this? I have tried to call repartition() with the current number of
> partitions - but it seems to me that this moves only some of the elements,
> and in a deterministic way.
>
> I know this will be an expensive operation but I only need to perform it
> every once in a while.
>
> Thanks a lot!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-randomly-distribute-elements-tp23391.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> --
>[image: eXenSa]
>  *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>



-- 
Best Regards,
Ayan Guha


Re: Spark 1.4 on HortonWork HDP 2.2

2015-06-19 Thread ayan guha
what problem are you facing? are you trying to build it yurself or
gettingpre-built version?

On Fri, Jun 19, 2015 at 10:22 PM, Ashish Soni  wrote:

> Hi ,
>
> Is any one able to install Spark 1.4 on HDP 2.2 , Please let me know how
> can i do the same ?
>
> Ashish
>



-- 
Best Regards,
Ayan Guha


Re: Spark 1.4 on HortonWork HDP 2.2

2015-06-19 Thread ayan guha
I think you can get spark 1.4 pre built with hadoop 2.6 (as that what hdp
2.2 provides) and just start using it

On Fri, Jun 19, 2015 at 10:28 PM, Ashish Soni  wrote:

> I do not where to start  as Spark 1.2 comes bundled with HDP2.2 but i want
> to use 1.4 and i do not know how to update it to 1.4
>
> Ashish
>
> On Fri, Jun 19, 2015 at 8:26 AM, ayan guha  wrote:
>
>> what problem are you facing? are you trying to build it yurself or
>> gettingpre-built version?
>>
>> On Fri, Jun 19, 2015 at 10:22 PM, Ashish Soni 
>> wrote:
>>
>>> Hi ,
>>>
>>> Is any one able to install Spark 1.4 on HDP 2.2 , Please let me know how
>>> can i do the same ?
>>>
>>> Ashish
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: JavaDStream read and write rdbms

2015-06-22 Thread ayan guha
Spark docs has addresses this pretty well. Look for patterns of use
foreachRDD.
On 22 Jun 2015 17:09, "Manohar753"  wrote:

>
> Hi Team,
>
> How to  split and put the red JavaDStream in to mysql in java.
>
> any existing api in sark 1.3/1.4.
> team can you please share the code snippet if any body have it.
>
> Thanks,
> Manohar
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/JavaDStream-String-read-and-write-rdbms-tp23423.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


  1   2   3   4   5   6   7   8   >