I agree with what is stated. This is the gist of my understanding, having tested it. When working with Spark Structured Streaming, each streaming query runs in its own separate Spark session to ensure isolation and avoid conflicts between different queries. So here I have:
def process_data(self, df: DataFrame, batchId: int) -> None:
    if len(df.take(1)) > 0:
        df.select(col("timestamp"), col("value"), col("rowkey"), col("ID"),
                  col("CLUSTERED"), col("op_time")).show(1, False)
        df.createOrReplaceTempView("tmp_view")
        try:
            rows = df.sparkSession.sql("SELECT COUNT(1) FROM tmp_view").collect()[0][0]
            print(f"Number of rows: {rows}")
        except Exception as e:
            logging.error(f"Error counting rows: {e}")
    else:
        logging.warning("DataFrame is empty")

Here, df.sparkSession accesses the SparkSession associated with the streaming
DataFrame df, so the query runs against the same session that owns the temp view.

+-----------------------+--------+------------------------------------+--------+---------+-------------------+
|timestamp              |value   |rowkey                              |ID      |CLUSTERED|op_time            |
+-----------------------+--------+------------------------------------+--------+---------+-------------------+
|2024-01-31 20:31:24.152|25754740|df4d864d-517d-4f59-8f9e-bd1e7cd9678f|25754740|2575473.9|2024-01-31 20:31:30|
+-----------------------+--------+------------------------------------+--------+---------+-------------------+
only showing top 1 row

and rows is 50

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh


On Wed, 31 Jan 2024 at 13:30, Karthick Nk <kcekarth...@gmail.com> wrote:

> Hi Team,
>
> I am using structured streaming in PySpark in Azure Databricks, and I am
> creating a temp view from a dataframe
> (df.createOrReplaceTempView('temp_view')) in order to run Spark SQL query
> transformations.
> There I am facing the issue that the temp view is not found, so as a
> workaround I have created a global temp view instead.
> But when I tried the same without streaming, I was able to use the temp view.
>
> write_to_final_table = (
>     spark.readStream.format('delta')
>     .option('ignoreChanges', True)
>     .table(f"{delta_table_name}")
>     .writeStream.queryName(f"{query_name}")
>     .format("org.elasticsearch.spark.sql")
>     .trigger(processingTime='1 minutes')
>     .outputMode("append")
>     .foreachBatch(process_micro_batch)
>     .option("checkpointLocation", checkpointdirectory_path)
>     .option("mergeSchema", "true")
>     .option("failOnDataLoss", "false")
>     .start()
> )
>
> def process_micro_batch(micro_batch_df, batchId):
>     micro_batch_df.createOrReplaceTempView("temp_view")
>     df = spark.sql("select * from temp_view")
>     return df
>
> Here I am getting an error while reading data from temp_view: a "temp_view
> not found" error.
>
> I need to create a temp view (not a global temp view) based on the
> dataframe, and perform the Spark SQL transformation in structured
> streaming.
>
> I have a few questions:
> 1. Do structured streaming and spark.sql have different Spark contexts
> within the same Databricks notebook?
> 2. If I want to create a temp view based on the dataframe and perform a
> Spark SQL operation on it, how can I create the temp view (not a global
> temp view, since a global temp view is available at the cluster level
> across all notebooks)?
>
> Thanks & Regards
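PS. As a minimal sketch of the fix implied above (assuming the streaming job
itself stays as quoted, and a PySpark version where DataFrame.sparkSession is
available, as in my snippet), process_micro_batch can run the SQL against the
session that owns the micro-batch DataFrame rather than the notebook-level
spark, so the temp view it has just registered is visible:

    def process_micro_batch(micro_batch_df, batchId):
        # register the temp view on the session that owns this micro-batch DataFrame
        micro_batch_df.createOrReplaceTempView("temp_view")
        # query through that same session; the notebook-level 'spark' is a
        # different session under structured streaming, so it cannot see the view
        df = micro_batch_df.sparkSession.sql("select * from temp_view")
        return df

The only change from the quoted version is which session runs the SQL.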