Agree on the “constraints” when working with Cassandra.
But remember, this is a weak attempt to make two non-transactional systems 
appear to the outside world as a transactional system.
Scaffolding/plumbing/abstractions will have to be created in the form of say, a 
custom data access layer.

Anyway, Ross is trying to get some practices used by other adopters of the V2 
API while trying to implement a driver/connector for MongoDB.

Perhaps views could be used in MongoDB in a way similar to partitions?
Essentially, each batch load goes into a separate MongoDB collection and 
results in a view redefinition after a successful load.
Finally, to avoid having too many collections behind a view, you may need a 
separate process that merges the underlying collections periodically.
It gets messy and probably pushes you towards write-once-only collections, etc.
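
To make the view idea concrete, here is a minimal in-memory sketch in Python. It does not use the MongoDB API at all: the dict `collections`, the list `view_members` and the functions `load_batch`, `read_view` and `compact` are all hypothetical stand-ins, and a real connector would have to redefine an actual view instead of mutating a list.

```python
# In-memory stand-ins; none of these names come from MongoDB or Spark.
collections = {}   # collection name -> list of documents
view_members = []  # collections currently exposed through the "view"

def load_batch(name, docs):
    """Load one batch into its own collection, then expose it via the view."""
    collections[name] = list(docs)  # data lands in a separate collection
    view_members.append(name)       # "redefine the view" only after the load succeeded

def read_view():
    """What a reader going through the view would see."""
    return [doc for name in view_members for doc in collections[name]]

def compact(merged_name):
    """Periodic merge so the view does not accumulate too many collections.

    Assumes merged_name is a new name, not one already behind the view.
    """
    collections[merged_name] = read_view()
    old_members = list(view_members)
    view_members[:] = [merged_name]  # the view now points at the merged collection
    for name in old_members:
        del collections[name]

load_batch("batch_1", [{"id": 1}, {"id": 2}])
load_batch("batch_2", [{"id": 3}])
before_compaction = read_view()
compact("merged_1")
after_compaction = read_view()
```

Readers that bypass the view and query the batch collections directly would still see in-flight data, which is part of why this may not be flexible enough for a generic connector.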

Finally, using views in a generic MongoDB connector may not be good or flexible 
enough.


From: Russell Spitzer <russell.spit...@gmail.com>
Date: Tuesday, September 11, 2018 at 9:58 AM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: Arun Mahadevan <ar...@apache.org>, Jungtaek Lim <kabh...@gmail.com>, 
Wenchen Fan <cloud0...@gmail.com>, Reynold Xin <r...@databricks.com>, Ross 
Lawley <ross.law...@gmail.com>, Ryan Blue <rb...@netflix.com>, dev 
<dev@spark.apache.org>, "dbis...@us.ibm.com" <dbis...@us.ibm.com>
Subject: Re: DataSourceWriter V2 Api questions

That only works assuming that Spark is the only client of the table. It will be 
impossible to force an outside user to respect the special metadata table when 
reading so they will still see all of the data in transit. Additionally this 
would force the incoming data to only be written into new partitions which is 
not simple to do from a C* perspective as balancing the distribution of new 
rows would be non trivial. If we had to do something like this we would 
basically be forced to write to some disk format first and then when we move 
the data into C* we still have the same problem that we started with.

On Tue, Sep 11, 2018 at 9:41 AM Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
So if Spark and the destination datastore are both non-transactional, you will 
have to resort to an external mechanism for “transactionality”.

Here are some options for both RDBMS and non-transactional datastore destinations.
For now, I am assuming that Spark is used in batch mode (not streaming mode).

RDBMS Options
Use staging table as discussed in the thread.

As an extension of the above, use partitioned destination tables and load data 
into a staging table and then use partition management to include the staging 
table into the partitioned table.
This implies a partition per Spark batch run.

Non-transactional Datastore Options
Use another metadata table.
Load the data into a staging table equivalent or even Cassandra partition(s).
Start the transaction by making a “start of transaction” entry into the 
metadata table along with partition keys to be populated.
As part of Spark batch commit, update the metadata entry with appropriate 
details – e.g. partition load time, etc.
In the event of a failed / incomplete batch, the metadata table entry will be 
incomplete and the corresponding partition keys can be dropped.
So essentially you use the metadata table to load/drop/skip the data to be 
moved/retained into the final destination.
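
The metadata-table steps above can be sketched roughly as follows. This is only an in-memory Python illustration under stated assumptions: `data_table` and `metadata_table` are plain dicts standing in for the datastore, and every function name is hypothetical. It also only protects readers that consult the metadata table.

```python
# In-memory stand-ins for a non-transactional store; all names are hypothetical.
data_table = {}      # partition key -> list of rows
metadata_table = {}  # batch id -> {"partitions": [...], "status": ...}

def begin_batch(batch_id, partition_keys):
    # "Start of transaction" entry, listing the partition keys to be populated.
    metadata_table[batch_id] = {"partitions": list(partition_keys), "status": "STARTED"}

def write_partition(partition_key, rows):
    # Plain, non-transactional write: rows are physically visible immediately.
    data_table.setdefault(partition_key, []).extend(rows)

def commit_batch(batch_id, load_time):
    # Spark batch commit: complete the metadata entry.
    entry = metadata_table[batch_id]
    entry["load_time"] = load_time
    entry["status"] = "COMMITTED"

def cleanup_incomplete():
    # Drop partitions that belong to failed or incomplete batches.
    for batch_id, entry in list(metadata_table.items()):
        if entry["status"] != "COMMITTED":
            for key in entry["partitions"]:
                data_table.pop(key, None)
            del metadata_table[batch_id]

def visible_rows():
    # Only readers that go through the metadata table get this filtered view.
    rows = []
    for entry in metadata_table.values():
        if entry["status"] == "COMMITTED":
            for key in entry["partitions"]:
                rows.extend(data_table.get(key, []))
    return rows

begin_batch("b1", ["p1", "p2"])
write_partition("p1", ["a", "b"])
write_partition("p2", ["c"])
commit_batch("b1", load_time="2018-09-11T09:41")

begin_batch("b2", ["p3"])
write_partition("p3", ["d"])  # batch b2 fails here, before commit_batch

rows_seen = visible_rows()
cleanup_incomplete()
```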

Misc
Another option is to use Spark to stage data into a filesystem (distributed, 
HDFS) and then use RDBMS utilities to transactionally load data into the 
destination table.


From: Russell Spitzer <russell.spit...@gmail.com>
Date: Tuesday, September 11, 2018 at 9:08 AM
To: Arun Mahadevan <ar...@apache.org>
Cc: Jungtaek Lim <kabh...@gmail.com>, Wenchen Fan <cloud0...@gmail.com>, 
Reynold Xin <r...@databricks.com>, Ross Lawley <ross.law...@gmail.com>, 
Ryan Blue <rb...@netflix.com>, dev <dev@spark.apache.org>, 
<dbis...@us.ibm.com>

Subject: Re: DataSourceWriter V2 Api questions

I'm still not sure how the staging table helps for databases which do not have 
such atomicity guarantees. For example in Cassandra if you wrote all of the 
data temporarily to a staging table, we would still have the same problem in 
moving the data from the staging table into the real table. We would likely 
have a similar chance of failing, and we would still have no way of making the 
entire staging set simultaneously visible.

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan 
<ar...@apache.org> wrote:
>Some say it is exactly-once when the output is eventually exactly-once, 
>whereas others say there should be no side effects, e.g. a consumer should 
>never see a partial write. I guess 2PC is the former, since some partitions can 
>commit earlier while other partitions fail to commit for some time.
Yes, it's more about guaranteeing atomicity, i.e. either all partitions 
eventually commit or none does. The visibility of the data to readers is 
orthogonal (e.g. setting isolation levels such as serializable for XA), and in 
general it's difficult to guarantee that data across partitions becomes visible 
at once. An approach like a staging table plus a global commit works in a 
centralized setup but can be difficult to do in a distributed manner across 
partitions (e.g. when each partition's output goes to a different database).

On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim 
<kabh...@gmail.com> wrote:
IMHO that depends on how strict we want to be about "exactly-once".

Some say it is exactly-once when the output is eventually exactly-once, 
whereas others say there should be no side effects, e.g. a consumer should 
never see a partial write. I guess 2PC is the former, since some partitions can 
commit earlier while other partitions fail to commit for some time.

That said, there may be a couple of alternatives to the contract Spark 
provides/requires, and I'd like to see how the Spark community wants to deal 
with them. Would we want to disallow alternatives such as "replay + deduplicate 
write (per batch/partition)", which ensures "eventually" exactly-once but 
cannot satisfy the contract?

Btw, unless achieving exactly-once is cheap enough for a given sink, I think 
the sink should offer both at-least-once (optimized for that semantic) and 
exactly-once, and let end users pick one.
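
As a rough illustration of the "replay + deduplicate write" alternative, the Python sketch below keys every write by (batch id, partition id) and ignores replays. `DedupSink` is a hypothetical in-memory stand-in, not a Spark or datastore API. It assumes the rows and the dedup marker can be recorded together per partition, which a real sink would have to guarantee itself.

```python
class DedupSink:
    """Hypothetical sink that makes replayed partition writes idempotent."""

    def __init__(self):
        self.rows = []
        self.seen = set()  # (batch_id, partition_id) pairs already written

    def write(self, batch_id, partition_id, rows):
        key = (batch_id, partition_id)
        if key in self.seen:
            return False  # replay of an already written partition: skip it
        # Assumption: rows and marker are stored together atomically per partition.
        self.rows.extend(rows)
        self.seen.add(key)
        return True

sink = DedupSink()
first = sink.write(batch_id=7, partition_id=0, rows=["a", "b"])
# Readers can already see partition 0 here, before partition 1 exists:
partial_view = list(sink.rows)
second = sink.write(batch_id=7, partition_id=1, rows=["c"])
replayed = sink.write(batch_id=7, partition_id=0, rows=["a", "b"])  # task retry
```

The output ends up exactly-once eventually, but `partial_view` shows that a reader can observe a half-written batch in the meantime, which is the part that does not meet the contract.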

On Tue, Sep 11, 2018 at 12:57 PM Russell Spitzer 
<russell.spit...@gmail.com> wrote:
Why are atomic operations a requirement? I feel like doubling the number of 
writes (with staging tables) is probably a tradeoff that the end user should 
make.
On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan 
<cloud0...@gmail.com> wrote:
Regardless of the API, using Spark to write data atomically involves the following:
1. Data is written in a distributed fashion, with a central coordinator at the Spark driver.
2. The distributed writers are not guaranteed to run at the same time. 
(This can be relaxed if we can extend the barrier scheduling feature.)
3. The new data is visible if and only if all distributed writers succeed.

Given these requirements, I think using a staging table is the most 
common way, and maybe the only way. I'm not sure how 2PC can help: we don't 
want users to read partial data, so we need a final step that commits all the 
data together.

For RDBMS data sources, I think a simple solution is to ask users to coalesce 
the input RDD/DataFrame into one partition, so that we don't need to care about 
multi-client transactions. Alternatively, use a staging table as Ryan described 
before.
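
For what it's worth, the staging-table pattern can be sketched with Python's built-in `sqlite3` module. This is a single-process illustration, not connector code: the table names `staging` and `final_table` and the functions `task_write` and `driver_commit` are made up for the example, and the "tasks" run sequentially over one connection rather than on separate executors.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE final_table (id INTEGER, value TEXT)")
conn.execute("CREATE TABLE staging (batch_id TEXT, id INTEGER, value TEXT)")

def count_final():
    return conn.execute("SELECT COUNT(*) FROM final_table").fetchone()[0]

def task_write(batch_id, rows):
    """Each task commits its own rows, but only into the staging table."""
    with conn:  # commits on success, rolls back on an exception
        conn.executemany(
            "INSERT INTO staging VALUES (?, ?, ?)",
            [(batch_id, row_id, value) for row_id, value in rows],
        )

def driver_commit(batch_id):
    """Final step on the driver: move the whole batch in one transaction."""
    with conn:
        conn.execute(
            "INSERT INTO final_table SELECT id, value FROM staging WHERE batch_id = ?",
            (batch_id,),
        )
        conn.execute("DELETE FROM staging WHERE batch_id = ?", (batch_id,))

task_write("batch-1", [(1, "a"), (2, "b")])
task_write("batch-1", [(3, "c")])
visible_before_commit = count_final()  # nothing in the final table yet
driver_commit("batch-1")
visible_after_commit = count_final()
```

On an abort, the driver would instead delete the batch's rows from the staging table and never touch the final table.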



On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim 
<kabh...@gmail.com> wrote:
> And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks 
> to be running at the same time, we need a mechanism to take down tasks after 
> they have prepared and bring up the tasks during the commit phase.

I guess we have already gotten into too much detail here, but if it is based on 
client transactions, Spark must assign each "commit" task to the executor whose 
task finished "prepare", and if it loses that executor it is not feasible to 
force the commit. Staging should come into play for that.

We should also have a mechanism for "recovery": Spark needs to ensure it 
finalizes the "commit" even in case of failures before starting a new batch.

So not an easy thing to integrate correctly.
On Tue, Sep 11, 2018 at 6:00 AM Arun Mahadevan 
<ar...@apache.org> wrote:
>Well, in almost all relational databases you can move data in a transactional 
>way. That’s what transactions are for.

It would work, but I suspect in most cases it would involve moving data from 
temporary tables to the final tables

Right now there's no mechanism to let the individual tasks commit in a 
two-phase manner (not sure if the CommitCoordinator might help). If such an API 
were provided, the sources could use it as they wish (e.g. use the XA support 
provided by MySQL to implement it more efficiently than having the driver 
move data from temp tables to destination tables).

Definitely there are complexities involved, but I am not sure if the network 
partitioning comes into play here since the driver can act as the co-ordinator 
and can run in HA mode. And regarding the issue that Jungtaek brought up, 2PC 
doesn't require tasks to be running at the same time, we need a mechanism to 
take down tasks after they have prepared and bring up the tasks during the 
commit phase.

Most of the sources would not need any of the above and just need a way to 
support Idempotent writes and like Ryan suggested we can enable this (if there 
are gaps in the current APIs).


On Mon, 10 Sep 2018 at 13:43, Reynold Xin 
<r...@databricks.com> wrote:
Well, in almost all relational databases you can move data in a transactional 
way. That’s what transactions are for.

For just straight HDFS, the move is a pretty fast operation so while it is not 
completely transactional, the window of potential failure is pretty short for 
appends. For writers at the partition level it is fine because it is just 
renaming directory, which is atomic.
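
A small Python sketch of the partition-level rename, using the local filesystem as a stand-in for HDFS (an assumption; the directory names are made up, and the rename is only atomic when source and destination are on the same filesystem):

```python
import os
import tempfile

root = tempfile.mkdtemp()
staging_dir = os.path.join(root, "_temporary", "date=2018-09-11")
final_dir = os.path.join(root, "date=2018-09-11")

# Each task writes its part file under the staging directory.
os.makedirs(staging_dir)
for task_id in range(3):
    with open(os.path.join(staging_dir, f"part-{task_id:05d}"), "w") as f:
        f.write(f"rows from task {task_id}\n")

visible_before = os.path.exists(final_dir)  # the partition is not visible yet

# Commit: a single directory rename publishes all part files at once.
os.rename(staging_dir, final_dir)
visible_files = sorted(os.listdir(final_dir))
```

Appending files into an existing directory needs one move per file, so that case only gets a short failure window rather than atomicity.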

On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim 
<kabh...@gmail.com> wrote:
When network partitioning happens, it is pretty OK for me to see 2PC not 
working, because we are dealing with a global transaction. Recovery would be a 
hard thing to get right, though. I completely agree it would require massive 
changes to Spark.

What I couldn't find in the underlying storage systems is a way to move data 
from a staging table to the final table transactionally. I'm not fully sure, 
but as far as I'm aware, many storage systems do not support moving data, and 
even for the HDFS sink the move is not strictly transactional, since we move 
multiple files with multiple operations. If the coordinator crashes it leaves a 
partial write, and the writers and the coordinator need to ensure that the data 
will not be duplicated.

Ryan replied to me that Iceberg and HBase MVCC timestamps can enable us to 
implement "commit" (his reply didn't reach the dev mailing list, though), but 
I'm not an expert in either of them and I still can't see how that would handle 
the various crash cases.

On Tue, Sep 11, 2018 at 5:17 AM Reynold Xin 
<r...@databricks.com> wrote:
I don't think two phase commit would work here at all.

1. It'd require massive changes to Spark.

2. Unless the underlying data source can provide an API to coordinate commits 
(which few data sources I know of do), 2PC wouldn't work 
in the presence of network partitioning. You can't defy the laws of physics.

Really the most common and simple way I've seen this working is through staging 
tables and a final transaction to move data from staging table to final table.





On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim 
<kabh...@gmail.com> wrote:
I guess we are all aware of the limitations of the DSv2 writer contract. 
Actually it can be satisfied only with the HDFS sink (or other filesystem-based 
sinks); for other external storage it is normally not feasible to implement, 
because there's no way to share a transaction among multiple clients, and the 
coordinator can't take over transactions from the writers to do the final commit.

XA is also not trivial to get right with the current execution model: 
Spark doesn't require writer tasks to run at the same time, but to achieve 2PC 
they would have to keep running until the end of the transaction (closing a 
client before the transaction ends normally means aborting the transaction). 
Spark would also have to integrate 2PC with its checkpointing mechanism to 
guarantee completeness of a batch. And it might require a different integration 
for continuous mode.

Jungtaek Lim (HeartSaVioR)

On Tue, Sep 11, 2018 at 4:37 AM Arun Mahadevan 
<ar...@apache.org> wrote:
In some cases the implementations may be OK with eventual consistency (and do 
not care if the output is written out atomically).

XA can be one option for datasources that support it and require atomicity, 
but I am not sure how one would implement it with the current API.

Maybe we need to discuss improvements at the Datasource V2 API level (e.g. 
individual tasks would "prepare" for commit and once the driver receives 
"prepared" from all the tasks, a "commit" would be invoked at each of the 
individual tasks). Right now the responsibility of the final "commit" is with 
the driver and it may not always be possible for the driver to take over the 
transactions started by the tasks.
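
The prepare/commit flow suggested here could look roughly like the Python sketch below. None of this exists in the current Datasource V2 API: `TaskWriter`, its `prepare`/`commit`/`abort` methods and `run_two_phase_commit` are hypothetical names, and the sketch ignores task restarts, executor loss and recovery.

```python
class TaskWriter:
    """Hypothetical per-task writer that holds its own transaction."""

    def __init__(self, task_id, fail_on_prepare=False):
        self.task_id = task_id
        self.fail_on_prepare = fail_on_prepare
        self.state = "INIT"

    def prepare(self):
        # Phase 1: write the data and vote; nothing is visible to readers yet.
        if self.fail_on_prepare:
            self.state = "FAILED"
            return False
        self.state = "PREPARED"
        return True

    def commit(self):
        # Phase 2: invoked only after every task has voted "prepared".
        self.state = "COMMITTED"

    def abort(self):
        self.state = "ABORTED"

def run_two_phase_commit(writers):
    """Driver-side coordinator: commit everywhere or abort everywhere."""
    votes = [writer.prepare() for writer in writers]
    if all(votes):
        for writer in writers:
            writer.commit()
        return True
    for writer in writers:
        writer.abort()
    return False

ok_writers = [TaskWriter(0), TaskWriter(1)]
ok_result = run_two_phase_commit(ok_writers)

bad_writers = [TaskWriter(0), TaskWriter(1, fail_on_prepare=True)]
bad_result = run_two_phase_commit(bad_writers)
```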


On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
<dbis...@us.ibm.com> wrote:
This is a pretty big challenge in general for data sources -- for the vast 
majority of data stores, the boundary of a transaction is per client. That is, 
you can't have two clients doing writes and coordinating a single transaction. 
That's certainly the case for almost all relational databases. Spark, on the 
other hand, will have multiple clients (consider each task a client) writing to 
the same underlying data store.

DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this ? Not 
sure how easy it is to implement this though :-)

Regards,
Dilip Biswal
Tel: 408-463-4980
dbis...@us.ibm.com


----- Original message -----
From: Reynold Xin <r...@databricks.com>
To: Ryan Blue <rb...@netflix.com>
Cc: ross.law...@gmail.com, dev <dev@spark.apache.org>
Subject: Re: DataSourceWriter V2 Api questions
Date: Mon, Sep 10, 2018 10:26 AM

I don't think the problem is just whether we have a starting point for write. 
As a matter of fact there's always a starting point for write, whether it is 
explicit or implicit.

This is a pretty big challenge in general for data sources -- for the vast 
majority of data stores, the boundary of a transaction is per client. That is, 
you can't have two clients doing writes and coordinating a single transaction. 
That's certainly the case for almost all relational databases. Spark, on the 
other hand, will have multiple clients (consider each task a client) writing to 
the same underlying data store.

On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue 
<rb...@netflix.com> wrote:
Ross, I think the intent is to create a single transaction on the driver, write 
as part of it in each task, and then commit the transaction once the tasks 
complete. Is that possible in your implementation?

I think that part of this is made more difficult by not having a clear starting 
point for a write, which we are fixing in the redesign of the v2 API. That will 
have a method that creates a Write to track the operation. That can create your 
transaction when it is created and commit the transaction when commit is called 
on it.

rb

On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
<r...@databricks.com> wrote:
Typically people do it via transactions, or staging tables.


On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
<ross.law...@gmail.com> wrote:
Hi all,

I've been prototyping an implementation of the DataSource V2 writer for the 
MongoDB Spark Connector and I have a couple of questions about how it's intended 
to be used with database systems. According to the Javadoc for 
DataWriter.commit():

"this method should still "hide" the written data and ask the DataSourceWriter 
at driver side to do the final commit via WriterCommitMessage"

Although MongoDB now has transactions, it doesn't have a way to "hide" the 
data once it has been written. So as soon as the DataWriter has committed the 
data, it has been inserted/updated in the collection and is discoverable - 
thereby breaking the documented contract.

I was wondering how other database systems plan to implement this API and meet 
the contract as per the Javadoc?

Many thanks

Ross


--
Ryan Blue
Software Engineer
Netflix


