Sorry, a correction regarding creating an incrementing ID in PySpark:

>>> df = spark.range(1,5)
>>> from pyspark.sql.window import Window as W
>>> from pyspark.sql import functions as F
>>> df = df.withColumn("idx", F.monotonically_increasing_id())
>>> windowSpec = W.orderBy("idx")
>>> df.withColumn("idx", F.row_number().over(windowSpec)).show()
2021-07-13 22:04:15,001 WARN window.WindowExec: No Partition Defined for
Window operation! Moving all data to a single partition, this can cause
serious performance degradation.
+---+---+
| id|idx|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
+---+---+
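
If you also need new records to carry on from the latest id already present in
storage (S3), one option is to read back the current maximum and add it as an
offset to row_number(). A minimal sketch only; the S3 path and the idx column
name are hypothetical:

>>> existing = spark.read.parquet("s3://some-bucket/some-table")  # hypothetical path
>>> start = existing.agg(F.max("idx")).first()[0] or 0  # 0 if nothing stored yet
>>> df.withColumn("idx", F.row_number().over(windowSpec) + F.lit(start)).show()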

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Jul 2021 at 21:32, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
> record read from Kafka, and adding that to your dataframe?
>
> If you do Spark Structured Streaming in micro-batch mode, you will get what
> is known as a batchId:
>
>            result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.ticker").alias("ticker") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.price").alias("price")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToSink). \
>                      trigger(processingTime='30 seconds'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName(config['MDVariables']['topic']). \
>                      start()
>
> That function sendToSink receives two arguments, df and batchId:
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100, False)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
>                          config['MDVariables']['targetDataset'],
>                          config['MDVariables']['targetTable'])
>         df.unpersist()
>         print(f"""wrote to DB""")
>     else:
>         print("DataFrame md is empty")
>
> That batchId value can be used for each micro-batch.
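>
> For example, a unique and increasing ID per record could be derived from the
> batchId plus an in-batch row number. This is only a sketch with a hypothetical
> numbering scheme; it assumes each micro-batch has fewer than 1,000,000 rows:
>
>     from pyspark.sql import functions as F
>     from pyspark.sql.window import Window as W
>
>     def sendToSink(df, batchId):
>         df = df.withColumn("rowid", F.monotonically_increasing_id())
>         windowSpec = W.orderBy("rowid")
>         # batchId * 1000000 leaves room for the in-batch row number,
>         # assuming fewer than 1,000,000 rows per micro-batch
>         df = df.withColumn("id",
>                            F.lit(batchId) * 1000000 +
>                            F.row_number().over(windowSpec)).drop("rowid")
>         df.show(10, False)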
>
>
> Otherwise you can do this
>
>
> from pyspark.sql import functions as F
>
> startval = 1
> df = df.withColumn('id', F.monotonically_increasing_id() + startval)
>
> HTH
>
>
> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
> felixkizhakkelj...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using Spark Structured Streaming to sink data from Kafka to AWS S3.
>> I am wondering if it's possible for me to introduce a uniquely incrementing
>> identifier for each record, as we do in an RDBMS (an incrementing long id)?
>> This would greatly help with range pruning while reading based on this ID.
>>
>> Any thoughts? I have looked at monotonically_increasing_id, but it seems
>> it is not deterministic, and it won't ensure new records get the next id
>> after the latest id already present in the storage (S3).
>>
>> Regards,
>> Felix K Jose
>>
>
