Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Ignore me, a bit more digging and I was able to find the FileSink source. Following that pattern worked a treat! Thanks again Michael
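
[Archive note: a minimal sketch of that pattern, assuming a custom Sink built on Spark's internal org.apache.spark.sql.execution.streaming.Sink trait. The in-memory `committedBatches` set is a stand-in for the persistent _spark_metadata log that FileStreamSink keeps.]

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink
    import scala.collection.mutable

    // Sketch of an idempotent sink in the spirit of FileStreamSink: remember which
    // batch ids have already been committed and skip any batch that is re-run after
    // a failure. A real sink would persist this, as FileStreamSink does via its
    // _spark_metadata log.
    class IdempotentSink extends Sink {
      private val committedBatches = mutable.Set[Long]()

      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        if (!committedBatches.contains(batchId)) {
          // write `data` to the external store here (omitted), then record the batch
          committedBatches += batchId
        }
      }
    }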

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Sorry, those are methods I wrote, so you can ignore them :) So just adding a path parameter tells Spark that's where the update log is? Do I check for the unique id there and identify which batches were written and which weren't? Are there any examples of this out there? There aren't many connectors in

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
The JSON log is only used by the file sink (which it doesn't seem like you are using). Though, I'm not sure exactly what is going on inside of setupGoogle or how tableReferenceSource is used. Typically you would run df.writeStream.option("path", "/my/path")... and then the transaction log would g
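
[Archive note: a minimal sketch of that typical setup, assuming a parquet file sink and placeholder paths; the transaction log ends up under the output path.]

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("file-sink-sketch").getOrCreate()

    // Placeholder source; the actual source in this thread is a BigQuery connector.
    val df = spark.readStream.text("/my/input")

    val query = df.writeStream
      .format("parquet")                               // file sink
      .option("path", "/my/path")                      // transaction log goes to /my/path/_spark_metadata
      .option("checkpointLocation", "/my/checkpoint")  // query offsets and progress
      .start()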

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Hi Michael, if that's the case for the below example, where should I be reading these JSON log files first? I'm assuming sometime between df and query? val df = spark .readStream .option("tableReferenceSource", tableName) .load() setUpGoogle(spark.sqlContext) val query = df .writeStre

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
Read the JSON log of files that is in `/your/path/_spark_metadata` and only read files that are present in that log (ignore anything else). On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin wrote: > Ah I see ok so probably it's the retry that's causing it > > So when you say I'll have to take this into
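
[Archive note: a sketch of doing that by hand, assuming parquet output and that each log file under _spark_metadata is a version header line followed by one JSON entry per committed file with a `path` field; the Dataset[String] overload of read.json needs Spark 2.2+. Worth verifying against a log file on your own setup.]

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("metadata-log-sketch").getOrCreate()
    import spark.implicits._

    // Read the sink's JSON log and keep only the JSON entries (drop the "v1" header lines).
    val logLines = spark.read.textFile("/your/path/_spark_metadata/*")
      .filter(_.startsWith("{"))

    // Extract the committed file paths; anything in the directory not listed here
    // is leftover from a batch that never committed and should be ignored.
    val committedFiles = spark.read.json(logLines).select("path").as[String].collect()

    val validData = spark.read.parquet(committedFiles: _*)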

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Ah I see, ok, so probably it's the retry that's causing it. So when you say I'll have to take this into account, how do I best do that? My sink will have to know what that extra file was. And I was under the impression Spark would automagically know this because of the checkpoint directory set when y

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
Sorry, I think I was a little unclear. There are two things at play here. - Exactly-once semantics with file output: Spark writes out extra metadata on which files are valid to ensure that failures don't cause us to "double count" any of the input. Spark 2.0+ detects this info automatically whe
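
[Archive note: in other words, a plain batch read over the sink's output directory already honours the metadata log, so files from failed batches are not double counted. A sketch, assuming the sink wrote parquet to a placeholder path.]

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("read-sink-output-sketch").getOrCreate()

    // Spark 2.0+ sees the _spark_metadata log under /my/path and reads only the
    // committed files, ignoring leftovers from failed batches.
    val df = spark.read.parquet("/my/path")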

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Hmm, ok, I understand that, but the job is running for a good few mins before I kill it, so there should not be any jobs left, because I can see in the log that it's now polling for new changes and the latest offset is the right one. After I kill it and relaunch, it picks up that same file? Sorry if I misu

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
It is always possible that there will be extra jobs from failed batches. However, for the file sink, only one set of files will make it into the _spark_metadata directory log. This is how we get atomic commits even when there are files in more than one directory. When reading the files with Spark, we
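
[Archive note: a sketch of checking that, comparing what is physically in the output directory against what the log committed. Paths are placeholders; the `path` field name and the Dataset[String] overload of read.json (Spark 2.2+) are assumptions.]

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("commit-check-sketch").getOrCreate()
    import spark.implicits._

    val outDir = new Path("/my/path")
    val fs = outDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // Everything physically present in the output directory, minus the log itself.
    val onDisk = fs.listStatus(outDir)
      .map(_.getPath.toString)
      .filterNot(_.contains("_spark_metadata"))
      .toSet

    // Everything the sink's log says was committed.
    val logLines = spark.read.textFile("/my/path/_spark_metadata/*").filter(_.startsWith("{"))
    val committed = spark.read.json(logLines).select("path").as[String].collect().toSet

    // Files written by a batch that never committed (e.g. a retried attempt).
    val orphaned = onDisk -- committed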

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
On another note, when it comes to checkpointing on Structured Streaming, I noticed that if I have a stream running off S3 and I kill the process, the next time the process starts running it duplicates the last record inserted. Is that normal? So say I have streaming enabled on one folder "test" wh
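
[Archive note: a sketch of the scenario, with placeholder bucket, paths, and schema. The checkpoint lets a restarted query resume from recorded offsets, but a batch that was in flight when the process died can be re-run, which is why the sink may see the last file again.]

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().appName("s3-stream-sketch").getOrCreate()

    // File sources need an explicit schema; these columns are placeholders.
    val schema = new StructType()
      .add("customerId", StringType)
      .add("amount", DoubleType)

    // Stream over the "test" folder mentioned in the thread.
    val stream = spark.readStream
      .schema(schema)
      .json("s3a://my-bucket/test/")

    val query = stream.writeStream
      .format("parquet")
      .option("path", "s3a://my-bucket/out/")
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/test/")
      .start()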

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Thanks Michael! On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust wrote: > Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497 > > We should add this soon. > > On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin > wrote: > >> Hi All >> >> When trying to read a stream off S3 and I try and dro

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497 We should add this soon. On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin wrote: > Hi All > > When trying to read a stream off S3 and I try and drop duplicates I get > the following error: > > Exception in thread "main" org.apache.spark.s
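
[Archive note: SPARK-19497 tracked adding deduplication on streaming Datasets. In later releases (Spark 2.2+) the pattern looks roughly like the sketch below; column names, schema, and paths are placeholders, and the watermark bounds how much dedup state is kept.]

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().appName("streaming-dedup-sketch").getOrCreate()

    val schema = new StructType()
      .add("eventId", StringType)
      .add("eventTime", TimestampType)
      .add("payload", StringType)

    val deduped = spark.readStream
      .schema(schema)
      .json("s3a://my-bucket/test/")
      .withWatermark("eventTime", "10 minutes")   // bound how long dedup state is kept
      .dropDuplicates("eventId", "eventTime")     // include the watermark column so old state can be dropped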