Hello list,
for the code in the link:
https://github.com/apache/spark/blob/v3.2.1/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
I am not sure why the RDD-to-DataFrame logic is enclosed in a foreachRDD block.
What's the use of foreachRDD?
Thanks in advance.
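For reference: broadly, foreachRDD is a DStream output operation that hands the driver the RDD behind each micro-batch, and that is where driver-side SparkSession/DataFrame code can be applied. A rough PySpark sketch of the same pattern (host, port and batch interval are placeholders, not taken from the linked example):

from pyspark.sql import Row, SparkSession
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 5)                      # 5-second micro-batches (placeholder)
lines = ssc.socketTextStream("localhost", 9999)    # placeholder source
words = lines.flatMap(lambda line: line.split(" "))

def process(time, rdd):
    # foreachRDD runs this on the driver once per micro-batch,
    # so DataFrame/SQL logic can be applied to that batch's RDD here.
    if rdd.isEmpty():
        return
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(rdd.map(lambda w: Row(word=w)))
    df.createOrReplaceTempView("words")
    spark.sql("SELECT word, COUNT(*) AS total FROM words GROUP BY word").show()

words.foreachRDD(process)
ssc.start()
ssc.awaitTermination()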
Absolutely
The reason this error happens is that the RDD is a one-dimensional data
structure, whilst a DataFrame has to be two-dimensional, i.e. we have a
List[Integer] but we need List[Tuple[Integer]].
Try this
>>> rdd = sc.parallelize([3,2,1,4])
>>> df = rdd.map(lambda x: (x,)).toDF()
>>> df.printSchema()
Hi,
can we understand the requirement first?
What is it that you are trying to achieve with an auto-increment ID? Do you just
want different IDs for the rows, or do you want to keep track of the record
count of a table as well, or do you want to use them as surrogate keys?
If you are going to insert re
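For context on that distinction, the two usual approaches behave quite differently; a quick sketch with toy data (assuming a SparkSession named spark), only to illustrate the trade-off:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame([("a",), ("b",), ("c",)], ["value"])   # toy data

# Unique but non-contiguous IDs, computed without a shuffle:
df_ids = df.withColumn("id", F.monotonically_increasing_id())

# Strict 1..N sequence, but the global window pulls all rows into one partition:
w = Window.orderBy("value")
df_seq = df.withColumn("id", F.row_number().over(w))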
Probably not the answer you are looking for, but the best thing to do is to
avoid making Spark code sleep. Is there a way you can predict how big your
autoscaling group needs to be without looking at all the data? Are you using
a fixed number of Spark executors, or do you have some way of scaling
Hello All,
I'm using StructuredStreaming to read data from Kafka, and need to do
transformation on each individual row.
I'm trying to use 'foreach' (or foreachBatch), and running into issues.
Basic question - how is the row passed to the function when foreach is used?
Also, when I use foreachBatch
read below
"""
"foreach" performs custom write logic on each row and
"foreachBatch" performs custom write logic on each micro-batch through
SendToBigQuery function
*foreachBatch(SendToBigQuery) expects 2 parameters, first:
micro-batch as DataFrame or Data
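To make that concrete, a minimal PySpark sketch of the wiring being described; the Kafka server and topic are placeholders, the spark-sql-kafka package is assumed to be available, and the body of SendToBigQuery is only indicated since the actual BigQuery write is not shown here:

from pyspark.sql import SparkSession

def SendToBigQuery(df, batch_id):
    # df is the micro-batch as a DataFrame, batch_id is its unique id.
    # The real function would write df to BigQuery; here we only count rows.
    print(f"batch {batch_id}: {df.count()} rows")

spark = SparkSession.builder.appName("demo").getOrCreate()

streaming_df = (spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder
                .option("subscribe", "my_topic")                       # placeholder
                .load())

query = (streaming_df.writeStream
                     .foreachBatch(SendToBigQuery)
                     .start())
query.awaitTermination()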
Thanks, Mich .. that worked fine!
On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh wrote:
> read below
>
> """
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> SendToBigQuery function
>
Hello,
For this query:
df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
|tomato|     9|
| apple|     6|
|cherry|     5|
|orange|     3|
+------+------+
I want to add a column "top", in which the value is: 1,2,3... meaning
top1, top2, top3, and so on.
For this requirement you can use rank or dense_rank.
On Tue, 8 Feb 2022 at 1:12 pm, wrote:
> Hello,
>
> For this query:
>
> >>> df.select("*").orderBy("amount",ascending=False).show()
> +------+------+
> | fruit|amount|
> +------+------+
> |tomato|     9|
> | apple|     6|
> |cherry|     5|
> |orange|     3
Hi,
sorry, once again I will try to understand the problem first :)
As we can clearly see, the initial responses were incorrectly guessing
the solution to be the monotonically_increasing_id function.
What if there are two fruits with an equal amount? For any real-life
application, can we understand what
Hello Gourav
As you see here, orderBy has already given the solution for "equal
amount":
df = sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
This has the information that you require in order to add an extra column
with a sequence to it.
On Tue, 8 Feb 2022 at 09:11, wrote:
>
> Hello Gourav
>
>
> As you see here, orderBy has already given the solution for "equal
> amount":
>
> >>> df = sc.parallelize([("orange",2),("apple",3),(
https://stackoverflow.com/a/51854022/299676
On Tue, 8 Feb 2022 at 09:25, Stelios Philippou wrote:
> This has the information that you require in order to add an extra column
> with a sequence to it.
>
>
> On Tue, 8 Feb 2022 at 09:11, wrote:
>
>>
>> Hello Gourav
>>
>>
>> As you see here orderBy
Simple: either rank() or dense_rank().
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.functions import col
>>> from pyspark.sql.window import Window
>>> wOrder = Window().orderBy(df['amount'].desc())
>>> df.select(F.rank().over(wOrder).alias("rank"), col('fruit'), col('amount')).show()
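For the original request (a literal "top" column with 1, 2, 3 ...), row_number() over the same window gives it directly; a short sketch, assuming the df from earlier in the thread, showing how the three functions differ on ties:

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.window import Window
>>> wOrder = Window.orderBy(F.col("amount").desc())   # no partitionBy: fine for small data, single partition
>>> df.select("fruit", "amount",
...           F.row_number().over(wOrder).alias("top"),         # 1,2,3,... even on ties
...           F.rank().over(wOrder).alias("rank"),              # ties share a rank, gap follows
...           F.dense_rank().over(wOrder).alias("dense_rank")   # ties share a rank, no gap
...          ).show()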