[ https://issues.apache.org/jira/browse/SPARK-49127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-49127.
----------------------------------
    Resolution: Fixed

Please post to the user@ mailing list. The Jira tracker is not intended for handling questions.

> How to restart a failed Spark streaming job from the failure point
> -------------------------------------------------------------------
>
>                 Key: SPARK-49127
>                 URL: https://issues.apache.org/jira/browse/SPARK-49127
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark
>    Affects Versions: 3.5.0
>         Environment: GCP Dataproc cluster
>            Reporter: Rajdeepak
>            Priority: Minor
>
> I am setting up an ETL process using PySpark. My input is a Kafka stream, and I am writing the output to multiple sinks (one to Kafka and another to cloud storage). Checkpoints are written to cloud storage. The issue I am facing is that whenever my application fails for some reason and I restart it, the application reprocesses some (not all) of the input stream data, causing duplicate data. Is there any way I can avoid this? I am using Spark 3.5.0 and Python 3.11.
> Below is some of my application code.
>
> Spark session:
> spark = SparkSession \
>     .builder \
>     .appName("ETL") \
>     .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2") \
>     .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
>     .config('spark.driver.extraJavaOptions', '-Duser.timezone=GMT') \
>     .config('spark.executor.extraJavaOptions', '-Duser.timezone=GMT') \
>     .config('spark.sql.session.timeZone', 'UTC') \
>     .config('spark.hadoop.fs.s3a.buffer.dir', '/tmp,/mnt/tmp') \
>     .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer') \
>     .config('spark.hadoop.fs.s3a.fast.upload.active.blocks', 1) \
>     .config('spark.streaming.backpressure.enabled', True) \
>     .config("spark.redis.host", conf["nosql-host"]) \
>     .config("spark.redis.port", conf["nosql-port"]) \
>     .config("spark.redis.db", conf["nosql-db"]) \
>     .config("spark.redis.auth", __REDIS_CREDENTIAL__) \
>     .getOrCreate()
>
> Kafka read stream:
> streamDF = (spark
>     .readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", kafka_bootstrap_server_consumer)
>     .option("subscribe", kafka_topic_name)
>     .option("mode", "PERMISSIVE")
>     .option("startingOffsets", "earliest")
>     .option("failOnDataLoss", "false")
>     .load()
>     .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
>     .select('fixedValue'))
>
> Write stream to multiple sinks:
> write_stream = extractionDF \
>     .writeStream \
>     .trigger(processingTime='2 seconds') \
>     .outputMode("append") \
>     .foreachBatch(lambda df, epochId: write_to_multiple_sinks(df, epochId, processed_cloud_storage_path, kafka_bootstrap_server_producer)) \
>     .option("truncate", "false") \
>     .option("checkpointLocation", cloud_storage_path) \
>     .start()
>
> write_to_multiple_sinks function:
> def write_to_multiple_sinks(dataframe: DataFrame, epochId, cloud_storage_path, kafka_bootstrap_server):
>     dataframe = dataframe.cache()
>     druidDF = dataframe.select(druidSchema())
>     druidDF.selectExpr(producerTopic, "to_json(struct(*)) AS value").write \
>         .format("kafka") \
>         .option("kafka.bootstrap.servers", kafka_bootstrap_server) \
>         .save()
>     processedDF = dataframe.select(processedSchema())
>     processedDF.write.format("csv").mode("append").option("sep", "^") \
>         .option("compression", "gzip").option("path", cloud_storage_path).save()
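For reference, foreachBatch provides only at-least-once write guarantees: after a failure, Structured Streaming replays the most recent micro-batch whose output was not yet recorded as committed in the checkpoint, so non-idempotent sinks (an appending CSV writer, a plain Kafka producer) will see some records twice. The usual mitigation is to make the foreachBatch function idempotent using the batch id it receives. The sketch below illustrates one way to do that; it is not the reporter's code, and the names write_batch_idempotently, processed_path, bootstrap_servers, and producer_topic are hypothetical placeholders.

from pyspark.sql import DataFrame, functions as F

def write_batch_idempotently(df: DataFrame, batch_id: int,
                             processed_path: str, bootstrap_servers: str,
                             producer_topic: str) -> None:
    # Intended to be called from .foreachBatch(...). Structured Streaming
    # passes batch_id, which is stable when the same micro-batch is replayed
    # after a restart.
    df = df.cache()
    try:
        # File sink: write each micro-batch into its own batch_id-scoped
        # directory with mode "overwrite", so a replayed batch replaces its
        # earlier (possibly partial) output instead of appending duplicates.
        (df.write
           .format("csv")
           .mode("overwrite")
           .option("sep", "^")
           .option("compression", "gzip")
           .option("path", f"{processed_path}/batch_id={batch_id}")
           .save())

        # Kafka sink: the Kafka batch writer is at-least-once, so a replayed
        # batch can republish records. Carrying the batch id in the payload
        # gives downstream consumers a key to deduplicate on.
        (df.withColumn("batch_id", F.lit(batch_id))
           .selectExpr("to_json(struct(*)) AS value")
           .write
           .format("kafka")
           .option("kafka.bootstrap.servers", bootstrap_servers)
           .option("topic", producer_topic)
           .save())
    finally:
        df.unpersist()

Note also that a restarted query only resumes from the failure point if it is started with the same checkpointLocation as before; if the checkpoint directory is deleted or changed, the query falls back to startingOffsets ("earliest" here) and reprocesses far more than one micro-batch.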