Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Ross Lawley
Hi,

Thanks all for the comments and discussion regarding the API!  It sounds
like the current expectation for database systems is to populate a staging
table in the tasks and have the driver move that data when commit is called.
That would work for many use cases that our users have with the MongoDB
connector, although for us it would be much slower than our v1
implementation. Many customers cite speed as paramount; that may be more
from the reader perspective, but it will be interesting to see the feedback.

Another challenge I have is that this approach requires all data to be in
Spark and then written out to the database. With the current implementation,
users can update just parts of their documents in situ. That may have been
a bad design decision, but it came from customer requirements, and it means
that in that scenario I can't reliably use a staging table. I'm not sure
if other connectors also allow that behaviour.
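(To make the in-situ point concrete, here is a toy sketch, not actual MongoDB connector code; all names and the dict-based "store" are hypothetical. It shows why a staging table that holds whole rows cannot reproduce a partial, in-place document update.)

```python
# Toy illustration: in-situ partial update vs. staging-table replacement.
# The "store" and helper functions are hypothetical, for discussion only.

store = {
    "doc1": {"name": "a", "visits": 3, "tags": ["x"]},
    "doc2": {"name": "b", "visits": 7, "tags": ["y"]},
}

def apply_partial_update(store, key, changes):
    """In-situ update: only the listed fields change; the rest survive."""
    store[key].update(changes)

def commit_from_staging(store, staging):
    """Staging-table commit: whole staged rows replace whole documents."""
    for key, row in staging.items():
        store[key] = row  # clobbers any field the batch didn't carry

apply_partial_update(store, "doc1", {"visits": 4})
assert store["doc1"] == {"name": "a", "visits": 4, "tags": ["x"]}

# A staged batch that only computed `visits` loses the other fields:
commit_from_staging(store, {"doc2": {"visits": 8}})
assert store["doc2"] == {"visits": 8}  # "name" and "tags" are gone
```

In other words, a staging-table protocol only round-trips documents that were fully materialized in Spark, which is exactly the constraint described above.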

Ross

On Tue, Sep 11, 2018 at 5:23 AM Jungtaek Lim  wrote:

> IMHO that's up to how strict we would like to be about "exactly-once".
>
> Some say it is exactly-once when the output is eventually exactly-once,
> whereas others say there should be no side effects, i.e. a consumer
> shouldn't see a partial write. I guess 2PC is the former, since some
> partitions can commit earlier while other partitions fail to commit
> for some time.
>
> That said, there may be a couple of alternatives to the contract
> Spark provides/requires, and I'd like to see how the Spark community wants
> to deal with them. Would we want to disallow alternatives like "replay +
> deduplicate write (per batch/partition)", which ensures "eventually"
> exactly-once but cannot ensure the contract?
>
> Btw, unless achieving exactly-once is light enough for a given sink, I
> think the sink should provide both at-least-once (also optimized for that
> semantic) and exactly-once, and let end users pick one.
>
> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer wrote:
>
>> Why are atomic operations a requirement? I feel like doubling the amount
>> of writes (with staging tables) is probably a tradeoff that the end user
>> should make.
>>
>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan  wrote:
>>
>>> Regardless of the API, to use Spark to write data atomically, it requires:
>>> 1. Writing data distributedly, with a central coordinator at the Spark
>>> driver.
>>> 2. The distributed writers are not guaranteed to run together at the
>>> same time. (This can be relaxed if we can extend the barrier scheduling
>>> feature.)
>>> 3. The new data is visible if and only if all distributed writers
>>> succeed.
>>>
>>> Given these requirements, I think using a staging table is the
>>> most common way and maybe the only way. I'm not sure how 2PC can help; we
>>> don't want users to read partial data, so we need a final step to commit
>>> all the data together.
>>>
>>> For RDBMS data sources, I think a simple solution is to ask users to
>>> coalesce the input RDD/DataFrame into one partition; then we don't need to
>>> care about multi-client transactions. Or use a staging table as Ryan
>>> described before.
>>>
>>>
>>>
>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim  wrote:
>>>
 > And regarding the issue that Jungtaek brought up, 2PC doesn't require
 tasks to be running at the same time, we need a mechanism to take down
 tasks after they have prepared and bring up the tasks during the commit
 phase.

 I guess we already got into too much detail here, but if it is based
 on a client transaction, Spark must assign the "commit" task to the
 executor where the "prepare" task finished, and if it loses that
 executor it is not feasible to force a commit. Staging should come into
 play for that.

 We should also have a mechanism for "recovery": Spark needs to ensure it
 finalizes the "commit" even in case of failures before starting a new
 batch.

 So it's not an easy thing to integrate correctly.

 On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan wrote:

> >Well, almost all relational databases let you move data in a
> >transactional way. That’s what transactions are for.
>
> It would work, but I suspect in most cases it would involve moving
> data from temporary tables to the final tables.
>
> Right now there's no mechanism to let the individual tasks commit in a
> two-phase manner (not sure if the CommitCoordinator might help). If such an
> API were provided, the sources could use it as they wish (e.g. use the XA
> support provided by MySQL to implement it in a more efficient way than the
> driver moving from temp tables to destination tables).
>
> Definitely there are complexities involved, but I am not sure that network
> partitioning comes into play here, since the driver can act as the
> coordinator and can run in HA mode. And regarding the issue that Jungtaek
> brought up, 2PC doesn't require tasks to be running at the same time; we
> need a mechanism to take down tasks after they have prepared and bring up
> the tasks during the commit phase.

Off Heap Memory

2018-09-11 Thread Jack Kolokasis

Hello,
    I recently started studying Spark's memory management system.
More specifically, I want to understand how Spark uses off-heap memory.
Internally, I saw that there are two types of off-heap memory
(offHeapExecutionMemoryPool and offHeapStorageMemoryPool).

    How does Spark use off-heap memory?

    How does Spark use the offHeapExecutionMemoryPool, and how the
offHeapStorageMemoryPool?

    Is there any good tutorial or guide that explains Spark's memory
management internals?

Thanks for your reply,
-Iacovos
-Iacovos

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Arun Mahadevan
> Some say it is exactly-once when the output is eventually exactly-once,
> whereas others say there should be no side effects, like a consumer
> shouldn't see a partial write. I guess 2PC is the former, since some
> partitions can commit earlier while other partitions fail to commit
> for some time.

Yes, it's more about guaranteeing atomicity: all partitions eventually
commit or none commits. The visibility of the data for the readers is
orthogonal (e.g. setting isolation levels like serializable for XA), and
in general it's difficult to guarantee that data across partitions becomes
visible at once. An approach like a staging table with a global commit works
in a centralized setup but can be difficult to do in a distributed manner
across partitions (e.g. when each partition's output goes to a different
database).
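(The atomicity-without-simultaneous-visibility shape being discussed is roughly the following two-phase-commit skeleton; this is purely an illustrative sketch, not Spark's commit coordinator or any XA driver API, and all names are made up.)

```python
# Hedged sketch of a 2PC coordinator: commit only if every partition
# prepares; otherwise roll everything back. Hypothetical classes/names.

class Partition:
    def __init__(self, name, fail_prepare=False):
        self.name = name
        self.fail_prepare = fail_prepare
        self.state = "pending"

    def prepare(self):
        """Phase 1: durably stage the partition's output."""
        if self.fail_prepare:
            self.state = "aborted"
            return False
        self.state = "prepared"
        return True

    def commit(self):
        """Phase 2: make this partition's staged output live."""
        self.state = "committed"

    def rollback(self):
        self.state = "aborted"

def two_phase_commit(partitions):
    # Commit is issued only when *all* partitions prepared successfully,
    # so the batch eventually lands fully or not at all.
    if all(p.prepare() for p in partitions):
        for p in partitions:
            p.commit()
        return True
    for p in partitions:
        p.rollback()
    return False

ok = two_phase_commit([Partition("p0"), Partition("p1")])
assert ok

parts = [Partition("p0"), Partition("p1", fail_prepare=True)]
assert not two_phase_commit(parts)
assert all(p.state == "aborted" for p in parts)
```

Note that even here, individual `commit()` calls land at different moments, which is exactly the visibility point above: readers may observe some partitions committed before others.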


Is CBO broken?

2018-09-11 Thread emlyn
I was trying to enable CBO on one of our jobs (using Spark 2.3.1 with
partitioned parquet data) but it seemed that the rowCount statistics were
being ignored. I found this JIRA which seems to describe the same issue:
https://issues.apache.org/jira/browse/SPARK-25185, but it has no response so
far.

Does CBO work with partitioned parquet data, or is this a known issue? Is
there a workaround?

Thanks,
Emlyn



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Russell Spitzer
I'm still not sure how the staging table helps for databases which do not
have such atomicity guarantees. For example, in Cassandra, if you wrote all
of the data temporarily to a staging table, we would still have the same
problem in moving the data from the staging table into the real table. We
would likely have a similar chance of failing, and we still have no way
of making the entire staging set simultaneously visible.
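(A toy model of this failure mode, with made-up names rather than Cassandra APIs: without an atomic multi-partition swap, the row-by-row move out of the staging table can die halfway and leave readers seeing a partial batch.)

```python
# Toy model (hypothetical, not Cassandra APIs): moving staged rows into
# the real table is itself a distributed, non-atomic sequence of writes.

class CrashDuringCopy(Exception):
    pass

def move_staging_to_real(staging, real, crash_after=None):
    for i, (key, row) in enumerate(sorted(staging.items())):
        if crash_after is not None and i == crash_after:
            raise CrashDuringCopy(key)  # simulate a failure mid-move
        real[key] = row  # each write is individually visible to readers

real = {}
staging = {"k0": "v0", "k1": "v1", "k2": "v2"}
try:
    move_staging_to_real(staging, real, crash_after=2)
except CrashDuringCopy:
    pass

assert real == {"k0": "v0", "k1": "v1"}  # readers see a partial batch
```

So the staging table just relocates the original problem: the final "move" step has the same partial-visibility window as the direct write did.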


Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Thakrar, Jayesh
So if Spark and the destination datastore are both non-transactional, you will 
have to resort to an external mechanism for “transactionality”.

Here are some options for both RDBMS and non-transactional datastore 
destinations. For now, assume that Spark is used in batch mode (and not 
streaming mode).

RDBMS Options
Use a staging table, as discussed in the thread.

As an extension of the above, use partitioned destination tables: load data 
into a staging table and then use partition management to include the staging 
table in the partitioned table.
This implies one partition per Spark batch run.

Non-transactional Datastore Options
Use another metadata table.
Load the data into a staging-table equivalent, or even Cassandra partition(s).
Start the transaction by making a “start of transaction” entry in the 
metadata table, along with the partition keys to be populated.
As part of the Spark batch commit, update the metadata entry with appropriate 
details (e.g. partition load time, etc.).
In the event of a failed / incomplete batch, the metadata table entry will be 
incomplete and the corresponding partition keys can be dropped.
So essentially you use the metadata table to load/drop/skip the data to be 
moved/retained in the final destination.
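(The metadata-table option above can be sketched as follows; the schema, states, and function names are all hypothetical, just illustrating the batch-level commit marker idea.)

```python
# Hedged sketch of the metadata-table pattern: a batch's partitions only
# "count" once its metadata row reaches COMMITTED, so incomplete batches
# can be skipped by readers and dropped by cleanup. Hypothetical names.

metadata = {}   # batch_id -> {"state": ..., "partitions": [...]}
data = {}       # partition_key -> rows

def begin_batch(batch_id, partition_keys):
    metadata[batch_id] = {"state": "STARTED", "partitions": partition_keys}

def write_partition(key, rows):
    data[key] = rows

def commit_batch(batch_id):
    metadata[batch_id]["state"] = "COMMITTED"  # the single "commit point"

def visible_partitions():
    """Readers consult the metadata table and skip uncommitted batches."""
    return {k for m in metadata.values() if m["state"] == "COMMITTED"
            for k in m["partitions"]}

def drop_incomplete():
    """Cleanup: delete partitions belonging to failed/incomplete batches."""
    for bid, m in list(metadata.items()):
        if m["state"] != "COMMITTED":
            for k in m["partitions"]:
                data.pop(k, None)
            del metadata[bid]

begin_batch("b1", ["p1"]); write_partition("p1", [1, 2]); commit_batch("b1")
begin_batch("b2", ["p2"]); write_partition("p2", [3])  # crash: no commit
assert visible_partitions() == {"p1"}
drop_incomplete()
assert "p2" not in data and "p1" in data
```

As Russell points out in his reply, this only holds if every client of the table honors the metadata check when reading.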

Misc
Another option is to use Spark to stage data into a distributed filesystem 
(e.g. HDFS) and then use RDBMS utilities to transactionally load the data 
into the destination table.


From: Russell Spitzer 
Date: Tuesday, September 11, 2018 at 9:08 AM
To: Arun Mahadevan 
Cc: Jungtaek Lim , Wenchen Fan , 
Reynold Xin , Ross Lawley , Ryan 
Blue , dev , 
Subject: Re: DataSourceWriter V2 Api questions

Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Russell Spitzer
That only works assuming that Spark is the only client of the table. It
will be impossible to force an outside user to respect the special metadata
table when reading, so they will still see all of the data in transit.
Additionally, this would force the incoming data to be written only into new
partitions, which is not simple to do from a C* perspective, as balancing
the distribution of new rows would be non-trivial. If we had to do something
like this, we would basically be forced to write to some disk format first,
and then when we move the data into C* we still have the same problem that
we started with.


Speakers needed for Apache DC Roadshow

2018-09-11 Thread Rich Bowen
We need your help to make the Apache Washington DC Roadshow on Dec 4th a 
success.


What do we need most? Speakers!

We're bringing a unique DC flavor to this event by mixing Open Source 
Software with talks about Apache projects as well as OSS CyberSecurity, 
OSS in Government, and OSS Career advice.


Please take a look at: http://www.apachecon.com/usroadshow18/

(Note: You are receiving this message because you are subscribed to one 
or more mailing lists at The Apache Software Foundation.)


Rich, for the ApacheCon Planners

--
rbo...@apache.org
http://apachecon.com
@ApacheCon

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org