I will try to provide a stripped-down example of what I am doing.

The initial delta lake is built from a DataFrame like this, from within
a notebook:
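import org.apache.spark.sql.functions.{col, month, window, year}

// Bucket events into 1-hour windows and count them per serial and type.
val hourly_new = events
        .select(window('timestamp, "1 hour"), 'serial, 'type)
        // column names must match the select above and the groupBy below
        .select($"window.start".as("start"), 'serial, 'type)
        .withWatermark("start", "70 minutes")
        .groupBy("start", "serial", "type")
        .count()
        // partition columns derived from the window start
        .withColumn("year", year(col("start")))
        .withColumn("month", month(col("start")))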

    // COMMAND ----------

hourly_new.write.format("delta").partitionBy("year","month").save("/path/to/delta-lake")

Once this data has been written to disk, I would like to append to it
using a streaming task:

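import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Fire a micro-batch every 10 seconds (the argument is in milliseconds).
val trigger = Trigger.ProcessingTime(10000)
df.writeStream
      .format(format) // `format` is a variable defined elsewhere in the job
      .option("path", "/path/to/delta-lake")
      .option("checkpointLocation", "/path/to/checkpoint")
      .outputMode(OutputMode.Append())
      .option("mergeSchema","true")
      .trigger(trigger)
      .partitionBy("year", "month")
      .start()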
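
To see whether the streaming query is actually receiving and emitting rows, its progress can be inspected. This is only a minimal sketch, assuming it runs in the same Spark session as the streaming job and that `spark` is available as in a notebook. In append mode with a watermark, an aggregated window is only written once the watermark has passed the end of that window, so the reported watermark is worth looking at.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a session already exists (notebook); getOrCreate() returns it.
val spark = SparkSession.builder().getOrCreate()

// List every streaming query that is active in this session.
spark.streams.active.foreach { q =>
  println(s"id=${q.id} status=${q.status.message}")
  // lastProgress is null until the first trigger has completed
  Option(q.lastProgress).foreach { p =>
    println(s"batch=${p.batchId} inputRows=${p.numInputRows}")
    // eventTime is a java.util.Map; the key is absent if no watermark is set
    println(s"watermark=${p.eventTime.get("watermark")}")
  }
}
```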

When I start the streaming job, the delta lake referenced never gets
any updates. The number of rows remains as it was after the notebook
code above ran.
Interestingly, when I create a new delta lake with the streaming code
above and then restart the task, the delta lake is happily updated every
hour. What I do not understand is what I am doing wrong in the notebook
code, so that the streaming task is unable to append data to the delta
lake.
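
One way to narrow this down is to look at the commit history of the table and check whether the streaming job has committed anything at all. The following is a minimal sketch, assuming the Delta Lake Scala API (`io.delta.tables.DeltaTable`) is on the classpath and using the same placeholder path as above. Commits made by a streaming writer should show up with their own operation name, separate from the initial batch write.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Assumption: a session already exists (notebook); getOrCreate() returns it.
val spark = SparkSession.builder().getOrCreate()

// One row per commit: who wrote to the table, when, and with which operation.
DeltaTable.forPath(spark, "/path/to/delta-lake")
  .history()
  .select("version", "timestamp", "operation", "operationMetrics")
  .show(false)
```

If only the batch write appears, the streaming job never committed. If streaming commits appear with zero output rows, the query ran but produced nothing to append.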

Thanks in advance and best regards
   Eugen

On Wed, 2021-07-21 at 19:51 +0000, Denny Lee wrote:
> Including the Delta Lake Users and Developers DL to help out.
> 
> That said, could you clarify how data is not being added? By any
> chance do you have any code samples to reproduce this?
> 
> On Wed, Jul 21, 2021 at 2:49 AM, <eugen.wintersber...@gmail.com>
> wrote:
> > Hi all,
> >   I stumbled upon an interesting problem. I have an existing
> > Deltalake with data recovered from a backup and would like to
> > append to this Deltalake using Spark structured streaming. This
> > does not work. Although the streaming job is running, no data is
> > appended.
> > If I create the original file with structured streaming, then
> > appending to this file with a streaming job (at least with the same
> > job) works flawlessly. Did I misunderstand something here?
> > 
> > best regards
> >    Eugen Wintersberger
> 
