Re: Edge AI with Spark

2020-09-24 Thread ayan guha
…es running Android/iOS? Best regards, Marco Sassarini, Artificial Intelligence Department, www.overit.it -- Best Regards, Ayan Guha

https://issues.apache.org/jira/browse/SPARK-18381

2020-09-24 Thread ayan guha
Anyone aware of any workaround for https://issues.apache.org/jira/browse/SPARK-18381 Other than upgrade to Spark 3 I mean,,, -- Best Regards, Ayan Guha

Re: Scala vs Python for ETL with Spark

2020-10-10 Thread ayan guha
…purposes. These are my understanding but they are not facts, so I would like to get some informed views on this if I can? Many thanks, Mich -- Best Regards, Ayan Guha

Re: Scala vs Python for ETL with Spark

2020-10-11 Thread ayan guha
has folks that can do python seriously why then spark in the first > place. You can do workflow on your own, streaming or batch or what ever you > want. > I would not do anything else aside from python, but that is me. > > On Sat, Oct 10, 2020, 9:42 PM ayan guha wrote: > >>

Re: Count distinct and driver memory

2020-10-19 Thread ayan guha
… -- nicolas paris -- Best Regards, Ayan Guha

Re: Why spark-submit works with package not with jar

2020-10-20 Thread ayan guha
…me/hduser/jars/ddhybrid.jar --packages com.github.samelamin:spark-bigquery_2.11:0.2.6 I have read the write-ups about packages searching the maven libraries etc. Not convinced why using the package should make so much difference between a failure and success. In other words, when to use a package rather than a jar. Any ideas will be appreciated. Thanks -- Best Regards, Ayan Guha

Re: Apache Spark Connector for SQL Server and Azure SQL

2020-10-26 Thread ayan guha
…www.vattenfall.se -- Best Regards, Ayan Guha

Re: How to apply ranger policies on Spark

2020-11-24 Thread ayan guha
xpected when we use hive cli and beeline. But when we access those hive > tables using spark-shell or spark-submit it does not work. > > Any suggestions to make Ranger work with Spark? > > > Regards > > Joyan > > -- Best Regards, Ayan Guha

Re: Rdd - zip with index

2021-03-23 Thread ayan guha
gmail.com> wrote: >> >> >> >> Hi, Mohammed >> >> I think that the reason that only one executor is running and have >> single partition is because you have single file that might be read/loaded >> into memory. >> >> >> >> In order to achieve better parallelism I’d suggest to split the csv >> file. >> >> >> >> -- Best Regards, Ayan Guha

Re: Rdd - zip with index

2021-03-24 Thread ayan guha
…Proprietor Name (3), Company Registration No. (3), Proprietorship Category (3), Country Incorporated (3), Proprietor Address columns, …, Date Proprietor Added, Additional Proprietor Indicator. 10GB is not much of a big CSV file; that will resolve the header anyway. Also, how are you running Spark: in local mode (single JVM) or another distributed mode (yarn, standalone)? HTH -- Best Regards, Ayan Guha

Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
; > Thanks, > > Ankamma Rao B > -- Best Regards, Ayan Guha

Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
> *From:* Sean Owen > *Sent:* Friday, April 9, 2021 6:11 PM > *To:* ayan guha > *Cc:* Rao Bandaru ; User > *Subject:* Re: [Spark SQL]:to calculate distance between four > coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk > dataframe >

Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
; and I've seen good wins over, at least, a single-row UDF. > > On Fri, Apr 9, 2021 at 9:14 AM ayan guha wrote: > >> Hi Sean - absolutely open to suggestions. >> >> My impression was using spark native functions should provide similar >> perf as scala ones b

Re: Python level of knowledge for Spark and PySpark

2021-04-14 Thread ayan guha
I also know > Pandas and am also familiar with plotting routines like matplotlib. > > Warmest > > Ashok > -- Best Regards, Ayan Guha

Spark Streaming with Files

2021-04-23 Thread ayan guha
es on like T1, T2 etc folders. Also, lets assume files are written every 10 mins, but I want to process them every 4 hours. Can I use streaming method so that it can manage checkpoints on its own? Best - Ayan -- Best Regards, Ayan Guha
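A minimal sketch of the file-source pattern behind this question, assuming Structured Streaming: the checkpoint tracks which files have already been picked up, and a long processing-time trigger approximates the "process every 4 hours" requirement. Paths and the schema below are placeholders, not anything from the thread.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("file-stream-sketch").getOrCreate()

# Placeholder schema; a streaming file source requires an explicit schema.
schema = StructType([StructField("id", StringType()), StructField("payload", StringType())])

stream_df = spark.readStream.schema(schema).json("/landing/T1")   # assumed landing folder

query = (stream_df.writeStream
         .format("parquet")
         .option("path", "/curated/T1")                   # assumed output path
         .option("checkpointLocation", "/checkpoints/T1") # lets Spark track processed files
         .trigger(processingTime="4 hours")               # files written in between wait for the next trigger
         .start())
```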

Re: Graceful shutdown SPARK Structured Streaming

2021-05-06 Thread ayan guha
…On Wed, 5 May 2021 at 17:30, Gourav Sengupta wrote: Hi, just thought of reaching out once again and seeking your kind help to find out the best way to stop SPARK streaming gracefully. Do we still use the method of creating a file as in SPARK 2.4.x, which is a several-years-old approach, or is there a better approach in SPARK 3.1? Forwarded message (Graceful shutdown SPARK Structured Streaming): is there any documentation available for gracefully stopping SPARK Structured Streaming in 3.1.x? I am referring to articles which are 4 to 5 years old and was wondering whether there is a better way available today to gracefully shut down a SPARK streaming job. Regards, Gourav Sengupta -- Best Regards, Ayan Guha

Re: [Spark Catalog API] Support for metadata Backup/Restore

2021-05-07 Thread ayan guha
gt;>> >>> We want to introduce the backup and restore from an API level. We are >>> thinking of doing this simply by adding backup() and restore() in >>> CatalogImpl, as ExternalCatalog already includes all the methods we need to >>> retrieve and recreate entities. We are wondering if there is any concern or >>> drawback of this approach. Please advise. >>> >>> Thank you in advance, >>> Tianchen >>> >> -- Best Regards, Ayan Guha

Re: Merge two dataframes

2021-05-19 Thread ayan guha
…On Wed, May 12, 2021 at 6:20 PM kushagra deep wrote: Hi All, I have two dataframes. df1 has a single column amount_6m with values 100, 200, 300, 400, 500; df2 has a single column amount_9m with values 500, 600, 700, 800, 900. The number of rows is the same in both dataframes. Can I merge the two dataframes to achieve df3 with columns amount_6m | amount_9m, pairing the rows positionally (100/500, 200/600, 300/700, 400/800, 500/900)? Thanks in advance. Reg, Kushagra Deep -- Best Regards, Ayan Guha

Re: PySpark Write File Container exited with a non-zero exit code 143

2021-05-19 Thread ayan guha
…the job prints 'showing {fname}', strips a trailing '\r' from the last column with regexp_replace, and writes each file out with df.write.format('parquet').mode('overwrite').save("{}/{}".format(HDFS_OUT, fname_noext)). It fails with: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, DataNode01.mydomain.com, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container marked as failed: container_e331_1621375512548_0021_01_06 on host DataNode01.mydomain.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143. Killed by external signal. THANKS! CLAY -- Best Regards, Ayan Guha

Re: DF blank value fill

2021-05-21 Thread ayan guha
empty cells of all records with the same > combination of MainKey and SubKey with the respective value of other rows > with the same key combination? > > A certain value, if not null, of a col is guaranteed to be unique within > the df. If a col exists then there is at least one row with a not-null > value. > > > > I am using pyspark. > > > > Thanks for any hint, > > Best > > Meikel > -- Best Regards, Ayan Guha

Re: Spark-sql can replace Hive ?

2021-06-10 Thread ayan guha
e.? > > > > thanks > > > > > -- Best Regards, Ayan Guha

Re: Insert into table with one the value is derived from DB function using spark

2021-06-19 Thread ayan guha
…6.jar HTH … On Fri, 18 Jun 2021 at 20:49, Anshul Kala wrote: Hi All, I am using spark to ingest data from file to a database Oracle table. For one of the fields, the value to be populated is generated from a function that is written in the database. The input to the function is one of the fields of the data frame. I wanted to use spark.dbc.write to perform the operation, which generates the insert query at the back end. For example, it can generate the insert query as: Insert into table values (?, ?, getfunctionvalue(?)). Please advise if it is possible in spark and, if yes, how it can be done. This is a little urgent for me, so any help is appreciated. Thanks, Anshul -- Best Regards, Ayan Guha

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread ayan guha
…a quoted Structured Streaming example using foreachBatch(sendToSink) with a 30-second processing-time trigger and a checkpointLocation, where sendToSink(df, batchId) persists each micro-batch to a BigQuery table and a second function sendToControl(dfnewtopic, batchId) handles a control topic, with the question of whether the two dataframes can be joined on a common key in the same session. The original question (Bruno Oliveira): two Kafka topics, transactions-created and transaction-processed, share a transaction_id. The created consumer sinks straight to Postgres; the processed consumer intersects its batch with the Postgres rows that already have status 'created' and sinks only the matches, so the leftover 'processed' rows that arrive before their 'created' counterpart are currently lost. Is there any way to reprocess/replay them WITHOUT restarting the app, and should this fall back to legacy Spark Streaming instead of Structured Streaming? -- Best Regards, Ayan Guha

Re: Naming files while saving a Dataframe

2021-07-16 Thread ayan guha
t which job wrote > which file. Maybe provide a 'prefix' to the file names. I was wondering if > there's any 'option' that allows us to do this. Googling didn't come up > with any solution so thought of asking the Spark experts on this mailing > list. > > Thanks in advance. > -- Best Regards, Ayan Guha

Re: Naming files while saving a Dataframe

2021-07-17 Thread ayan guha
the same directory is that the data is > partitioned by 'day' (mmdd) but the job runs hourly. Maybe the only way > to do this is to create an hourly partition (/mmdd/hh). Is that the > only way to solve this? > > On Fri, Jul 16, 2021 at 5:45 PM ayan guha wrote: >

Re: What happens when a partition that holds data under a task fails

2022-01-23 Thread ayan guha
ssed, but as in the first pass, the same partition >>>>>> is >>>>>> processed. >>>>>> >>>>>> On Fri, Jan 21, 2022 at 12:00 PM Siddhesh Kalgaonkar < >>>>>> kalgaonkarsiddh...@gmail.com> wrote: >>>>>> >>>>>>> Hello team, >>>>>>> >>>>>>> I am aware that in case of memory issues when a task fails, it will >>>>>>> try to restart 4 times since it is a default number and if it still >>>>>>> fails >>>>>>> then it will cause the entire job to fail. >>>>>>> >>>>>>> But suppose if I am reading a file that is distributed across nodes >>>>>>> in partitions. So, what will happen if a partition fails that holds some >>>>>>> data? Will it re-read the entire file and get that specific subset of >>>>>>> data >>>>>>> since the driver has the complete information? or will it copy the data >>>>>>> to >>>>>>> the other working nodes or tasks and try to run it? >>>>>>> >>>>>> -- Best Regards, Ayan Guha

Re: How to delete the record

2022-01-27 Thread ayan guha
…On Thu, 27 Jan 2022, 22:44 Sean Owen wrote: This is what storage engines like Delta, Hudi, Iceberg are for. No need to manage it manually or use a DBMS. These formats allow deletes, upserts, etc. of data, using Spark, on cloud storage. Mich Talebzadeh: Where is the ETL data stored? If your final sync (storage) is a data warehouse, it should be soft flagged with op_type (Insert/Update/Delete) and op_time (timestamp). Original question (Sid Kal): I am using a Spark incremental approach for bringing the latest data every day. Everything works fine, but when a record at the source is deleted, it should be deleted in my final transformed record too. How do I capture such changes and change my table too? -- Best Regards, Ayan Guha
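A hedged sketch of the Delta-style approach Sean mentions, assuming the delta-spark Python package is available; the table path, key column, and the `changes_df`/`op_type` convention are illustrative only, not from the thread.

```python
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/curated/my_table")   # assumed target table path

(target.alias("t")
 .merge(changes_df.alias("s"), "t.id = s.id")             # changes_df: assumed change feed
 .whenMatchedDelete(condition="s.op_type = 'D'")          # propagate source deletes
 .whenMatchedUpdateAll()                                   # apply updates
 .whenNotMatchedInsertAll()                                # apply inserts
 .execute())
```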

Re: add an auto_increment column

2022-02-06 Thread ayan guha
…Thank you. -- Best Regards, Ayan Guha

Re: add an auto_increment column

2022-02-07 Thread ayan guha
…Monotonically_increasing_id() will give the same functionality. On Mon, 7 Feb 2022, 6:57 am, wrote: For a dataframe object, how to add a column which is auto_increment like mysql's behavior? Thank you. -- Best Regards, Ayan Guha
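A minimal sketch of the suggestion above. Note that monotonically_increasing_id() yields unique, increasing but non-consecutive ids, unlike MySQL's AUTO_INCREMENT; a window row_number() is shown as the gap-free alternative, at the cost of a global sort. The sample data is illustrative.

```python
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["value"])

with_id = df.withColumn("id", F.monotonically_increasing_id())                 # unique, not consecutive
gap_free = df.withColumn("id", F.row_number().over(Window.orderBy("value")))   # 1, 2, 3, ...
```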

Re: Does spark support something like the bind function in R?

2022-02-08 Thread ayan guha
; *##6 2.883 55 3300* > > ``` > > > > I wonder if this is just a wrapper around join? If so it is probably not > going to help me out. > > > > Also I would prefer to work in python > > > > Any thoughts? > > > > Kind regards > > > > Andy > > > > > -- Best Regards, Ayan Guha

Re: Cast int to string not possible?

2022-02-17 Thread ayan guha
-- Best Regards, Ayan Guha

Re: Question about spark.sql min_by

2022-02-21 Thread ayan guha
s slow (maybe >> because of temp table creation ?): >> >> df.createOrReplaceTempView("table") >> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId, >> min(price) from table group by productId") >> >> Is there a way I can rely on min_by directly in groupby ? >> Is there some code missing in pyspark wrapper to make min_by visible >> somehow ? >> >> Thank you in advance for your help. >> >> Cheers >> David >> > -- Best Regards, Ayan Guha
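A sketch of how min_by can still be reached from the DataFrame groupBy via expr() when the dedicated Python wrapper is not available; the dataframe and column names follow the example in the thread.

```python
from pyspark.sql import functions as F

cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(F.expr("min_by(sellerId, price)").alias("sellerId"),  # SQL function via expr()
           F.min("price").alias("price"))
)
```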

Re: Decompress Gzip files from EventHub with Structured Streaming

2022-03-08 Thread ayan guha
3. Should I use Autoloader or just simply stream data into Databricks >using Event Hubs? > > I am especially curious about the trade-offs and the best way forward. I > don't have massive amounts of data. > > Thank you very much in advance! > > Best wishes, > Maurizio Vancho Argall > > -- Best Regards, Ayan Guha

Re: pivoting panda dataframe

2022-03-16 Thread ayan guha
;>>> >>> df >>>>>A B >>>>> 0 0 1 >>>>> 1 1 2 >>>>> 2 2 3 >>>>> >>>>> >>> def square(x) -> ps.Series[np.int32]: >>>>> ... return x ** 2 >>>>> >>> df.transform(square) >>>>>A B >>>>> 0 0 1 >>>>> 1 1 4 >>>>> 2 4 9 >>>>> >>>>> You can omit the type hint and let pandas-on-Spark infer its type. >>>>> >>>>> >>> df.transform(lambda x: x ** 2) >>>>>A B >>>>> 0 0 1 >>>>> 1 1 4 >>>>> 2 4 9 >>>>> >>>>> For multi-index columns: >>>>> >>>>> >>> df.columns = [('X', 'A'), ('X', 'B')] >>>>> >>> df.transform(square) # doctest: +NORMALIZE_WHITESPACE >>>>>X >>>>>A B >>>>> 0 0 1 >>>>> 1 1 4 >>>>> 2 4 9 >>>>> >>>>> >>> (df * -1).transform(abs) # doctest: +NORMALIZE_WHITESPACE >>>>>X >>>>>A B >>>>> 0 0 1 >>>>> 1 1 2 >>>>> 2 2 3 >>>>> >>>>> You can also specify extra arguments. >>>>> >>>>> >>> def calculation(x, y, z) -> ps.Series[int]: >>>>> ... return x ** y + z >>>>> >>> df.transform(calculation, y=10, z=20) # doctest: >>>>> >>> +NORMALIZE_WHITESPACE >>>>> X >>>>> A B >>>>> 020 21 >>>>> 121 1044 >>>>> 2 1044 59069File: /opt/spark/python/pyspark/pandas/frame.pyType: >>>>>method >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> tir. 15. mar. 2022 kl. 19:33 skrev Andrew Davidson >>>> >: >>>>> >>>>>> Hi Bjorn >>>>>> >>>>>> >>>>>> >>>>>> I have been looking for spark transform for a while. Can you send me >>>>>> a link to the pyspark function? >>>>>> >>>>>> >>>>>> >>>>>> I assume pandas transform is not really an option. I think it will >>>>>> try to pull the entire dataframe into the drivers memory. >>>>>> >>>>>> >>>>>> >>>>>> Kind regards >>>>>> >>>>>> >>>>>> >>>>>> Andy >>>>>> >>>>>> >>>>>> >>>>>> p.s. My real problem is that spark does not allow you to bind >>>>>> columns. You can use union() to bind rows. I could get the equivalent of >>>>>> cbind() using union().transform() >>>>>> >>>>>> >>>>>> >>>>>> *From: *Bjørn Jørgensen >>>>>> *Date: *Tuesday, March 15, 2022 at 10:37 AM >>>>>> *To: *Mich Talebzadeh >>>>>> *Cc: *"user @spark" >>>>>> *Subject: *Re: pivoting panda dataframe >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html >>>>>> we >>>>>> have that transpose in pandas api for spark to. >>>>>> >>>>>> >>>>>> >>>>>> You also have stack() and multilevel >>>>>> https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> tir. 15. mar. 2022 kl. 17:50 skrev Mich Talebzadeh < >>>>>> mich.talebza...@gmail.com>: >>>>>> >>>>>> >>>>>> hi, >>>>>> >>>>>> >>>>>> >>>>>> Is it possible to pivot a panda dataframe by making the row column >>>>>> heading? >>>>>> >>>>>> >>>>>> >>>>>> thanks >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> [image: Image removed by sender.] view my Linkedin profile >>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>> >>>>>> >>>>>> >>>>>> https://en.everybodywiki.com/Mich_Talebzadeh >>>>>> >>>>>> >>>>>> >>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>> for any loss, damage or destruction of data or any other property which >>>>>> may >>>>>> arise from relying on this email's technical content is explicitly >>>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>>> arising from such loss, damage or destruction. 
>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> Bjørn Jørgensen >>>>>> Vestre Aspehaug 4 >>>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail&source=g>, >>>>>> 6010 Ålesund >>>>>> Norge >>>>>> >>>>>> +47 480 94 297 >>>>>> >>>>> >>>>> >>>>> -- >>>>> Bjørn Jørgensen >>>>> Vestre Aspehaug 4 >>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail&source=g>, >>>>> 6010 Ålesund >>>>> Norge >>>>> >>>>> +47 480 94 297 >>>>> >>>> -- Best Regards, Ayan Guha

Re: how to change data type for columns of dataframe

2022-04-01 Thread ayan guha
columns in this dataframe? > > For example, change the column type from Int to Float. > > Thanks. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: Question about bucketing and custom partitioners

2022-04-11 Thread ayan guha
ataframe, is it possible to apply custom > partitioner to a dataframe ? > Is it possible to repartition the dataframe with a narrow > transformation like what could be done with RDD ? > Is there some sort of dataframe developer API ? Do you have any > pointers on this ? > > Thanks ! > > David > -- Best Regards, Ayan Guha

Re: reading each JSON file from dataframe...

2022-07-12 Thread ayan guha
-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr| >> >> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt| >> >> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw| >> >> +-+---+---+ >> >> I would like to read each row from `file_path` and write the result to >> another dataframe containing `entity_id`, `other_useful_id`, >> `json_content`, `file_path`. >> Assume that I already have the required HDFS url libraries in my >> classpath. >> >> Please advice, >> Muthu >> >> >> >> > -- Best Regards, Ayan Guha

Re: very simple UI on webpage to display x/y plots+histogram of data stored in hive

2022-07-18 Thread ayan guha
m of data > stored in hive and retrieved with spark into pandas… > > Many thanks for your suggestion! > > > On 18 Jul 2022, at 15:08, Sean Owen wrote: > > You pull your data via Spark to a pandas DF and do whatever you want > > > -- Best Regards, Ayan Guha
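A minimal sketch of the "pull it into pandas" suggestion above, assuming the result set is small enough to collect; the table name and the two numeric columns are placeholders.

```python
import matplotlib.pyplot as plt

pdf = spark.table("mydb.measurements").select("x", "y").toPandas()  # small result set assumed

pdf.plot.scatter(x="x", y="y")   # x/y plot
pdf["y"].hist(bins=50)           # histogram
plt.show()
```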

Re: Salting technique doubt

2022-07-31 Thread ayan guha
video with the below understanding: >>>>> >>>>> So, using the salting technique we can actually change the joining >>>>> column values by appending some random number in a specified range. >>>>> >>>>> So, suppose I have these two values in a partition of two different >>>>> tables: >>>>> >>>>> Table A: >>>>> Partition1: >>>>> x >>>>> . >>>>> . >>>>> . >>>>> x >>>>> >>>>> Table B: >>>>> Partition1: >>>>> x >>>>> . >>>>> . >>>>> . >>>>> x >>>>> >>>>> After Salting it would be something like the below: >>>>> >>>>> Table A: >>>>> Partition1: >>>>> x_1 >>>>> >>>>> Partition 2: >>>>> x_2 >>>>> >>>>> Table B: >>>>> Partition1: >>>>> x_3 >>>>> >>>>> Partition 2: >>>>> x_8 >>>>> >>>>> Now, when I inner join these two tables after salting in order to >>>>> avoid data skewness problems, I won't get a match since the keys are >>>>> different after applying salting techniques. >>>>> >>>>> So how does this resolves the data skewness issue or if there is some >>>>> understanding gap? >>>>> >>>>> Could anyone help me in layman's terms? >>>>> >>>>> TIA, >>>>> Sid >>>>> >>>> -- Best Regards, Ayan Guha
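A sketch of the point this confusion usually comes down to: only the skewed side gets a random salt, while the other side is replicated once per salt value, so every salted key still finds a matching partner. The dataframe names, join key, and bucket count below are assumptions.

```python
from pyspark.sql import functions as F

N = 10  # number of salt buckets (tuning choice)

# Skewed side: each row gets one random salt in [0, N).
a_salted = skewed_df.withColumn("salt", (F.rand() * N).cast("int"))

# Other side: each row is replicated with every salt value, so all keys still match.
b_salted = other_df.withColumn("salt", F.explode(F.array(*[F.lit(i) for i in range(N)])))

joined = a_salted.join(b_salted, ["join_key", "salt"]).drop("salt")
```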

Re: [pyspark delta] [delta][Spark SQL]: Getting an Analysis Exception. The associated location (path) is not empty

2022-08-02 Thread ayan guha
…a PySpark + Delta setup (spark.sql.catalog.spark_catalog set to org.apache.spark.sql.delta.catalog.DeltaCatalog) creates database EXERCISE and then runs CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(worked_date date, worker_id int, delete_flag string, hours_worked double) USING DELTA PARTITIONED BY (worked_date) LOCATION './output/delta/'. It fails with: AnalysisException: Cannot create table ('`EXERCISE`.`WORKED_HOURS`'). The associated location ('output/delta') is not empty. Best Wishes, Kumba Janga -- Best Regards, Ayan Guha

Re: log transfering into hadoop/spark

2022-08-02 Thread ayan guha
rce tool to transfer webserver logs into > hdfs/spark? > > thank you. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-06 Thread ayan guha
ons need less *executors* for a GPU stage than for a > CPU stage. We are using dynamic allocation with stage level scheduling, and > Spark tries to maximize the number of executors also during the GPU stage, > causing a bit of resources chaos in the cluster. This forces us to use a > lower value for 'maxExecutors' in the first place, at the cost of the CPU > stages performance. Or try to solve this in the Kubernets scheduler level, > which is not straightforward and doesn't feel like the right way to go. > > Is there a way to effectively use less executors in Stage Level > Scheduling? The API does not seem to include such an option, but maybe > there is some more advanced workaround? > > Thanks, > Shay > > > > > > > > -- Best Regards, Ayan Guha

Re: Profiling data quality with Spark

2022-12-27 Thread ayan guha
…On Tue, 27 Dec 2022 at 19:25, rajat kumar wrote: Hi Folks, hoping you are doing well. I want to implement data quality checks to detect issues in data in advance. I have heard about a few frameworks like GE/Deequ. Can anyone please suggest which one is good and how do I get started on it? Regards, Rajat -- Best Regards, Ayan Guha

Re: Got Error Creating permanent view in Postgresql through Pyspark code

2023-01-05 Thread ayan guha
rmanent view table in the database.How shall create permanent view >> using Pyspark code. Please do reply. >> >> *Error Message::* >> *Exception has occurred: Analysis Exception* >> Not allowed to create a permanent view `default`.`TEMP1` by referencing a >> temporary view TEMP_VIEW. Please create a temp view instead by CREATE TEMP >> VIEW >> >> >> Regards, >> Vajiha >> Research Analyst >> MW Solutions >> > -- Best Regards, Ayan Guha

Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread ayan guha
…a limit? I have searched for documentation on such a limit but could not find any. I truly appreciate your help Mich and team, Marco. Mich Talebzadeh: Have you thought of using windowing functions to achieve this? Effectively all your information is in the orders table. Original question (Marco Costantini): I have two tables {users, orders}; for each user there are roughly 10 orders. I have to use pyspark to generate a statement of Orders for each User and email it to the real-world user. My first intuition was to apply DataFrame.foreach() on the users DataFrame so the workers handle the email sending, but I do not know the best way to get each User's Orders inside the foreach function: take all Orders up front (will I be taking too much, on partitions that won't handle them?) or create a filtered orders_df within the foreach function (will that be too much IO to the DB?). How can I achieve this goal efficiently? I have not yet tried anything; I am suffering from choice-paralysis. Please and thank you. -- Best Regards, Ayan Guha

[no subject]

2023-08-23 Thread ayan guha
Unsubscribe-- Best Regards, Ayan Guha

Re: Spark SQL on large number of columns

2015-05-19 Thread ayan guha
…Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:35 PM, madhu phatak wrote: Hi, I am using spark 1.3.1…

Re: Spark Job not using all nodes in cluster

2015-05-19 Thread ayan guha
What does your spark-env file say? Are you setting the number of executors in the spark context? On 20 May 2015 13:16, "Shailesh Birari" wrote: > Hi, > > I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB > of RAM. > I have around 600,000+ Json files on HDFS. Each file is small aro

Re: Hive on Spark VS Spark SQL

2015-05-20 Thread ayan guha
And if I am not wrong, spark SQL api is intended to move closer to SQL standards. I feel its a clever decision on spark's part to keep both APIs operational. These short term confusions worth the long term benefits. On 20 May 2015 17:19, "Sean Owen" wrote: > I don't think that's quite the differe

Re: Spark 1.3.1 - SQL Issues

2015-05-20 Thread ayan guha
Thanks a bunch On 21 May 2015 07:11, "Davies Liu" wrote: > The docs had been updated. > > You should convert the DataFrame to RDD by `df.rdd` > > On Mon, Apr 20, 2015 at 5:23 AM, ayan guha wrote: > > Hi > > Just upgraded to Spark 1.3.1. > > > &

Re: Partitioning of Dataframes

2015-05-22 Thread ayan guha
DataFrame is an abstraction over an RDD, so you should be able to do df.rdd.partitionBy. However, as far as I know, equijoins already optimize partitioning. You may want to look at explain plans more carefully and materialise interim joins. On 22 May 2015 19:03, "Karlson" wrote: > Hi, > > is there any

Re: partitioning after extracting from a hive table?

2015-05-22 Thread ayan guha
I guess not. Spark partitions correspond to number of splits. On 23 May 2015 00:02, "Cesar Flores" wrote: > > I have a table in a Hive database partitioning by date. I notice that when > I query this table using HiveContext the created data frame has an specific > number of partitions. > > > Do t

Re: SparkSQL query plan to Stage wise breakdown

2015-05-23 Thread ayan guha
I think you are looking for Df.explain On 23 May 2015 12:51, "Pramod Biligiri" wrote: > Hi, > Is there an easy way to see how a SparkSQL query plan maps to different > stages of the generated Spark job? The WebUI is entirely in terms of RDD > stages and I'm having a hard time mapping it back to m

Re: DataFrame groupBy vs RDD groupBy

2015-05-23 Thread ayan guha
Hi Michael This is great info. I am currently using repartitionandsort function to achieve the same. Is this the recommended way till 1.3 or is there any better way? On 23 May 2015 07:38, "Michael Armbrust" wrote: > DataFrames have a lot more information about the data, so there is a whole > cla

Re: Using Spark like a search engine

2015-05-24 Thread ayan guha
Yes, spark will be useful for following areas of your application: 1. Running same function on every CV in parallel and score 2. Improve scoring function by better access to classification and clustering algorithms, within and beyond mllib. These are first benefits you can start with and then thin

Re: DataFrame. Conditional aggregation

2015-05-25 Thread ayan guha
Case when col2>100 then 1 else col2 end On 26 May 2015 00:25, "Masf" wrote: > Hi. > > In a dataframe, How can I execution a conditional sentence in a > aggregation. For example, Can I translate this SQL statement to DataFrame?: > > SELECT name, SUM(IF table.col2 > 100 THEN 1 ELSE table.col1) > FR
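A sketch of the conditional aggregation this thread is after, expressed with when/otherwise inside agg(); the column names follow the SQL example in the question.

```python
from pyspark.sql import functions as F

result = df.groupBy("name").agg(
    F.sum(F.when(F.col("col2") > 100, 1).otherwise(F.col("col1"))).alias("conditional_sum")
)
```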

Re: Running Javascript from scala spark

2015-05-26 Thread ayan guha
Yes you are in right mailing list, for sure :) Regarding your question, I am sure you are well versed with how spark works. Essentially you can run any arbitrary function with map call and it will run in remote nodes. Hence you need to install any needed dependency in all nodes. You can also pass

Re: DataFrame. Conditional aggregation

2015-05-26 Thread ayan guha
t;).as("test") > ) > > How can I do it? > > Thanks > Regards. > Miguel. > > On Tue, May 26, 2015 at 12:35 AM, ayan guha wrote: > >> Case when col2>100 then 1 else col2 end >> On 26 May 2015 00:25, "Masf" wrote: >> >

Re: How to give multiple directories as input ?

2015-05-27 Thread ayan guha
What about /blah/*/blah/out*.avro? On 27 May 2015 18:08, "ÐΞ€ρ@Ҝ (๏̯͡๏)" wrote: > I am doing that now. > Is there no other way ? > > On Wed, May 27, 2015 at 12:40 PM, Akhil Das > wrote: > >> How about creating two and union [ sc.union(first, second) ] them? >> >> Thanks >> Best Regards >> >> On
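A minimal illustration of the glob suggestion (paths are placeholders): file-based inputs accept both wildcard patterns and comma-separated lists, so an explicit sc.union() is only needed for irregular path sets.

```python
# One RDD covering every matching directory/file.
logs = sc.textFile("/blah/*/blah/out*.avro")

# Equivalent with an explicit, comma-separated list of paths.
logs2 = sc.textFile("/blah/first/out1.avro,/blah/second/out2.avro")
```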

Re: How many executors can I acquire in standalone mode ?

2015-05-27 Thread ayan guha
You can request number of cores and amount of memory for each executor. On 27 May 2015 18:25, "canan chen" wrote: > Thanks Arush. > My scenario is that In standalone mode, if I have one worker, when I start > spark-shell, there will be one executor launched. But if I have 2 workers, > there will

Re: DataFrame. Conditional aggregation

2015-05-27 Thread ayan guha
else 0}} > > Although I'd like to know if it's possible to do it directly in the > aggregation inserting a lambda function or something else. > > Thanks > > Regards. > Miguel. > > > On Wed, May 27, 2015 at 1:06 AM, ayan guha wrote: > >> For this, I

Re: Where does partitioning and data loading happen?

2015-05-27 Thread ayan guha
ystem. While ColdLight > Solutions, LLC has taken every reasonable precaution to minimize this risk, > we cannot accept liability for any damage which you sustain as a result of > software viruses. You should perform your own virus checks before opening > the attachment. > -- Best Regards, Ayan Guha

Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-27 Thread ayan guha
Yes, you are at right path. Only thing to remember is placing hive site XML to correct path so spark can talk to hive metastore. Best Ayan On 28 May 2015 10:53, "Sanjay Subramanian" wrote: > hey guys > > On the Hive/Hadoop ecosystem we have using Cloudera distribution CDH 5.2.x > , there are abo

Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-28 Thread ayan guha
Probably a naive question: can you try the same in hive CLI and see if your SQL is working? Looks like hive thing to me as spark is faithfully delegating the query to hive. On 29 May 2015 03:22, "Abhishek Tripathi" wrote: > Hi , > I'm using CDH5.4.0 quick start VM and tried to build Spark with H

Re: Batch aggregation by sliding window + join

2015-05-28 Thread ayan guha
Which version of spark? In 1.4 window queries will show up for these kinds of scenarios. One thing I can suggest is to keep daily aggregates materialised, partitioned by key and sorted by key-day combination using the repartitionAndSort method. It allows you to use a custom partitioner and custom sorter. Be

Re: Batch aggregation by sliding window + join

2015-05-29 Thread ayan guha
gt; connection to this repartition of daily block. > > > On 29 May 2015 at 01:51, ayan guha wrote: > >> Which version of spark? In 1.4 window queries will show up for these kind >> of scenarios. >> >> 1 thing I can suggest is keep daily aggregates materialised and

Re: Format RDD/SchemaRDD contents to screen?

2015-05-29 Thread ayan guha
Depending on your spark version, you can convert schemaRDD to a dataframe and then use .show() On 30 May 2015 10:33, "Minnow Noir" wrote: > I"m trying to debug query results inside spark-shell, but finding it > cumbersome to save to file and then use file system utils to explore the > results, an
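A minimal sketch of the .show() suggestion, assuming an existing RDD of Row objects and a SQLContext (Spark 1.3+):

```python
df = sqlContext.createDataFrame(rdd)   # rdd of Row objects is assumed
df.show(20)                            # prints a formatted, truncated table to the console
```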

Re: How Broadcast variable works

2015-05-29 Thread ayan guha
park Context broadcasts it again, will tasks see > the updated variable? > > Thanks. > > > > -- > bit1...@163.com > > -- Best Regards, Ayan Guha

Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-30 Thread ayan guha
I hope they will come up with 1.4 before spark summit in mid June On 31 May 2015 10:07, "Joseph Bradley" wrote: > Spark 1.4 should be available next month, but I'm not sure about the exact > date. > Your interpretation of high lambda is reasonable. "High" lambda is really > data-dependent. > "lam

Re: Adding an indexed column

2015-05-31 Thread ayan guha
If you are on spark 1.3, use repartitionandSort followed by mappartition. In 1.4, window functions will be supported, it seems On 1 Jun 2015 04:10, "Ricardo Almeida" wrote: > That's great and how would you create an ordered index by partition (by > product in this example)? > > Assuming now a dat
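A sketch of the 1.4-style answer, an ordered index per product via a window function; the ordering column is an assumption.

```python
from pyspark.sql import Window, functions as F

w = Window.partitionBy("product").orderBy("date")          # "date" is an assumed ordering column
indexed = df.withColumn("idx", F.row_number().over(w))     # 1-based index within each product
```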

Re: Spark stages very slow to complete

2015-06-01 Thread ayan guha
Would you mind posting the code? On 2 Jun 2015 00:53, "Karlson" wrote: > Hi, > > In all (pyspark) Spark jobs, that become somewhat more involved, I am > experiencing the issue that some stages take a very long time to complete > and sometimes don't at all. This clearly correlates with the size of

Re: spark sql - reading data from sql tables having space in column names

2015-06-02 Thread ayan guha
is able to handle such scenarios by enclosing column names in '[ ]' > - the recommended method from microsoft sql server. ( > https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java > - line no 319) > > Is there a way to handle this in spark sql? > > Thanks, > sachin > -- Best Regards, Ayan Guha

Re: Filter operation to return two RDDs at once.

2015-06-02 Thread ayan guha
Why do you need to do that if filter and content of the resulting rdd are exactly same? You may as well declare them as 1 RDD. On 3 Jun 2015 15:28, "ÐΞ€ρ@Ҝ (๏̯͡๏)" wrote: > I want to do this > > val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId > != NULL_VALUE) > > val
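There is no single-pass split in the RDD API; a common sketch (names adapted to Python from the Scala in the thread, NULL_VALUE assumed to be defined) is to cache the parent and apply complementary filters.

```python
raw_qt_session.cache()   # avoid recomputing the parent for both filters

with_qt = raw_qt_session.filter(lambda kv: kv[1].qualifiedTreatmentId != NULL_VALUE)
without_qt = raw_qt_session.filter(lambda kv: kv[1].qualifiedTreatmentId == NULL_VALUE)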

Re: Application is "always" in process when I check out logs of completed application

2015-06-02 Thread ayan guha
Have you done sc.stop() ? :) On 3 Jun 2015 14:05, "amghost" wrote: > I run spark application in spark standalone cluster with client deploy > mode. > I want to check out the logs of my finished application, but I always get > a > page telling me "Application history not found - Application xxx is

Re: Python Image Library and Spark

2015-06-03 Thread ayan guha
Try with large number of partition in parallelize. On 4 Jun 2015 06:28, "Justin Spargur" wrote: > Hi all, > > I'm playing around with manipulating images via Python and want to > utilize Spark for scalability. That said, I'm just learing Spark and my > Python is a bit rusty (been doing PHP c

Re: Saving calculation to single local file

2015-06-05 Thread ayan guha
…a DAGScheduler abortStage stack trace (spark-core_2.10-1.3.1, scala-library-2.10.5)… if this rdd.toLocalIterator.foreach(...) doesn't work, what is the better solution? Best Regards, Marcos -- Best Regards, Ayan Guha

Re: Saving calculation to single local file

2015-06-05 Thread ayan guha
Another option is merge partfiles after your app ends. On 5 Jun 2015 20:37, "Akhil Das" wrote: > you can simply do rdd.repartition(1).saveAsTextFile(...), it might not be > efficient if your output data is huge since one task will be doing the > whole writing. > > Thanks > Best Regards > > On Fri
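Two hedged options behind the replies above (output paths are placeholders): coalesce to a single partition before writing, accepting that one task does the whole write, or write normally and merge the part files afterwards.

```python
rdd.coalesce(1).saveAsTextFile("/tmp/output_single")

# Or, after the job finishes, merge many part files from a shell:
#   hdfs dfs -getmerge /tmp/output_many /local/path/result.txt
```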

Re: Managing spark processes via supervisord

2015-06-05 Thread ayan guha
>>> Currently I am using the cluster launch scripts which are working >>> greater, however, every time I reboot my VM or development environment I >>> need to re-launch the cluster. >>> >>> I am considering using supervisord to control all the processes (worker, >>> master, ect.. ) in order to have the cluster up an running after boot-up; >>> although I'd like to understand if it will cause more issues than it >>> solves. >>> >>> Thanks, Mike. >>> >>> >> > -- Best Regards, Ayan Guha

Re: columnar structure of RDDs from Parquet or ORC files

2015-06-08 Thread ayan guha
t; in two groups: *transformations *which are lazily evaluated and return a >> new RDD and *actions *which evaluate lineage defined by transformations, >> invoke actions and return results. What about DataFrame operations like >> join, groupBy, agg, unionAll etc which are all transformations in RDD? Are >> they lazily evaluated or immediately executed? >> >> >> >> -- Best Regards, Ayan Guha

Re: Running SparkSql against Hive tables

2015-06-08 Thread ayan guha
…Yes, Hive support is enabled by default now for the binaries on the website. However, currently Spark SQL doesn't support buckets yet. 2) Does running Spark SQL against tables in Hive downgrade the performance, and is it better that I load parquet files directly to HDFS, or is having Hive in the picture harmless? If you're using Parquet, then it should be fine, since by default Spark SQL uses its own native Parquet support to read Parquet Hive tables. Thnx -- Best Regards, Ayan Guha

Re: SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-09 Thread ayan guha
-- Best Regards, Ayan Guha

Re: Reading Really Big File Stream from HDFS

2015-06-11 Thread ayan guha
Why do you need to use stream in this use case? 50g need not to be in memory. Give it a try with high number of partitions. On 11 Jun 2015 23:09, "SLiZn Liu" wrote: > Hi Spark Users, > > I'm trying to load a literally big file (50GB when compressed as gzip > file, stored in HDFS) by receiving a D

Re: Deleting HDFS files from Pyspark

2015-06-11 Thread ayan guha
Simplest way would be issuing a os.system with HDFS rm command from driver, assuming it has hdfs connectivity, like a gateway node. Executors will have nothing to do with it. On 12 Jun 2015 08:57, "Siegfried Bilstein" wrote: > I've seen plenty of examples for creating HDFS files from pyspark but
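A sketch of the driver-side approach suggested above, shelling out to the HDFS CLI; the driver is assumed to have hdfs connectivity and the path is a placeholder.

```python
import subprocess

ret = subprocess.call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", "/tmp/old_output"])
if ret != 0:
    raise RuntimeError("hdfs dfs -rm failed with exit code %d" % ret)
```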

Spark 1.4 release date

2015-06-12 Thread ayan guha
Hi When is official spark 1.4 release date? Best Ayan

Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
Thanks a lot. On 12 Jun 2015 19:46, "Todd Nist" wrote: > It was released yesterday. > > On Friday, June 12, 2015, ayan guha wrote: > >> Hi >> >> When is official spark 1.4 release date? >> Best >> Ayan >> >

Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
gt; https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html > > > Guru Medasani > gdm...@gmail.com > > > > On Jun 12, 2015, at 7:08 AM, ayan guha wrote: > > Thanks a lot. > On 12 Jun 2015 19:46, "Todd Nist" wrote: > >> It was release

Re: What is most efficient to do a large union and remove duplicates?

2015-06-14 Thread ayan guha
Can you do dedupe process locally for each file first and then globally? Also I did not fully get the logic of the part inside reducebykey. Can you kindly explain? On 14 Jun 2015 13:58, "Gavin Yue" wrote: > I have 10 folder, each with 6000 files. Each folder is roughly 500GB. So > totally 5TB da

Re: Union of two Diff. types of DStreams

2015-06-14 Thread ayan guha
That's by design. You can make up the value in 2nd rdd by putting a default Int value. On 15 Jun 2015 15:30, "anshu shukla" wrote: > How to take union of JavaPairDStream and > JavaDStream . > > *>a.union(b) is working only with Dstreams of same type.* > > > -- > Thanks & Regards, > Ans
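A sketch of the "pad with a default value" suggestion: map the plain DStream into pairs first so both streams have the same element type before union. The stream names are assumptions.

```python
padded = plain_stream.map(lambda k: (k, 0))   # attach a default Int value
combined = pair_stream.union(padded)          # now both are DStreams of (key, int) pairs
```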

Re: How to set up a Spark Client node?

2015-06-15 Thread ayan guha
deManager & dataNode >> Node 7 is Spark-master configured to run yarn-client or yarn-master modes >> I have tested it and it works fine. >> Is there any instuctions on how to setup spark client in a cluster mode? >> I am not sure if I am doing it right. >> Thanks in advance >> > > -- Best Regards, Ayan Guha

Re: Spark on EMR

2015-06-16 Thread ayan guha
That's great news. Can I assume spark on EMR supports kinesis to hbase pipeline? On 17 Jun 2015 05:29, "kamatsuoka" wrote: > Spark is now officially supported on Amazon Elastic Map Reduce: > http://aws.amazon.com/elasticmapreduce/details/spark/ > > > > -- > View this message in context: > http://

Re: Spark or Storm

2015-06-16 Thread ayan guha
I have a similar scenario where we need to bring data from kinesis to hbase. Data volecity is 20k per 10 mins. Little manipulation of data will be required but that's regardless of the tool so we will be writing that piece in Java pojo. All env is on aws. Hbase is on a long running EMR and kinesis

Re: Spark or Storm

2015-06-17 Thread ayan guha
;> phase of the data processing, the transformed data is stored to the >>> database and this transformed data should then be sent to a new pipeline >>> for further processing >>> >>> How can this be achieved using Spark? >>> >>> >>> >>&

Re: Intermedate stage will be cached automatically ?

2015-06-17 Thread ayan guha
…ediate stage: val data = sc.parallelize(1 to 10, 2).map(e => (e % 2, 2)).reduceByKey(_ + _, 2); println(data.count()); println(data.count()) -- Best Regards, Ayan Guha

Re: Spark or Storm

2015-06-17 Thread ayan guha
in/java/com/amazonaws/services/kinesis/samples/datavis/kcl/CountingRecordProcessor.java > > Instead of incrementing a counter, you could do your transformation and > send it to HBase. > > > > > > > On Wed, Jun 17, 2015 at 1:40 PM, ayan guha wrote: > >> Great discu

Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry

2015-06-17 Thread ayan guha
Try to grant read execute access through sentry. On 18 Jun 2015 05:47, "Nitin kak" wrote: > I am trying to run a hive query from Spark code using HiveContext object. > It was running fine earlier but since the Apache Sentry has been set > installed the process is failing with this exception : > >

Re: Pyspark RDD search

2015-06-18 Thread ayan guha
You can do cross join and filter. On 18 Jun 2015 14:17, "bhavyateja" wrote: > I have an RDD which is list of list > And another RDD which is list of pairs > No duplicates in inner list of first RDD and > No duplicates in the pairs from second rdd > I am trying to check if any pair of second RDD i
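A sketch of the cartesian-then-filter idea, assuming the first RDD holds lists and the second holds (a, b) pairs.

```python
matches = (lists_rdd.cartesian(pairs_rdd)
           .filter(lambda lp: lp[1][0] in lp[0] and lp[1][1] in lp[0]))

found_any = not matches.isEmpty()   # True if any pair appears inside some list
```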

Re: Best way to randomly distribute elements

2015-06-18 Thread ayan guha
…Guillaume PITEL, Président, eXenSa S.A.S., www.exensa.com -- Best Regards, Ayan Guha

Re: Spark 1.4 on HortonWork HDP 2.2

2015-06-19 Thread ayan guha
What problem are you facing? Are you trying to build it yourself or getting a pre-built version? On Fri, Jun 19, 2015 at 10:22 PM, Ashish Soni wrote: > Hi , > > Is any one able to install Spark 1.4 on HDP 2.2 , Please let me know how > can i do the same ? > > Ashish > -- Best Regards, Ayan Guha

Re: Spark 1.4 on HortonWork HDP 2.2

2015-06-19 Thread ayan guha
ate it to 1.4 > > Ashish > > On Fri, Jun 19, 2015 at 8:26 AM, ayan guha wrote: > >> what problem are you facing? are you trying to build it yurself or >> gettingpre-built version? >> >> On Fri, Jun 19, 2015 at 10:22 PM, Ashish Soni >> wrote: >> >

Re: JavaDStream read and write rdbms

2015-06-22 Thread ayan guha
Spark docs has addresses this pretty well. Look for patterns of use foreachRDD. On 22 Jun 2015 17:09, "Manohar753" wrote: > > Hi Team, > > How to split and put the red JavaDStream in to mysql in java. > > any existing api in sark 1.3/1.4. > team can you please share the code snippet if any body
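A sketch of the foreachRDD pattern referred to above, shown in Python for consistency; get_connection and the INSERT statement are placeholders for whatever JDBC/DB-API access the application actually uses.

```python
def save_partition(records):
    conn = get_connection()     # hypothetical helper returning a DB-API connection
    cur = conn.cursor()
    for k, v in records:
        cur.execute("INSERT INTO target_table (k, v) VALUES (%s, %s)", (k, v))
    conn.commit()
    conn.close()

stream.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))
```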
