Re: DataSourceWriter V2 Api questions
Hi,

Thanks all for the comments and discussion regarding the API! It sounds like the current expectation for database systems is to populate a staging table in the tasks, with the driver moving that data when commit is called. That would work for many use cases that our users have with the MongoDB connector, although for us it would be much slower than our V1 implementation. Many customers cite speed as paramount; that may be more from the reader perspective, but it will be interesting to see the feedback.

Another challenge is that this approach requires all data to be in Spark and then written out to the database. With the current implementation, users can update just parts of their documents in situ. That may have been a bad design decision, but it came from customer requirements, and it means that in that scenario I can't reliably use a staging table. I'm not sure whether other connectors also allow that behaviour.

Ross

On Tue, Sep 11, 2018 at 5:23 AM Jungtaek Lim wrote:

> IMHO that's up to how we would like to be strict about "exactly-once".
>
> Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.
>
> Being said, there may be couple of alternatives other than the contract Spark provides/requires, and I'd like to see how Spark community wants to deal with others. Would we want to disallow alternatives, like "replay + deduplicate write (per a batch/partition)" which ensures "eventually" exactly-once but cannot ensure the contract?
>
> Btw, unless achieving exactly-once is light enough for given sink, I think the sink should provide both at-least-once (also optimized for the semantic) vs exactly-once, and let end users pick one.
>
> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer wrote:
>
>> Why are atomic operations a requirement? I feel like doubling the amount of writes (with staging tables) is probably a tradeoff that the end user should make.
>>
>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan wrote:
>>
>>> Regardless of the API, to use Spark to write data atomically, it requires:
>>> 1. Write data distributedly, with a central coordinator at the Spark driver.
>>> 2. The distributed writers are not guaranteed to run together at the same time. (This can be relaxed if we can extend the barrier scheduling feature.)
>>> 3. The new data is visible if and only if all distributed writers succeed.
>>>
>>> According to these requirements, I think using a staging table is the most common way and maybe the only way. I'm not sure how 2PC can help; we don't want users to read partial data, so we need a final step to commit all the data together.
>>>
>>> For RDBMS data sources, I think a simple solution is to ask users to coalesce the input RDD/DataFrame into one partition, then we don't need to care about multi-client transactions. Or use a staging table like Ryan described before.
>>>
>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim wrote:
>>>
>>>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time, we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>>> I guess we already got into too much details here, but if it is based on client transactions Spark must assign "commit" tasks to the executor whose task finished "prepare", and if it loses the executor it is not feasible to force committing. Staging should come into play for that.
>>>>
>>>> We should also have a mechanism for "recovery": Spark needs to ensure it finalizes "commit" even in case of failures before starting a new batch. So not an easy thing to integrate correctly.
>>>>
>>>> On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan wrote:
>>>>
>>>>> > Well almost all relational databases you can move data in a transactional way. That's what transactions are for.
>>>>> > It would work, but I suspect in most cases it would involve moving data from temporary tables to the final tables
>>>>>
>>>>> Right now there's no mechanism to let the individual tasks commit in a two-phase manner (not sure if the CommitCoordinator might help). If such an API is provided, the sources could use it as they wish (e.g. use the XA support provided by MySQL to implement it in a more efficient way than the driver moving from temp tables to destination tables).
>>>>>
>>>>> Definitely there are complexities involved, but I am not sure if network partitioning comes into play here since the driver can act as the coordinator and can run in HA mode. And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
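For readers following the staging-table idea, here is a minimal sketch of the task/driver split being discussed, against a plain JDBC sink. This is not the actual DataSourceWriter API; the URL, table names, and helper methods are illustrative placeholders.

    import java.sql.DriverManager

    // Sketch of the staging-table commit pattern (placeholder names throughout):
    // each task writes its partition to a private staging table, and the driver
    // moves everything into the target table in one transaction on commit.
    object StagingCommitSketch {
      val url = "jdbc:postgresql://db-host/mydb" // placeholder URL

      // Executor side: stage this partition's rows and return the staging
      // table's name, playing the role of a WriterCommitMessage.
      def writePartition(partitionId: Int, rows: Iterator[String]): String = {
        val staging = s"target_stage_$partitionId"
        val conn = DriverManager.getConnection(url)
        try {
          conn.createStatement().execute(
            s"CREATE TABLE IF NOT EXISTS $staging (value TEXT)")
          val ps = conn.prepareStatement(s"INSERT INTO $staging VALUES (?)")
          rows.foreach { r => ps.setString(1, r); ps.executeUpdate() }
          staging
        } finally conn.close()
      }

      // Driver side: once all tasks have succeeded, publish everything in a
      // single transaction, so readers never observe a partially written batch.
      def commitAll(stagingTables: Seq[String]): Unit = {
        val conn = DriverManager.getConnection(url)
        try {
          conn.setAutoCommit(false)
          stagingTables.foreach { t =>
            conn.createStatement().execute(s"INSERT INTO target SELECT * FROM $t")
            conn.createStatement().execute(s"DROP TABLE $t")
          }
          conn.commit()
        } finally conn.close()
      }
    }

The doubling of writes Russell mentions in the quoted thread is visible here: every row lands twice, once in its staging table and once in the target.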
Off Heap Memory
Hello,

I recently started studying Spark's memory management system. More specifically, I want to understand how Spark uses off-heap memory. Internally, I saw that there are two off-heap memory pools (offHeapExecutionMemoryPool and offHeapStorageMemoryPool).

How does Spark use off-heap memory? How does Spark use the offHeapExecutionMemoryPool, and how does it use the offHeapStorageMemoryPool? Is there any good tutorial or guide that explains the internals of Spark's memory management?

Thanks for your reply,
-Iacovos
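In case it helps while digging through the internals: off-heap mode is opt-in and must be sized explicitly. A minimal sketch (the 2g value is arbitrary); with both settings on, execution and storage memory are accounted against the two off-heap pools mentioned above rather than the on-heap ones.

    import org.apache.spark.sql.SparkSession

    // Off-heap memory is disabled by default; both settings below are needed
    // to turn it on, and they must be set before the SparkContext starts.
    val spark = SparkSession.builder()
      .appName("offheap-example")
      .master("local[*]")
      .config("spark.memory.offHeap.enabled", "true") // use off-heap pools
      .config("spark.memory.offHeap.size", "2g")      // total off-heap budget
      .getOrCreate()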
Re: DataSourceWriter V2 Api questions
> Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.

Yes, it's more about guaranteeing atomicity: all partitions eventually commit, or none commits. The visibility of the data for the readers is orthogonal (e.g. setting isolation levels like serializable for XA), and in general it's difficult to guarantee that data across partitions becomes visible at once. An approach like a staging table with a global commit works in a centralized setup but can be difficult to do in a distributed manner across partitions (e.g. when each partition's output goes to a different database).

On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim wrote:

> IMHO that's up to how we would like to be strict about "exactly-once".
>
> Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.
>
> Being said, there may be couple of alternatives other than the contract Spark provides/requires, and I'd like to see how Spark community wants to deal with others. Would we want to disallow alternatives, like "replay + deduplicate write (per a batch/partition)" which ensures "eventually" exactly-once but cannot ensure the contract?
>
> Btw, unless achieving exactly-once is light enough for given sink, I think the sink should provide both at-least-once (also optimized for the semantic) vs exactly-once, and let end users pick one.
>
> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer wrote:
>
>> Why are atomic operations a requirement? I feel like doubling the amount of writes (with staging tables) is probably a tradeoff that the end user should make.
>>
>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan wrote:
>>
>>> Regardless of the API, to use Spark to write data atomically, it requires:
>>> 1. Write data distributedly, with a central coordinator at the Spark driver.
>>> 2. The distributed writers are not guaranteed to run together at the same time. (This can be relaxed if we can extend the barrier scheduling feature.)
>>> 3. The new data is visible if and only if all distributed writers succeed.
>>>
>>> According to these requirements, I think using a staging table is the most common way and maybe the only way. I'm not sure how 2PC can help; we don't want users to read partial data, so we need a final step to commit all the data together.
>>>
>>> For RDBMS data sources, I think a simple solution is to ask users to coalesce the input RDD/DataFrame into one partition, then we don't need to care about multi-client transactions. Or use a staging table like Ryan described before.
>>>
>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim wrote:
>>>
>>>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time, we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>>>
>>>> I guess we already got into too much details here, but if it is based on client transactions Spark must assign "commit" tasks to the executor whose task finished "prepare", and if it loses the executor it is not feasible to force committing.
>>>> Staging should come into play for that. We should also have a mechanism for "recovery": Spark needs to ensure it finalizes "commit" even in case of failures before starting a new batch. So not an easy thing to integrate correctly.
>>>>
>>>> On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan wrote:
>>>>
>>>>> > Well almost all relational databases you can move data in a transactional way. That's what transactions are for.
>>>>> > It would work, but I suspect in most cases it would involve moving data from temporary tables to the final tables
>>>>>
>>>>> Right now there's no mechanism to let the individual tasks commit in a two-phase manner (not sure if the CommitCoordinator might help). If such an API is provided, the sources could use it as they wish (e.g. use the XA support provided by MySQL to implement it in a more efficient way than the driver moving from temp tables to destination tables).
>>>>>
>>>>> Definitely there are complexities involved, but I am not sure if network partitioning comes into play here since the driver can act as the coordinator and can run in HA mode. And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>>>>
>>>>> Most of the
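To make the XA suggestion above concrete, here is a rough sketch of per-task prepare plus driver-side commit against any XADataSource-backed sink (MySQL's Connector/J provides one). This is not an existing Spark API; the table name, SimpleXid, and the wiring are assumptions for illustration.

    import javax.sql.XADataSource
    import javax.transaction.xa.{XAResource, Xid}

    // Minimal Xid; a real implementation would derive these bytes from the
    // (query, partition, epoch) identity so a restarted driver can recover.
    case class SimpleXid(gtrid: Array[Byte], bqual: Array[Byte]) extends Xid {
      def getFormatId(): Int = 1
      def getGlobalTransactionId(): Array[Byte] = gtrid
      def getBranchQualifier(): Array[Byte] = bqual
    }

    // Task side: write the partition inside an XA branch and PREPARE it.
    // The prepared branch is durable but invisible until committed.
    def prepareBranch(ds: XADataSource, xid: Xid, rows: Seq[String]): Xid = {
      val xaConn = ds.getXAConnection()
      val xa = xaConn.getXAResource
      val conn = xaConn.getConnection
      xa.start(xid, XAResource.TMNOFLAGS)
      val ps = conn.prepareStatement("INSERT INTO target VALUES (?)") // placeholder table
      rows.foreach { r => ps.setString(1, r); ps.executeUpdate() }
      xa.end(xid, XAResource.TMSUCCESS)
      xa.prepare(xid)
      xid // returned to the driver as the "commit message"
    }

    // Driver side: commit every prepared branch once all tasks have prepared.
    // A fresh connection to the same resource manager can commit a prepared
    // Xid, which is why the original task need not still be running.
    def commitBranches(ds: XADataSource, xids: Seq[Xid]): Unit = {
      val xa = ds.getXAConnection().getXAResource
      xids.foreach(xid => xa.commit(xid, false)) // false = two-phase commit
    }

As Jungtaek notes, the hard part is recovery: the driver would have to persist the prepared Xids so that a restart can use XAResource.recover() to finish or roll back in-doubt branches.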
Is CBO broken?
I was trying to enable CBO on one of our jobs (using Spark 2.3.1 with partitioned Parquet data), but it seemed that the rowCount statistics were being ignored. I found this JIRA, which seems to describe the same issue: https://issues.apache.org/jira/browse/SPARK-25185, but it has had no response so far.

Does CBO work with partitioned Parquet data, or is this a known issue? Is there a workaround?

Thanks,
Emlyn
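For anyone else reproducing this, these are the pieces involved (a sketch from spark-shell; `events` and the column names are placeholders). Whether the computed rowCount is actually honoured for partitioned Parquet tables is precisely what SPARK-25185 asks.

    // CBO is off by default and only uses statistics computed explicitly.
    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

    // Table-level stats (rowCount, sizeInBytes) and column-level stats:
    spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")
    spark.sql("ANALYZE TABLE events COMPUTE STATISTICS FOR COLUMNS user_id, ts")

    // Inspect what the catalog recorded; a missing rowCount here would match
    // the behaviour described in the JIRA.
    spark.sql("DESCRIBE EXTENDED events").show(100, truncate = false)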
Re: DataSourceWriter V2 Api questions
I'm still not sure how the staging table helps for databases which do not have such atomicity guarantees. For example, in Cassandra, if you wrote all of the data temporarily to a staging table, we would still have the same problem in moving the data from the staging table into the real table. We would likely have a similar chance of failing, and we still have no way of making the entire staging set simultaneously visible.

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan wrote:

> > Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.
>
> Yes, it's more about guaranteeing atomicity: all partitions eventually commit, or none commits. The visibility of the data for the readers is orthogonal (e.g. setting isolation levels like serializable for XA), and in general it's difficult to guarantee that data across partitions becomes visible at once. An approach like a staging table with a global commit works in a centralized setup but can be difficult to do in a distributed manner across partitions (e.g. when each partition's output goes to a different database).
>
> On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim wrote:
>
>> IMHO that's up to how we would like to be strict about "exactly-once".
>>
>> Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.
>>
>> Being said, there may be couple of alternatives other than the contract Spark provides/requires, and I'd like to see how Spark community wants to deal with others. Would we want to disallow alternatives, like "replay + deduplicate write (per a batch/partition)" which ensures "eventually" exactly-once but cannot ensure the contract?
>>
>> Btw, unless achieving exactly-once is light enough for given sink, I think the sink should provide both at-least-once (also optimized for the semantic) vs exactly-once, and let end users pick one.
>>
>> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer wrote:
>>
>>> Why are atomic operations a requirement? I feel like doubling the amount of writes (with staging tables) is probably a tradeoff that the end user should make.
>>>
>>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan wrote:
>>>
>>>> Regardless of the API, to use Spark to write data atomically, it requires:
>>>> 1. Write data distributedly, with a central coordinator at the Spark driver.
>>>> 2. The distributed writers are not guaranteed to run together at the same time. (This can be relaxed if we can extend the barrier scheduling feature.)
>>>> 3. The new data is visible if and only if all distributed writers succeed.
>>>>
>>>> According to these requirements, I think using a staging table is the most common way and maybe the only way. I'm not sure how 2PC can help; we don't want users to read partial data, so we need a final step to commit all the data together.
>>>>
>>>> For RDBMS data sources, I think a simple solution is to ask users to coalesce the input RDD/DataFrame into one partition, then we don't need to care about multi-client transactions. Or use a staging table like Ryan described before.
>>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim wrote:
>>>>
>>>>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time, we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>>>>
>>>>> I guess we already got into too much details here, but if it is based on client transactions Spark must assign "commit" tasks to the executor whose task finished "prepare", and if it loses the executor it is not feasible to force committing. Staging should come into play for that.
>>>>>
>>>>> We should also have a mechanism for "recovery": Spark needs to ensure it finalizes "commit" even in case of failures before starting a new batch.
>>>>>
>>>>> So not an easy thing to integrate correctly.
>>>>>
>>>>> On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan wrote:
>>>>>
>>>>>> > Well almost all relational databases you can move data in a transactional way. That's what transactions are for.
>>>>>> > It would work, but I suspect in most cases it would involve moving data from temporary tables to the final tables
>>>>>>
>>>>>> Right now there's no mechanism to let the individual tasks commit in a two-phase manner (not sure if the CommitCoordinator might help). If such an API is provided, the sources could use it as they wish (e.g. use the XA support provided by MySQL to implement it in a more efficient way than the driver moving from temp tables to destination tables).
Re: DataSourceWriter V2 Api questions
So if Spark and the destination datastore are both non-transactional, you will have to resort to an external mechanism for "transactionality". Here are some options for both RDBMS and non-transactional datastore destinations. For now, assume that Spark is used in batch mode (and not streaming mode).

*RDBMS Options*

Use a staging table as discussed in the thread.

As an extension of the above, use partitioned destination tables: load data into a staging table and then use partition management to include the staging table in the partitioned table. This implies a partition per Spark batch run.

*Non-transactional Datastore Options*

Use another metadata table. Load the data into a staging-table equivalent or even Cassandra partition(s). Start the transaction by making a "start of transaction" entry into the metadata table, along with the partition keys to be populated. As part of the Spark batch commit, update the metadata entry with appropriate details, e.g. partition load time. In the event of a failed / incomplete batch, the metadata table entry will be incomplete and the corresponding partition keys can be dropped. So essentially you use the metadata table to load/drop/skip the data to be moved/retained into the final destination.

*Misc*

Another option is to use Spark to stage data into a filesystem (distributed, HDFS) and then use RDBMS utilities to transactionally load data into the destination table.

*From:* Russell Spitzer
*Date:* Tuesday, September 11, 2018 at 9:08 AM
*To:* Arun Mahadevan
*Cc:* Jungtaek Lim, Wenchen Fan, Reynold Xin, Ross Lawley, Ryan Blue, dev
*Subject:* Re: DataSourceWriter V2 Api questions

I'm still not sure how the staging table helps for databases which do not have such atomicity guarantees. For example, in Cassandra, if you wrote all of the data temporarily to a staging table, we would still have the same problem in moving the data from the staging table into the real table. We would likely have a similar chance of failing and we still have no way of making the entire staging set simultaneously visible.

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan <ar...@apache.org> wrote:

> Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.

Yes, it's more about guaranteeing atomicity: all partitions eventually commit, or none commits. The visibility of the data for the readers is orthogonal (e.g. setting isolation levels like serializable for XA), and in general it's difficult to guarantee that data across partitions becomes visible at once. An approach like a staging table with a global commit works in a centralized setup but can be difficult to do in a distributed manner across partitions (e.g. when each partition's output goes to a different database).

On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim <kabh...@gmail.com> wrote:

IMHO that's up to how we would like to be strict about "exactly-once".

Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.
Being said, there may be couple of alternatives other than the contract Spark provides/requires, and I'd like to see how Spark community wants to deal with others. Would we want to disallow alternatives, like "replay + deduplicate write (per a batch/partition)" which ensures "eventually" exactly-once but cannot ensure the contract?

Btw, unless achieving exactly-once is light enough for given sink, I think the sink should provide both at-least-once (also optimized for the semantic) vs exactly-once, and let end users pick one.

On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:

Why are atomic operations a requirement? I feel like doubling the amount of writes (with staging tables) is probably a tradeoff that the end user should make.

On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan <cloud0...@gmail.com> wrote:

Regardless of the API, to use Spark to write data atomically, it requires:
1. Write data distributedly, with a central coordinator at the Spark driver.
2. The distributed writers are not guaranteed to run together at the same time. (This can be relaxed if we can extend the barrier scheduling feature.)
3. The new data is visible if and only if all distributed writers succeed.

According to these requirements, I think using a staging table is the most common way and maybe the only way. I'm not sure how 2PC can help; we don't want users to read partial data, so we need a final step to commit all the data together.

For RDBMS data sources, I think a simple solution is to ask users to coalesce the input RDD/DataFrame into one partition, then we don't need to care about multi-client transactions.
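For concreteness, a rough sketch of the metadata-table idea above, using the DataStax Java driver (keyspace, tables, and schema are invented for illustration). Note that it only gives batch-level atomicity to readers that also consult the metadata table, which is the objection raised in the next message.

    import com.datastax.driver.core.Cluster
    import java.util.UUID

    // Sketch of the metadata-table protocol: record intent, write data tagged
    // with a batch id, then flip the metadata row to mark the batch committed.
    // An incomplete batch stays 'pending' and its rows can be swept later.
    object MetadataCommitSketch {
      val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
      val session = cluster.connect("ks") // invented keyspace

      def runBatch(rows: Seq[(String, String)]): Unit = {
        val batchId = UUID.randomUUID()
        // 1. "Start of transaction" entry, before any data is written.
        session.execute(
          "INSERT INTO ks.batch_meta (batch_id, state) VALUES (?, 'pending')",
          batchId)
        // 2. Data writes, each row tagged with the batch id.
        rows.foreach { case (k, v) =>
          session.execute(
            "INSERT INTO ks.target (k, v, batch_id) VALUES (?, ?, ?)",
            k, v, batchId)
        }
        // 3. Batch commit: only now do metadata-aware readers accept the rows.
        session.execute(
          "UPDATE ks.batch_meta SET state = 'committed' WHERE batch_id = ?",
          batchId)
      }
    }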
Re: DataSourceWriter V2 Api questions
That only works assuming that Spark is the only client of the table. It will be impossible to force an outside user to respect the special metadata table when reading, so they will still see all of the data in transit. Additionally, this would force the incoming data to only be written into new partitions, which is not simple to do from a C* perspective, as balancing the distribution of new rows would be non-trivial. If we had to do something like this, we would basically be forced to write to some disk format first, and then when we move the data into C* we would still have the same problem we started with.

On Tue, Sep 11, 2018 at 9:41 AM Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:

> So if Spark and the destination datastore are both non-transactional, you will have to resort to an external mechanism for "transactionality".
>
> Here are some options for both RDBMS and non-transactional datastore destinations. For now, assume that Spark is used in batch mode (and not streaming mode).
>
> *RDBMS Options*
>
> Use a staging table as discussed in the thread.
>
> As an extension of the above, use partitioned destination tables: load data into a staging table and then use partition management to include the staging table in the partitioned table. This implies a partition per Spark batch run.
>
> *Non-transactional Datastore Options*
>
> Use another metadata table. Load the data into a staging-table equivalent or even Cassandra partition(s).
>
> Start the transaction by making a "start of transaction" entry into the metadata table, along with the partition keys to be populated. As part of the Spark batch commit, update the metadata entry with appropriate details, e.g. partition load time. In the event of a failed / incomplete batch, the metadata table entry will be incomplete and the corresponding partition keys can be dropped.
>
> So essentially you use the metadata table to load/drop/skip the data to be moved/retained into the final destination.
>
> *Misc*
>
> Another option is to use Spark to stage data into a filesystem (distributed, HDFS) and then use RDBMS utilities to transactionally load data into the destination table.
>
> *From:* Russell Spitzer
> *Date:* Tuesday, September 11, 2018 at 9:08 AM
> *To:* Arun Mahadevan
> *Cc:* Jungtaek Lim, Wenchen Fan, Reynold Xin, Ross Lawley, Ryan Blue, dev, <dbis...@us.ibm.com>
> *Subject:* Re: DataSourceWriter V2 Api questions
>
> I'm still not sure how the staging table helps for databases which do not have such atomicity guarantees. For example, in Cassandra, if you wrote all of the data temporarily to a staging table, we would still have the same problem in moving the data from the staging table into the real table. We would likely have a similar chance of failing and we still have no way of making the entire staging set simultaneously visible.
>
> On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan wrote:
>
> > Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.
>
> Yes, it's more about guaranteeing atomicity: all partitions eventually commit, or none commits.
> The visibility of the data for the readers is orthogonal (e.g. setting isolation levels like serializable for XA), and in general it's difficult to guarantee that data across partitions becomes visible at once. An approach like a staging table with a global commit works in a centralized setup but can be difficult to do in a distributed manner across partitions (e.g. when each partition's output goes to a different database).
>
> On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim wrote:
>
> IMHO that's up to how we would like to be strict about "exactly-once".
>
> Some being said it is exactly-once when the output is eventually exactly-once, whereas others being said there should be no side effect, like consumer shouldn't see partial write. I guess 2PC is former, since some partitions can commit earlier while other partitions fail to commit for some time.
>
> Being said, there may be couple of alternatives other than the contract Spark provides/requires, and I'd like to see how Spark community wants to deal with others. Would we want to disallow alternatives, like "replay + deduplicate write (per a batch/partition)" which ensures "eventually" exactly-once but cannot ensure the contract?
>
> Btw, unless achieving exactly-once is light enough for given sink, I think the sink should provide both at-least-once (also optimized for the semantic) vs exactly-once, and let end users pick one.
>
> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer wrote:
>
> Why are atomic operations a requirement? I feel like doubling the amount of writes (with staging tables) is probably a tradeoff that the end user should make.
Speakers needed for Apache DC Roadshow
We need your help to make the Apache Washington DC Roadshow on Dec 4th a success.

What do we need most? Speakers!

We're bringing a unique DC flavor to this event by mixing Open Source Software with talks about Apache projects as well as OSS CyberSecurity, OSS in Government, and OSS Career advice.

Please take a look at: http://www.apachecon.com/usroadshow18/

(Note: You are receiving this message because you are subscribed to one or more mailing lists at The Apache Software Foundation.)

Rich, for the ApacheCon Planners

--
rbo...@apache.org
http://apachecon.com
@ApacheCon