Yay! Glad you could figure it out!
On Sat, Feb 15, 2020 at 7:41 AM Ruijing Li wrote:
Thought to update this thread. I figured out my issue with forEachBatch and
structured streaming: I had done a count() before write(), so my streaming
query branched into two. I am now using Trigger.Once and structured streaming
to handle checkpointing instead of doing it myself.
Thanks all for the help.
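A minimal sketch of that pattern, for reference; here df stands for the streaming DataFrame read from Kafka, checkpointPath and outputPath are placeholder locations, and the persist/unpersist is the documented way to avoid re-reading the source when the batch is both counted and written:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val checkpointPath = "hdfs:///checkpoints/kafka-batch-job"   // placeholder path
val outputPath     = "hdfs:///data/output"                   // placeholder path

val query = df.writeStream
  .trigger(Trigger.Once())                        // drain what's available, then stop
  .option("checkpointLocation", checkpointPath)   // Spark tracks Kafka offsets here
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()                             // avoid recomputing the source when
    val rows = batchDF.count()                    // the batch is counted and written
    batchDF.write.mode("append").parquet(outputPath)
    batchDF.unpersist()
  }
  .start()

query.awaitTermination()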
Looks like I'm wrong, since I tried that exact snippet and it worked.
So to be clear, in the part where I do batchDF.write.parquet, that is not
the exact code I'm using. I'm using a custom write function that does
something similar to write.parquet but has some added functionality. Somehow
my custom write function is what causes the error.
Hi all,
I tried with forEachBatch but got an error. Is this expected?
The code is:
df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
  batchDF.write.parquet(hdfsPath)
}
.option("checkpointLocation", anotherHdfsPath)
.start()
The exception is: Queries with streaming sources must be executed with
writeStream.start()
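As an aside, that AnalysisException is what Spark raises whenever a batch action (count(), collect(), a plain write) is executed against the streaming DataFrame itself; the batchDF handed to foreachBatch is a static DataFrame, so the same calls are legal inside the function. Roughly:

import org.apache.spark.sql.DataFrame

// df.count() or df.write.parquet(...) on the streaming frame itself would
// raise "Queries with streaming sources must be executed with
// writeStream.start()"; the frame passed into foreachBatch is static:
def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.count()                    // fine here: batchDF is not streaming
  batchDF.write.parquet(hdfsPath)    // hdfsPath as in the snippet above
}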
Hi Burak,
I am not quite used to streaming, but I was almost thinking along the same
lines :) It makes a lot of sense to me now.
Regards,
Gourav
On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz wrote:
Do you really want to build all of that and open yourself to bugs when you
can just use foreachBatch? Here are your options:
1. Build it yourself
// Read offsets from some store
prevOffsets = readOffsets()
latestOffsets = getOffsets()
df = spark.read.format("kafka").option("startingOffsets", prevOffsets)
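To make that first option concrete, a rough sketch of how the rest of it might look; the broker address, topic name, and the readOffsets/getOffsets/saveOffsets helpers are placeholders, and the offsets are the per-partition JSON string the Kafka source accepts:

// "Build it yourself", continued as a rough Scala sketch. The helpers and
// connection details below are assumptions, not code from this thread.
val prevOffsets   = readOffsets()   // e.g. {"my-topic":{"0":1050,"1":988}}
val latestOffsets = getOffsets()    // resolved by querying Kafka at job start

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder brokers
  .option("subscribe", "my-topic")                      // placeholder topic
  .option("startingOffsets", prevOffsets)
  .option("endingOffsets", latestOffsets)
  .load()

// ... transform and write df as an ordinary batch job ...

saveOffsets(latestOffsets)   // persist progress only after the write succeeds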
Thanks Anil, I think that’s the approach I will take.
Hi Burak,
That was a possibility to think about, but my team has custom dataframe
writer functions we would like to use; unfortunately, they were written with
static dataframes in mind. I do see there is a ForEachBatch write mode, but
my thinking was whether our custom writers would work inside it.
Hi Ruijing,
Why do you not want to use structured streaming here? This is exactly why
structured streaming + Trigger.Once was built, just so that you don't build
that solution yourself.
You also get exactly-once semantics if you use the built-in sinks.
Best,
Burak
Hi Ruijing,
We did the below things to read Kafka in batch from Spark:
1) Maintain the start offset (could be a db, a file, etc.)
2) Get the end offset dynamically when the job executes.
3) Pass the start and end offsets to the Kafka batch read.
4) Overwrite the start offset with the end offset (should be done
post-processing; see the sketch below).
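A small sketch of how steps 1 and 4 might be implemented if the offsets live in a JSON file on HDFS; the path and helper names are made up, an active SparkSession named spark is assumed, and steps 2-3 are the bounded Kafka read shown elsewhere in the thread:

import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}

// The file holds the JSON offsets string that the Kafka source's
// startingOffsets option accepts, e.g. {"my-topic":{"0":1050,"1":988}}.
val fs          = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val offsetsFile = new Path("hdfs:///offsets/my-topic.json")   // placeholder

// Step 1: read the stored start offsets, falling back to "earliest" on the
// very first run.
def readOffsets(): String =
  if (fs.exists(offsetsFile)) {
    val in = fs.open(offsetsFile)
    try scala.io.Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  } else "earliest"

// Step 4: overwrite the stored offsets with the end offsets, but only after
// the batch has been processed and written successfully.
def saveOffsets(endOffsetsJson: String): Unit = {
  val out = fs.create(offsetsFile, true)   // overwrite = true
  try out.write(endOffsetsJson.getBytes(StandardCharsets.UTF_8))
  finally out.close()
}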
The most common delivery semantic for a Kafka producer is at least once,
so your consumers have to handle dedupe.
Spark can checkpoint, but you have to be explicit about it. It only makes
sense if your dataframe lineage gets too long (only if you're doing a
highly iterative algorithm) and you need to truncate the lineage.
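On the dedupe point, one simple policy is to keep, for each Kafka message key, only the delivery with the highest offset. A sketch, assuming df was read from the Kafka source (so it carries the key, partition, and offset columns):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep only the latest delivery seen for each Kafka message key; duplicates
// produced by at-least-once delivery carry the same key at a higher offset.
val w = Window.partitionBy(col("key")).orderBy(col("offset").desc)

val deduped = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)   // row 1 = highest offset for that key
  .drop("rn")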
Hi Chris,
Thanks for the answer. So if I understand correctly:
- there will be a need to dedupe, since I should be expecting at-least-once
delivery.
- storing the result of (group by partition and aggregate max offsets) is
enough, since Kafka messages are immutable, so a message will get sent with
the same content if the producer resends it.
Kafka can keep track of the offsets you've seen (in a separate topic based
on your consumer group), but it is usually best effort and you're probably
better off also keeping track of your offsets yourself.
If the producer resends a message you would have to dedupe it, as you've
most likely already seen it.