Re: Checkpointing in foreachPartition in Spark batch

2025-04-16 Thread Ángel Álvarez Pascua
Just a quick note on working at the RDD level in Spark — once you go down to that level, it’s entirely up to you to handle everything. You gain more control and flexibility, but Spark steps back and hands you the steering wheel. If tasks fail, it's usually because you're allowing them to — by not p
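Concretely, at the partition level that means owning the error handling yourself. A minimal PySpark sketch (the `requests` client and the endpoint URL are assumptions, not from the thread): swallow an exception and the task "succeeds", so Spark never retries it; re-raise and Spark retries the whole partition.

```python
import requests  # assumed HTTP client, not specified in the thread

def send_partition(rows):
    failures = 0
    for row in rows:
        try:
            resp = requests.post("https://third-system.example/api",  # placeholder endpoint
                                 json=row.asDict(), timeout=10)
            resp.raise_for_status()
        except Exception:
            failures += 1  # decide explicitly: log, collect, retry, or re-raise
    if failures:
        raise RuntimeError(f"{failures} records failed in this partition")  # re-raising makes Spark retry the task

df.foreachPartition(send_partition)
```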

Re: Kafka Connector: producer throttling

2025-04-16 Thread daniel williams
The contract between the two is a dataset, yes; but did you construct the former via readStream? If not, it's still batch. -dan On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla wrote: > > Implementation via Kafka Connector > > > org.apache.spark > spark-sql-kafka-0-10_${scala.version}
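A minimal sketch of the distinction (paths, schema and topic are placeholders): only a Dataset built via readStream turns the Kafka write into a streaming query; a Dataset built via read stays a one-shot batch save.

```python
# Batch: spark.read produces a static Dataset; the Kafka write is a one-shot save.
batch_df = spark.read.json("s3a://bucket/input/")
(batch_df.selectExpr("to_json(struct(*)) AS value")
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "events")
    .save())

# Streaming: spark.readStream produces a streaming Dataset; checkpointing applies here.
stream_df = spark.readStream.schema(schema).json("s3a://bucket/input/")
(stream_df.selectExpr("to_json(struct(*)) AS value")
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "events")
    .option("checkpointLocation", "s3a://bucket/checkpoints/events")
    .start())
```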

Re: Checkpointing in foreachPartition in Spark batch

2025-04-16 Thread Abhishek Singla
@daniel williams > Operate your producer in transactional mode. Could you elaborate more on this? How can this be achieved with foreachPartition and an HTTP Client? > Checkpointing is an abstract concept only applicable to streaming. That means checkpointing is not supported in batch mode, right? @ange

Re: Checkpointing in foreachPartition in Spark batch

2025-04-16 Thread daniel williams
Apologies. I was imagining a pure Kafka application and a transactional consumer on processing. Ángel's suggestion is the correct one: mapPartitions to maintain state across your broadcast Kafka producers, to retry or introduce back pressure given your producer and retry in its threadpool giv
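A rough Python rendering of that idea (the thread itself is Java/Scala; kafka-python, the topic name, the key/value columns and the retry/backoff numbers below are all assumptions): broadcast the producer options, build one producer per partition, and return a per-record status so failures can be retried with back pressure.

```python
import time
from kafka import KafkaProducer  # kafka-python, used here only for illustration

conf_bc = spark.sparkContext.broadcast({"bootstrap_servers": "localhost:9092", "linger_ms": 100})

def send_with_state(rows):
    producer = KafkaProducer(**conf_bc.value)   # one producer per partition, built from the broadcast
    for row in rows:
        status = "failed"
        for attempt in range(3):                # simple retry loop
            try:
                producer.send("events", row.value.encode("utf-8")).get(timeout=30)
                status = "ok"
                break
            except Exception:
                time.sleep(2 ** attempt)        # back off to relieve pressure on the broker
        yield (row.key, status)
    producer.flush()

status_df = df.rdd.mapPartitions(send_with_state).toDF(["key", "status"])
```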

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Implementation via Kafka Connector

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>3.1.2</version>
  </dependency>

  private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
    dataset.write().format("kafka").options(conf).save();
  }

  conf = { "kafka.bootstrap.servers": "localh

Appreciate a second opinion – Metadata Analysis of PDF Files

2025-04-16 Thread Mich Talebzadeh
Hi all, I’m reaching out to see if anyone in the community has experience with *digital forensics or metadata analysis* and would be willing to assist. I have two PDF documents that I received in December 2024 but which claim to relate to events in July 2024. I’ve already extracted the metadata u

Re: High count of Active Jobs

2025-04-16 Thread nayan sharma
Hi Ángel, I haven't tried disabling speculation but I will try running in DEBUG mode. Thanks & Regards, Nayan Sharma *+91-8095382952* On Wed, Apr 16, 2025 at 12:32 PM Ángel Álvarez Pas

Re: Checkpointing in foreachPartition in Spark batch

2025-04-16 Thread Ángel Álvarez Pascua
What about returning the HTTP codes as a dataframe result in the foreachPartition, saving it to files/table/whatever and then performing a join to discard already-processed OK rows when you try it again? El mié, 16 abr 2025 a las 15:01, Abhishek Singla (< abhisheksingla...@gmail.com>) escribió: >
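A sketch of that approach (the table path, the id column and the endpoint are made up for illustration): persist the HTTP status per row, then anti-join against the successes on the next run so only failed or unprocessed rows are attempted again.

```python
import requests  # assumed HTTP client

def call_and_report(rows):
    for row in rows:
        try:
            code = requests.post("https://third-system.example/api",
                                 json=row.asDict(), timeout=10).status_code
        except Exception:
            code = -1                      # mark transport failures distinctly
        yield (row["id"], code)

# First run: record the outcome of every row.
(df.rdd.mapPartitions(call_and_report)
   .toDF(["id", "http_code"])
   .write.mode("append").parquet("s3a://bucket/http_results/"))

# Retry run: discard rows that already succeeded.
done = (spark.read.parquet("s3a://bucket/http_results/")
        .where("http_code BETWEEN 200 AND 299")
        .select("id"))
to_retry = df.join(done, on="id", how="left_anti")
```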

Re: Checkpointing in foreachPartition in Spark batch

2025-04-16 Thread daniel williams
Yes. Operate your producer in transactional mode. Checkpointing is an abstract concept only applicable to streaming. -dan On Wed, Apr 16, 2025 at 7:02 AM Abhishek Singla wrote: > Hi Team, > > We are using foreachPartition to send dataset row data to third system via > HTTP client. The operatio
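For reference, "transactional mode" on a Kafka producer looks roughly like this (confluent-kafka Python client used purely for illustration; as the follow-up in this thread notes, it applies to a pure Kafka pipeline, not to the HTTP call in the original question): sends inside a transaction become visible to read_committed consumers only if the whole batch commits.

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "batch-writer-1",   # must be stable per logical writer
})
producer.init_transactions()

producer.begin_transaction()
try:
    for i in range(100):
        producer.produce("events", value=f"record-{i}".encode("utf-8"))
    producer.commit_transaction()            # all-or-nothing from the consumer's point of view
except Exception:
    producer.abort_transaction()
    raise
```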

Re: Kafka Connector: producer throttling

2025-04-16 Thread daniel williams
If you are building a broadcast to construct a producer with a set of options, then the producer is merely going to operate how it has been configured - it has nothing to do with Spark, save that the foreachPartition is constructing it via the broadcast. A strategy I’ve used in the past is to *
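In other words, the throttling lives entirely in the producer options; Spark just ships the map to the executors. A small sketch (option values are illustrative; kafka-python and the topic name are assumptions):

```python
conf_bc = spark.sparkContext.broadcast({
    "bootstrap_servers": "localhost:9092",
    "linger_ms": 1000,                           # wait up to 1s to fill a batch
    "batch_size": 32768,                         # bytes per batch
    "max_in_flight_requests_per_connection": 1,  # cap concurrent requests per broker connection
})

def publish(rows):
    from kafka import KafkaProducer
    producer = KafkaProducer(**conf_bc.value)    # behaves exactly as the options dictate
    for row in rows:
        producer.send("events", row.value.encode("utf-8"))
    producer.flush()

df.foreachPartition(publish)
```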

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Yes, producing via kafka-clients using foreachPartition works as expected. The Kafka Producer is initialised within the call(Iterator<Row> t) method. The issue is with using the Kafka connector with Spark batch. The configs are not being honored even when they are being set in ProducerConfig. This means kafka reco

Checkpointing in foreachPartition in Spark batch

2025-04-16 Thread Abhishek Singla
Hi Team, We are using foreachPartition to send dataset row data to a third system via an HTTP client. The operation is not idempotent. I want to ensure that in case of failures the previously processed dataset does not get processed again. Is there a way to checkpoint in Spark batch? 1. checkpoint proc

Re: Kafka Connector: producer throttling

2025-04-16 Thread daniel williams
If you’re using it in batch mode with foreachPartition you’ll want to pass in your options as a function of the configuration of your app, not Spark (i.e. use scopt to pass in a map which then gets passed to your builder/broadcast). If you are using Spark Structured Streaming you can use kafka.* prope

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Sharing Producer Config on Spark Startup

25/04/16 15:11:59 INFO ProducerConfig: ProducerConfig values:
  acks = -1
  batch.size = 1000
  bootstrap.servers = [localhost:9092]
  buffer.memory = 33554432
  client.dns.lookup = use_all_dns_ips
  client.id = producer-1
  compression.type = none
  connections.max.id

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Hi Daniel and Jungtaek, I am using Spark in batch. Tried with the kafka. prefix; now I can see they are being set in the Producer Config on Spark startup, but they are still not being honored. I have set "linger.ms": "1000" and "batch.size": "10". I am publishing 10 records and they are flushed to kafka serve
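For context, this is how those kafka.-prefixed options would be passed to the batch writer (a sketch using the values quoted in the message; the topic name is assumed, and whether the batch sink actually respects linger/batch semantics per task is exactly the open question here):

```python
(df.selectExpr("to_json(struct(*)) AS value")
   .write.format("kafka")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("kafka.linger.ms", "1000")   # value from the message
   .option("kafka.batch.size", "10")    # value as quoted above (possibly truncated)
   .option("topic", "events")           # topic name assumed
   .save())
```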

Comparison between union and stack in pyspark

2025-04-16 Thread Dhruv Singla
Hey Everyone, In Spark, suppose I have the following df.
```
df = spark.createDataFrame(
    [['A', 'A06', 'B', 'B02', '202412'],
     ['A', 'A04', 'B', 'B03', '202501'],
     ['B', 'B01', 'C', 'C02', '202411'],
     ['B', 'B03', 'A', 'A06', '202502']],
    'entity_code: string, entity_rollup: string, target_entity_code:
```
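The rest of the question is cut off above, but the comparison in the subject line usually comes down to two ways of unpivoting the entity/target column pairs. A sketch under that assumption (target_entity_rollup and month are guessed column names):

```python
from pyspark.sql import functions as F

# Option 1: union of two projections (scans df twice unless it is cached).
unioned = (
    df.select(F.col("entity_code").alias("code"),
              F.col("entity_rollup").alias("rollup"), "month")
      .unionByName(
          df.select(F.col("target_entity_code").alias("code"),
                    F.col("target_entity_rollup").alias("rollup"), "month"))
)

# Option 2: stack() unpivots the same pairs in a single pass over the rows.
stacked = df.selectExpr(
    "stack(2, entity_code, entity_rollup, target_entity_code, target_entity_rollup) AS (code, rollup)",
    "month",
)
```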

Structured Streaming Initial Listing Issue

2025-04-16 Thread Anastasiia Sokhova
Dear Spark Community, I run a Structured Streaming Query to read json files from S3 into an Iceberg table. This is my query:
```python
stream_reader = (
    spark_session.readStream.format("json")
    .schema(schema)
    .option("maxFilesPerTrigger", 256_000)
    .option("basePath", f"s3a://tes
```

Re: High count of Active Jobs

2025-04-16 Thread Ángel Álvarez Pascua
Hi Nayan, Yesterday, I tried to reproduce the issue with speculation enabled, but no luck. Even when using threads and performing multiple actions simultaneously, the stages got queued—but they all eventually completed. I'm curious... have you tried disabling speculation? If so, please let me kno