Hi Jungtaek,

thanks for your reply. I was afraid the problem was not just on my side but rather conceptual in nature. I guess I will have to rethink my approach. However, since you mentioned Delta Lake: I have the same problem, just the other way around, with Delta Lake. I cannot write from a streaming query to a Delta Lake table that was created from a static DataFrame.
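For reference, this is roughly what I am trying in the Delta Lake case. It is only a minimal sketch: logLinesDF, logLinesStream and path are the same as in the test quoted below, checkpointPath is just a placeholder, and it assumes the delta-spark package is on the classpath.

  // create the Delta table from the static DataFrame
  logLinesDF.write
    .format("delta")
    .save(path.toString)

  // then try to append to the same table from the streaming query
  logLinesStream.writeStream
    .format("delta")
    .option("checkpointLocation", checkpointPath.toString)  // placeholder checkpoint dir
    .outputMode("append")
    .start(path.toString)
    .processAllAvailable()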
Anyhow, best regards
  Eugen

On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
> Hi,
>
> The file stream sink maintains the metadata in the output directory.
> The metadata retains the list of files written by the streaming
> query, and Spark reads the metadata on listing the files to read.
>
> This is to guarantee end-to-end exactly once on writing files in the
> streaming query. There could be a failure in the streaming query and
> some files may be partially written. The metadata helps to skip
> reading these files and only read files which were correctly written.
>
> This leads to a major restriction: you can't write to the output
> directory from multiple queries. In your case, Spark will only read
> the files which were written by the streaming query.
>
> There are 3rd party projects dealing with transactional writes from
> multiple writers, (alphabetically) Apache Iceberg, Delta Lake, and so
> on. You may want to check them out.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Sep 2, 2021 at 10:04 PM <eugen.wintersber...@gmail.com> wrote:
> > Hi all,
> >   I recently stumbled upon a rather strange problem with
> > streaming sources in one of my tests. I am writing a Parquet file
> > from a streaming source and subsequently try to append the same
> > data, but this time from a static dataframe. Surprisingly, the
> > number of rows in the Parquet file remains the same after the
> > append operation.
> > Here is the relevant code:
> >
> > "Appending data from static dataframe" must "produce twice as much
> > data" in {
> >   logLinesStream.writeStream
> >     .format("parquet")
> >     .option("path", path.toString)
> >     .outputMode("append")
> >     .start()
> >     .processAllAvailable()
> >   spark.read.format("parquet").load(path.toString).count mustBe 1159
> >
> >   logLinesDF.write.format("parquet").mode("append").save(path.toString)
> >   spark.read.format("parquet").load(path.toString).count mustBe 2*1159
> > }
> >
> > Does anyone have an idea what I am doing wrong here?
> >
> > thanks in advance
> >   Eugen Wintersberger