[ https://issues.apache.org/jira/browse/SPARK-49127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved SPARK-49127.
----------------------------------
    Resolution: Fixed

Please post to the user@ mailing list. Jira tickets are not intended for handling 
questions.

> How to restart failed spark stream job from the failure point
> -------------------------------------------------------------
>
>                 Key: SPARK-49127
>                 URL: https://issues.apache.org/jira/browse/SPARK-49127
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark
>    Affects Versions: 3.5.0
>         Environment: GCP Dataproc cluster
>            Reporter: Rajdeepak
>            Priority: Minor
>
> I am setting up an ETL process using PySpark. My input is a Kafka stream and I 
> am writing output to multiple sinks (one to Kafka and another to cloud 
> storage). I am writing checkpoints to cloud storage. The issue I am facing is 
> that whenever my application fails for some reason and I restart it, the 
> application reprocesses some (not all) of the input stream data, causing 
> duplicate output. Is there any way I can avoid this? I am using Spark 3.5.0 
> and Python 3.11. Below is some of my application code:
> Spark Session:
> spark = SparkSession \
>     .builder \
>     .appName("ETL") \
>     .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2") \
>     .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
>     .config('spark.driver.extraJavaOptions', '-Duser.timezone=GMT') \
>     .config('spark.executor.extraJavaOptions', '-Duser.timezone=GMT') \
>     .config('spark.sql.session.timeZone', 'UTC') \
>     .config('spark.hadoop.fs.s3a.buffer.dir', '/tmp,/mnt/tmp') \
>     .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer') \
>     .config('spark.hadoop.fs.s3a.fast.upload.active.blocks', 1) \
>     .config('spark.streaming.backpressure.enabled', True) \
>     .config("spark.redis.host", conf["nosql-host"]) \
>     .config("spark.redis.port", conf["nosql-port"]) \
>     .config("spark.redis.db", conf["nosql-db"]) \
>     .config("spark.redis.auth", __REDIS_CREDENTIAL__) \
>     .getOrCreate()
>  
> Kafka Read Stream:
> streamDF = (spark
>     .readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", kafka_bootstrap_server_consumer)
>     .option("subscribe", kafka_topic_name)
>     .option("mode", "PERMISSIVE")
>     .option("startingOffsets", "earliest")
>     .option("failOnDataLoss", "false")
>     .load()
>     .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
>     .select('fixedValue'))
>  
> Write Stream to multiple sinks:
> write_stream = extractionDF \
>     .writeStream \
>     .trigger(processingTime='2 seconds') \
>     .outputMode("append") \
>     .foreachBatch(lambda df, epochId: write_to_multiple_sinks(df, epochId, processed_cloud_storage_path, kafka_bootstrap_server_producer)) \
>     .option("truncate", "false") \
>     .option("checkpointLocation", cloud_storage_path) \
>     .start()
> write_to_multiple_sinks Function:
> def write_to_multiple_sinks(dataframe: DataFrame, epochId, cloud_storage_path, kafka_bootstrap_server):
>     dataframe = dataframe.cache()
>     druidDF = dataframe.select(druidSchema())
>     druidDF.selectExpr(producerTopic, "to_json(struct(*)) AS value").write \
>         .format("kafka") \
>         .option("kafka.bootstrap.servers", kafka_bootstrap_server) \
>         .save()
>     processedDF = dataframe.select(processedSchema())
>     processedDF.write.format("csv").mode("append").option("sep", "^").option("compression", "gzip").option("path", cloud_storage_path).save()
>  
>  
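As background on why the duplicates appear: foreachBatch provides at-least-once 
guarantees, so after a restart the micro-batch that was in flight when the job 
failed is replayed from the checkpointed offsets, and any non-idempotent sink 
(the appending CSV write, the Kafka producer) emits those rows a second time. 
The Structured Streaming guide suggests using the batchId passed to foreachBatch 
to deduplicate the output. A minimal sketch of that idea for the file sink only, 
reusing names from the snippet above (extractionDF, processed_cloud_storage_path) 
and an illustrative helper and checkpoint_path that are not from the ticket:

from pyspark.sql import DataFrame

def write_batch_idempotently(df: DataFrame, batch_id: int, output_path: str) -> None:
    # Write each micro-batch into a directory derived from its batch id and
    # overwrite it. If Spark replays the same batch after a restart, the rerun
    # replaces the earlier files instead of appending duplicate rows.
    (df.write
        .format("csv")
        .option("sep", "^")
        .option("compression", "gzip")
        .mode("overwrite")
        .save(f"{output_path}/batch_id={batch_id}"))

# Wiring it into the stream; checkpointLocation must stay the same across
# restarts so offsets resume from the checkpoint rather than from "earliest".
query = (extractionDF.writeStream
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .foreachBatch(lambda df, batch_id: write_batch_idempotently(df, batch_id, processed_cloud_storage_path))
    .start())

The Kafka sink has no equivalent of overwrite, so avoiding duplicates there 
still requires consumer-side deduplication, for example on a unique record key.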



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
