Hi Jungtaek, 
 thanks for your reply. I was afraid that the problem is not only on my
side but rather of conceptual nature. I guess I have to rethink my
approach. However, because you mentioned DeltaLake. I have the same
problem, but the other way around, with DeltaLake. I cannot write with
a stream to a DeltaLake created from a static dataframe.

Anyhow, best regards
  Eugen

On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
> Hi,
> 
> The file stream sink maintains the metadata in the output directory.
> The metadata retains the list of files written by the streaming
> query, and Spark reads the metadata on listing the files to read.
> 
> This is to guarantee end-to-end exactly once on writing files in the
> streaming query. There could be failure on the streaming query and
> some files may be partially written. Metadata will help to skip
> reading these files and only read files which are correctly written.
> 
> This leads to a major restriction, you can't write the output
> directory from multiple queries. For your case, Spark will only read
> the files which are written from the streaming query.
> 
> There are 3rd party projects dealing with transactional write from
> multiple writes, (alphabetically) Apache Iceberg, Delta Lake, and so
> on. You may want to check them out.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> On Thu, Sep 2, 2021 at 10:04 PM <eugen.wintersber...@gmail.com>
> wrote:
> > Hi all,
> >   I recently stumbled about a rather strange  problem with
> > streaming sources in one of my tests. I am writing a Parquet file
> > from a streaming source and subsequently try to append the same
> > data but this time from a static dataframe. Surprisingly, the
> > number of rows in the Parquet file remains the same after the
> > append operation. 
> > Here is the relevant code
> > 
> > "Appending data from static dataframe" must "produce twice as much
> > data" in {
> > logLinesStream.writeStream
> > .format("parquet")
> > .option("path", path.toString)
> > .outputMode("append")
> > .start()
> > .processAllAvailable()
> > spark.read.format("parquet").load(path.toString).count mustBe 1159
> > 
> > logLinesDF.write.format("parquet").mode("append").save(path.toStrin
> > g)
> > spark.read.format("parquet").load(path.toString).count mustBe
> > 2*1159
> > }
> > 
> > Does anyone have an idea what I am doing wrong here?
> > 
> > thanks in advance
> >  Eugen Wintersberger

Reply via email to