[ https://issues.apache.org/jira/browse/SPARK-51360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ramakrishna updated SPARK-51360: -------------------------------- Description: It looks like Spark's *numInputRows* is wrongly calculated , it is double the number of records. This can be very misleading in production scenarios. Please find the screenshots attached . I have also zipped the project which reproduces the problem ``` def insertToDeltaTable(batchDF: Dataset[NewEmployee], batchId: Long): Unit = { println("batch ID :: "+ batchId) println("No of records ::: "+ batchDF.count()) batchDF.write.partitionBy("systemId").format("delta").mode("append").save(deltaTablePath) } ``` was: The data in Partition 0 was executed twice. Here is the reproduction code; the issue occurs every time. It appears that the string starting with "0cool" is printed twice. {code:java} from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("llm hard negs records") \ .master(f"local[8]") \ .getOrCreate() def process_partition(index, partition): results = [] s = 0 for _ in partition: row = {"Result": "cool"} results.append(row) s += 1 print(str(index) + "cool" + str(s)) return results data = list(range(2000)) results_rdd = spark.sparkContext.parallelize(data).repartition(8).mapPartitionsWithIndex(process_partition) results_df = results_rdd.toDF(["Query"]) output_path = "/tmp/bc_inputs6" results_df.write.json(output_path, mode="overwrite") {code} > Spark counts the total no of records twice in forEachBatch > ---------------------------------------------------------- > > Key: SPARK-51360 > URL: https://issues.apache.org/jira/browse/SPARK-51360 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL > Affects Versions: 3.5.1 > Reporter: Ramakrishna > Priority: Critical > Labels: pull-request-available > > It looks like Spark's > *numInputRows* is wrongly calculated , it is double the number of records. > This can be very misleading in production scenarios. > > Please find the screenshots attached . > > I have also zipped the project which reproduces the problem > ``` > > def insertToDeltaTable(batchDF: Dataset[NewEmployee], batchId: Long): Unit = { > println("batch ID :: "+ batchId) > println("No of records ::: "+ batchDF.count()) > > batchDF.write.partitionBy("systemId").format("delta").mode("append").save(deltaTablePath) > } > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org