[ 
https://issues.apache.org/jira/browse/SPARK-51360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931970#comment-17931970
 ] 

Ramakrishna commented on SPARK-51360:
-------------------------------------

This behavior is described in Delta's documentation:

[https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge]

 * When {{merge}} is used in {{foreachBatch}}, the input data rate of the 
streaming query (reported through {{StreamingQueryProgress}} and visible in the 
notebook rate graph) may be reported as a multiple of the actual rate at which 
data is generated at the source. This is because {{merge}} reads the input data 
multiple times, causing the input metrics to be multiplied. If this is a 
bottleneck, you can cache the batch DataFrame before {{merge}} and then uncache 
it after {{merge}}.
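
The caching workaround quoted above could look roughly like the following. This is only a minimal sketch, not the code from the attached reproducer: the target path {{/tmp/delta/target}}, the join condition on {{value}}, and the object name are assumptions made for illustration, and the target is assumed to be an existing Delta table with the rate source's columns ({{timestamp}}, {{value}}). The idea is that once the batch is persisted, the {{count()}} and the {{merge}} read the cached rows rather than re-reading the source, which is what the Delta note suggests should keep the reported input metrics from being multiplied.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object CachedMergeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cached-merge-sketch").getOrCreate()

    // Hypothetical path: assumed to hold an existing Delta table
    // with the rate source's schema (timestamp, value).
    val targetPath = "/tmp/delta/target"

    // Same kind of source as in the report: 2 rows per second.
    val source = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "2")
      .load()

    // Typed as a Scala function value so the Scala overload of foreachBatch is selected.
    val upsert: (DataFrame, Long) => Unit = (batch, batchId) => {
      // Cache the micro-batch so later actions reuse it instead of re-reading the source.
      batch.persist()
      try {
        println(s"batch $batchId rows: ${batch.count()}")
        DeltaTable.forPath(spark, targetPath).as("t")
          .merge(batch.as("s"), "t.value = s.value") // assumed join condition
          .whenMatched().updateAll()
          .whenNotMatched().insertAll()
          .execute()
      } finally {
        // Release the cached batch even if the merge fails.
        batch.unpersist()
      }
    }

    source.writeStream
      .foreachBatch(upsert)
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()
      .awaitTermination()
  }
}
```

Running this requires the Delta Lake library on the classpath and a session configured for Delta. If {{numInputRows}} still comes out as a multiple of the {{count()}} printed per batch with the cache in place, that would point to a cause other than the repeated reads described in the Delta note.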

> Spark counts the total no of records twice in forEachBatch
> ----------------------------------------------------------
>
>                 Key: SPARK-51360
>                 URL: https://issues.apache.org/jira/browse/SPARK-51360
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Submit, Structured Streaming
>    Affects Versions: 3.5.1
>            Reporter: Ramakrishna
>            Priority: Critical
>              Labels: SPARK
>         Attachments: Scala_practice.zip, Screenshot 2025-03-01 at 1.48.11 
> PM.png
>
>
> It looks like *numInputRows*, as reported through Spark's 
> StreamingQueryListener, is calculated incorrectly: it is double the actual 
> number of records. This can be very misleading in production scenarios.
>  
> Please find the screenshot attached.
> *numInputRows* is sometimes a multiple of the actual number of records.
> As the screenshot shows, I call df.count() inside foreachBatch and it 
> matches the rate stream's ingestion rate: 2 rows per second with a 30-second 
> trigger gives around 60 records, but *numInputRows* is double that value.
>  
> This seems to be a problem only with foreachBatch; otherwise it works fine.
>  
> I have observed this issue with a Delta table as the input source and a 
> Delta table as the output sink.
> However, in my example I have used the *rate* stream.
>  
> I have also attached a zipped project that reproduces the problem (a minimal 
> reproducible example).
> You need Java 8 and SBT to run it locally.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

