Nice, thanks again Michael for helping out.

Dmitry

2017-09-14 21:37 GMT+03:00 Michael Armbrust <mich...@databricks.com>:

> Yep, that is correct.  You can also use the query ID which is a GUID that
> is stored in the checkpoint and preserved across restarts if you want to
> distinguish the batches from different streams.
>
> sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
>
> This was added recently
> <https://github.com/apache/spark/commit/2d968a07d211688a9c588deb859667dd8b653b27>
> though.
>
> On Thu, Sep 14, 2017 at 3:40 AM, Dmitry Naumenko <dm.naume...@gmail.com>
> wrote:
>
>> Ok. So since I can get repeated batch ids, I guess I can just store the
>> last committed batch id in my storage (in the same transaction as the
>> data) and initialize the custom sink with the right batch id when the
>> application restarts. After this, just ignore a batch if the current
>> batchId <= latestBatchId.
>>
>> Dmitry
>>
>>
>> 2017-09-13 22:12 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>
>>> I think the right way to look at this is the batchId is just a proxy for
>>> offsets that is agnostic to what type of source you are reading from (or
>>> how many sources there are).  We might call into a custom sink with the
>>> same batchId more than once, but it will always contain the same data
>>> (there is no race condition, since this is stored in a write-ahead log).
>>> As long as you check/commit the batch id in the same transaction as the
>>> data you will get exactly once.
>>>
>>> On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>>> wrote:
>>>
>>>> Thanks, I see.
>>>>
>>>> However, I guess reading from the checkpoint directory might be less
>>>> efficient compared to just preserving offsets in the Dataset.
>>>>
>>>> I have one more question about operation idempotence (I hope it helps
>>>> others get a clear picture).
>>>>
>>>> If I read offsets from an RDBMS on restart and manually specify starting
>>>> offsets on the Kafka source, is it still possible that after a failure
>>>> a duplicate batch id is passed to a custom sink?
>>>>
>>>> Previously, with DStreams, you would just read offsets from storage on
>>>> start and write them into the DB in one transaction with the data, and
>>>> that was enough for "exactly-once". Please correct me if I made a mistake
>>>> here. So will the same strategy work with Structured Streaming?
>>>>
>>>> I guess that in the case of Structured Streaming, Spark will commit the
>>>> batch offsets to a checkpoint directory, and there can be a race condition
>>>> where you commit your data with offsets into the DB but Spark fails to
>>>> commit the batch id, and some kind of automatic retry happens. If this is
>>>> true, is it possible to disable this automatic retry, so I can still use
>>>> the unified API for batch/streaming with my own retry logic (which is
>>>> basically: ignore intermediate data, re-read from Kafka, and retry
>>>> processing and load)?
>>>>
>>>> Dmitry
>>>>
>>>>
>>>> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>
>>>>> In the checkpoint directory there is a file /offsets/$batchId that
>>>>> holds the offsets serialized as JSON.  I would not consider this a public
>>>>> stable API though.
>>>>>
>>>>> Really the only important thing to get exactly once is that you must
>>>>> ensure whatever operation you are doing downstream is idempotent with
>>>>> respect to the batchId.  For example, if you are writing to an RDBMS you
>>>>> could have a table that records the batch ID and update that in the same
>>>>> transaction as you append the results of the batch.  Before trying to
>>>>> append you should check that batch ID and make sure you have not already
>>>>> committed.
>>>>>
>>>>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <
>>>>> dm.naume...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the response, Michael.
>>>>>>
>>>>>> >  You should still be able to get exactly once processing by using
>>>>>> the batchId that is passed to the Sink.
>>>>>>
>>>>>> Could you explain this in more detail, please? Is there some kind of
>>>>>> offset manager API that works as a lookup table from batch id to offsets?
>>>>>>
>>>>>> Dmitry
>>>>>>
>>>>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>>>
>>>>>>> I think that we are going to have to change the Sink API as part of
>>>>>>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>>>>>>> which is why I linked these tickets together.  I'm still targeting an
>>>>>>> initial version for Spark 2.3 which should happen sometime towards the
>>>>>>> end of the year.
>>>>>>>
>>>>>>> There are some misconceptions in that Stack Overflow answer that I
>>>>>>> can correct.  Until we improve the Source API, you should still be
>>>>>>> able to get exactly once processing by using the batchId that is passed
>>>>>>> to the Sink. We guarantee that the offsets present at any given batch
>>>>>>> ID will be the same across retries by recording this information in the
>>>>>>> checkpoint's WAL. The checkpoint does not use Java serialization (like
>>>>>>> DStreams do) and can be used even after upgrading Spark.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <
>>>>>>> dm.naume...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Cody
>>>>>>>>
>>>>>>>> Unfortunately, it seems there is no active development right now.
>>>>>>>> Maybe I can step in and help with it somehow?
>>>>>>>>
>>>>>>>> Dmitry
>>>>>>>>
>>>>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>>
>>>>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>>>>
>>>>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <
>>>>>>>>> dm.naume...@gmail.com> wrote:
>>>>>>>>> > Hi all,
>>>>>>>>> >
>>>>>>>>> > It started as a discussion in
>>>>>>>>> > https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>>>>> >
>>>>>>>>> > So the problem is that there is no support in the public API to obtain
>>>>>>>>> > the Kafka (or Kinesis) offsets. For example, if you want to save offsets
>>>>>>>>> > in external storage in a custom Sink, you should:
>>>>>>>>> > 1) preserve topic, partition and offset across all transform operations
>>>>>>>>> > of the Dataset (based on the hard-coded Kafka schema)
>>>>>>>>> > 2) do a manual group by partition/offset with an aggregate max offset
>>>>>>>>> >
>>>>>>>>> > The Structured Streaming doc says "Every streaming source is assumed to
>>>>>>>>> > have offsets", so why is it not part of the public API? What do you think
>>>>>>>>> > about supporting it?
>>>>>>>>> >
>>>>>>>>> > Dmitry
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
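
For readers who want to see the "check/commit the batch id in the same
transaction as the data" idea from this thread in concrete form, here is a
minimal sketch. It is not Spark code: plain Python with SQLite stands in for
the custom sink and the RDBMS, and the names open_store, add_batch, results
and committed_batches are all hypothetical. The query_id argument plays the
role of the query ID mentioned above, so batches from different streams do not
collide. It assumes a single writer per query.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open the (hypothetical) sink database and create its two tables."""
    conn = sqlite3.connect(path)
    with conn:
        conn.execute("CREATE TABLE IF NOT EXISTS results (value TEXT NOT NULL)")
        conn.execute(
            "CREATE TABLE IF NOT EXISTS committed_batches ("
            "query_id TEXT PRIMARY KEY, batch_id INTEGER NOT NULL)"
        )
    return conn


def add_batch(conn, query_id, batch_id, rows):
    """Append rows unless this batch was already committed for this query.

    Returns True if the batch was written, False if it was skipped.
    """
    with conn:  # commits on normal exit, rolls back if an exception escapes
        last = conn.execute(
            "SELECT batch_id FROM committed_batches WHERE query_id = ?",
            (query_id,),
        ).fetchone()
        # A replayed batch carries the same data, so skipping it is safe.
        if last is not None and batch_id <= last[0]:
            return False
        # The data and the batch id are written in one transaction:
        # either both become visible or neither does.
        conn.executemany(
            "INSERT INTO results (value) VALUES (?)", [(r,) for r in rows]
        )
        conn.execute(
            "INSERT OR REPLACE INTO committed_batches (query_id, batch_id) "
            "VALUES (?, ?)",
            (query_id, batch_id),
        )
    return True
```

Calling add_batch(conn, "query-1", 0, ["a", "b"]) twice writes the rows only
once; the second call sees that batch 0 is already recorded and returns False.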

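The manual workaround described in the first message of the thread (carry
topic, partition and offset through the transformations, then aggregate the
max offset) can be sketched in a few lines. This is plain Python rather than
the Dataset API, the function name max_offsets is hypothetical, and grouping
by (topic, partition) is my reading of "group by partition/offset".

```python
def max_offsets(records):
    """Return the highest offset seen for each (topic, partition).

    records is an iterable of (topic, partition, offset) tuples, i.e. the
    three columns that would have to be preserved through the pipeline.
    """
    latest = {}
    for topic, partition, offset in records:
        key = (topic, partition)
        if key not in latest or offset > latest[key]:
            latest[key] = offset
    return latest
```

The resulting mapping is what a sink would store next to the data if it
tracked offsets itself instead of relying on the batchId.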