Have you used the new equality functions introduced in Spark 3.5? https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.testing.assertDataFrameEqual.html
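For example, a minimal sketch (by default assertDataFrameEqual ignores row order and compares floats with a tolerance):

    from pyspark.sql import SparkSession
    from pyspark.testing import assertDataFrameEqual

    spark = SparkSession.builder.getOrCreate()
    actual = spark.createDataFrame([("a", 1.0), ("b", 2.0)], ["id", "amount"])
    expected = spark.createDataFrame([("b", 2.0), ("a", 1.0)], ["id", "amount"])

    # Passes: row order is ignored by default (checkRowOrder=False),
    # and floats are compared within rtol/atol tolerances.
    assertDataFrameEqual(actual, expected)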
On Thu, Apr 17, 2025, 13:18, daniel williams <daniel.willi...@gmail.com> wrote:

> Good call out. Yeah, once you take your work out of Spark it's all on you. Any low-level partition operation (e.g. map, flatMap, foreach) ends up as a lambda in Catalyst. I've found, however, that avoiding explode and doing things procedurally at that point, with a sufficient amount of unit testing, helps a ton with coverage and stability. If you're not using spark-testing-base, I highly advise it. I've built many streaming and processing topologies on Spark for several orgs now, and Holden's framework has saved me more times than I can imagine.
>
> -dan
>
> On Thu, Apr 17, 2025 at 12:09 AM Ángel Álvarez Pascua <angel.alvarez.pas...@gmail.com> wrote:
>
>> Just a quick note on working at the RDD level in Spark: once you go down to that level, it's entirely up to you to handle everything. You gain more control and flexibility, but Spark steps back and hands you the steering wheel. If tasks fail, it's usually because you're allowing them to, by not properly capturing or handling all potential errors.
>>
>> Last year, I worked on a Spark Streaming project that had to interact with various external systems (OpenSearch, Salesforce, Microsoft Office 365, a ticketing service, ...) through HTTP requests. We handled those interactions the way I described, and it has been running in production without issues since last summer.
>>
>> That said, a word of caution: while we didn't face any issues, some colleagues on a different project ran into a problem when calling a model inside a UDF. In certain cases, especially after using the built-in explode function, Spark ended up calling the model twice per row. Their workaround was to cache the DataFrame before invoking the model. (I don't think Spark speculation was enabled in their case, by the way.)
>>
>> I still need to dig deeper into the root cause, but just a heads-up: under certain conditions, Spark might trigger multiple executions. We also used the explode function in our project, but didn't observe any duplicate calls... so for now, it remains a bit of a mystery.
>>
>> On Thu, Apr 17, 2025 at 2:02, daniel williams (<daniel.willi...@gmail.com>) wrote:
>>
>>> Apologies. I was imagining a pure Kafka application and a transactional consumer doing the processing. Ángel's suggestion is the correct one: use mapPartitions to maintain state across your broadcast Kafka producers so you can retry or introduce back pressure, letting each producer retry in its thread pool with its built-in backoff. This would allow you to retry n times and then, upon final (hopefully not) failure, update your dataset for further processing.
>>>
>>> -dan
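As a rough illustration of the pattern Dan describes (not code from the thread): a PySpark sketch assuming the kafka-python client and a DataFrame df with id and payload columns; the broker, topic, and retry settings are placeholders, and the producer's retries/retry_backoff_ms supply the built-in backoff.

    from kafka import KafkaProducer  # kafka-python client (assumption)

    def send_partition(rows):
        # One producer per partition; retries and backoff are delegated
        # to the client via retries / retry_backoff_ms.
        producer = KafkaProducer(bootstrap_servers="broker:9092",
                                 acks="all", retries=5, retry_backoff_ms=500)
        for row in rows:
            try:
                # Block on each send so failures surface here, per row.
                producer.send("events", row["payload"].encode("utf-8")).get(timeout=30)
                yield (row["id"], "OK")
            except Exception as e:
                # Record the failure instead of failing the whole task.
                yield (row["id"], "FAILED: " + str(e))
        producer.close()  # flushes pending sends

    result = df.rdd.mapPartitions(send_partition).toDF(["id", "status"])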
>>> On Wed, Apr 16, 2025 at 5:04 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>>>
>>>> @daniel williams <daniel.willi...@gmail.com>
>>>>
>>>> > Operate your producer in transactional mode.
>>>> Could you elaborate more on this? How would I achieve it with foreachPartition and an HTTP client?
>>>>
>>>> > Checkpointing is an abstract concept only applicable to streaming.
>>>> That means checkpointing is not supported in batch mode, right?
>>>>
>>>> @angel.alvarez.pas...@gmail.com <angel.alvarez.pas...@gmail.com>
>>>>
>>>> > What about returning the HTTP codes as a dataframe result in the foreachPartition
>>>> I believe you are referring to mapPartitions; that could be done. The main concern here is that the issue is not only failures in the third system but also task/job failures. I wanted to know if there is an existing way in Spark batch to checkpoint already-processed rows of a partition when using foreachPartition or mapPartitions, so that they are not processed again when a task is rescheduled after a failure or the job is retriggered after failures.
>>>>
>>>> Regards,
>>>> Abhishek Singla
>>>>
>>>> On Wed, Apr 16, 2025 at 7:38 PM Ángel Álvarez Pascua <angel.alvarez.pas...@gmail.com> wrote:
>>>>
>>>>> What about returning the HTTP codes as a dataframe result in the foreachPartition, saving it to files/a table/whatever, and then performing a join to discard the already-processed OK rows when you try again?
>>>>>
>>>>> On Wed, Apr 16, 2025 at 15:01, Abhishek Singla (<abhisheksingla...@gmail.com>) wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> We are using foreachPartition to send dataset row data to a third system via an HTTP client. The operation is not idempotent. I want to ensure that in case of failures the previously processed rows do not get processed again.
>>>>>>
>>>>>> Is there a way to checkpoint in Spark batch:
>>>>>> 1. Checkpoint processed partitions, so that if there are 1000 partitions and 100 were processed in the previous run, those 100 do not get processed again.
>>>>>> 2. Checkpoint a partially processed partition in foreachPartition: if I have processed 100 records out of a partition's 1000, is there a way to checkpoint the offset so that those 100 do not get processed again?
>>>>>>
>>>>>> Regards,
>>>>>> Abhishek Singla
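A rough PySpark sketch of Ángel's save-and-join idea, under stated assumptions: a call_http_partition generator like the Kafka one above that yields (id, status) per row, a unique id column on df, and an illustrative status_path.

    from pyspark.sql import functions as F
    from pyspark.sql.utils import AnalysisException

    status_path = "hdfs:///checkpoints/http_status"  # illustrative location

    # On a rerun, anti-join away rows whose status was already recorded as OK.
    try:
        done = spark.read.parquet(status_path).filter(F.col("status") == "OK")
        todo = df.join(done.select("id"), on="id", how="left_anti")
    except AnalysisException:
        todo = df  # no status data yet, so this is the first attempt

    statuses = todo.rdd.mapPartitions(call_http_partition).toDF(["id", "status"])
    statuses.write.mode("append").parquet(status_path)

Note that this narrows the window rather than eliminating it: statuses are only persisted when the write lands, so if a task dies mid-partition its rows are re-sent on the retry. Without idempotence (or deduplication) on the receiving side, the guarantee stays at-least-once, which is exactly the gap Abhishek describes.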