Hi Andrew,

Thanks for your wording suggestions! I've merged them.
Here is the latest document:

  http://crossbow.voltrondata.com/pr_docs/35178/format/Flight.html#downloading-data

If there are no more comments in a few days, I'll start
voting on this proposal.


Thanks,
--
kou

In <cafhtnryvm_f0j8vsqh5vmfpguev4ci1nnen-25dl7r5qk0w...@mail.gmail.com>
  "Re: [DISCUSS][Format][Flight] Ordered data support" on Fri, 28 Apr 2023 08:35:34 -0400,
  Andrew Lamb <al...@influxdata.com> wrote:

> Thank you for the clarification.
>
> The point I was missing was that this flag is instructing the FlightClient
> how to present the results to the client application, rather than specific
> properties of the underlying stream.
>
> I can see the value of returning result streams in a specific order
> (by endpoint) or also being able to retrieve the streams from the endpoints
> in any order (and potentially interleave the results from the endpoint as
> they arrive)
>
> I left some suggestions on clarifying the wording on [1] that might help
> avoid future confusion
>
> Andrew
>
> [1] https://github.com/apache/arrow/pull/35178
>
> On Fri, Apr 28, 2023 at 1:02 AM David Li <lidav...@apache.org> wrote:
>
>> For a lot of partitions - you could have a small number of threads
>> consuming a queue of partitions (and deciding whether you need to
>> sequence/renumber their outputs or not), much like what Acero does with a
>> FileSystemDataset.
>>
>> Note that the Flight client itself doesn't do any of this (perhaps it
>> should!); it's clients of Flight that have to deal with this. (...that's a
>> bit confusing)
>>
>> On Fri, Apr 28, 2023, at 19:06, Weston Pace wrote:
>> > Thank you both for the extra information. Acero couldn't actually merge
>> > the streams today, I was thinking more of datafusion and velox which would
>> > often want to keep the streams separate, especially if there was some kind
>> > of filtering or transformation that could be applied before applying a
>> > sorted merge.
>> >
>> > However, I also very much agree that both scenarios are valid.
>> > First, if there are a lot of partitions (e.g. far more than the # of
>> > parallelism units) then you probably don't want parallel paths for all
>> > of them.
>> >
>> > Second, as you said, simpler clients (e.g. those where all filtering is
>> > done downstream, or those that don't need any filtering at all) will
>> > appreciate flight's ability to merge for them. It makes the client more
>> > complex but given that clients are already doing this to some extent it
>> > seems worthwhile.
>> >
>> > On Thu, Apr 27, 2023 at 7:45 PM David Li <lidav...@apache.org> wrote:
>> >
>> >> In addition to Kou's response:
>> >>
>> >> The individual endpoints have always represented a subset of a single
>> >> stream of data. So each endpoint in a FlightInfo is a partition of the
>> >> overall result set.
>> >>
>> >> Not all clients want to deal with reading all the Flight streams
>> >> themselves and may want a single stream of data. (For example: ADBC exposes
>> >> both paths. The JDBC driver also has to deal with this.) So some client
>> >> libraries have to deal with the question of whether to read in parallel and
>> >> whether to keep the result in order or not. A more advanced use case, like
>> >> Acero, would probably read the endpoints itself and could use this flag to
>> >> decide how to merge the streams.
>> >>
>> >> On Fri, Apr 28, 2023, at 09:56, Sutou Kouhei wrote:
>> >> > Hi,
>> >> >
>> >> >> This seems of very limited value if, for example, if the user desired DESC
>> >> >> order, then the endpoint would return
>> >> >>
>> >> >> Endpoint 1: (C, B, A)
>> >> >> Endpoint 2: (F, E, D)
>> >> >
>> >> > As David said, the server returns
>> >> >
>> >> > Endpoint 2: (F, E, D)
>> >> > Endpoint 1: (C, B, A)
>> >> >
>> >> > in this case.
>> >> >
>> >> > Here is a use case I think:
>> >> >
>> >> > A system has time series data. Each node in the system has
>> >> > data for one day.
>> >> > If a client requests "SELECT * FROM data
>> >> > WHERE server = 'server1' ORDER BY created_at DESC", the
>> >> > system returns the following:
>> >> >
>> >> >   Endpoint 20230428: (DATA_FOR_2023_04_28)
>> >> >   Endpoint 20230427: (DATA_FOR_2023_04_27)
>> >> >   Endpoint 20230426: (DATA_FOR_2023_04_26)
>> >> >   ...
>> >> >
>> >> > If we have the "ordered" flag, the client can assume that
>> >> > received data are sorted. In other words, if the client
>> >> > reads data from Endpoint 20230428 -> Endpoint 20230427 ->
>> >> > Endpoint 20230426, the data the client read is sorted.
>> >> >
>> >> > If we don't have the "ordered" flag and we use "the relative
>> >> > ordering of data from different endpoints is implementation
>> >> > defined", we can't implement a general purpose Flight based
>> >> > client library (Flight SQL based client library, Flight SQL
>> >> > based ADBC driver and so on). The client library will have
>> >> > the following code:
>> >> >
>> >> >   # TODO: How to detect server_type?
>> >> >   if server_type == "DB1"
>> >> >     # DB1 returns ordered result.
>> >> >     endpoints.each do |endpoint|
>> >> >       yield(endpoint.read)
>> >> >     end
>> >> >   else
>> >> >     # Other DBs don't return ordered result.
>> >> >     # So, we read data in parallel for performance.
>> >> >     threads = endpoints.collect do |endpoint|
>> >> >       Thread.new do
>> >> >         yield(endpoint.read)
>> >> >       end
>> >> >     end
>> >> >     threads.each do |thread|
>> >> >       thread.join
>> >> >     end
>> >> >   end
>> >> >
>> >> > The client library needs to add 'or server_type == "DB2"' to
>> >> > 'if server_type == "DB1"' when DB2 also adds support for
>> >> > ordered result. If only DB2 2.0 or later supports ordered
>> >> > results, the client library needs one more condition: 'or
>> >> > (server_type == "DB2" and server_version >= 2.0)'.
>> >> >
>> >> > So I think that the "ordered" flag is useful.
>> >> >
>> >> >
>> >> > Thanks,
>> >> > --
>> >> > kou
>> >> >
>> >> > In <CAFhtnRxzMaoqmzWPkqsLoJZW5jmx=d_i9ojd9xy1ydkgkgz...@mail.gmail.com>
>> >> >   "Re: [DISCUSS][Format][Flight] Ordered data support" on Thu, 27 Apr 2023 10:55:32 -0400,
>> >> >   Andrew Lamb <al...@influxdata.com> wrote:
>> >> >
>> >> >> I wonder if we have considered simply removing the statement "There is no
>> >> >> ordering defined on endpoints. Hence, if the returned data has an ordering,
>> >> >> it should be returned in a single endpoint." and replacing it with
>> >> >> something that says "the relative ordering of data from different endpoints
>> >> >> is implementation defined"
>> >> >>
>> >> >> I am struggling to come up with a concrete usecase for the "ordered" flag.
>> >> >>
>> >> >> The ticket references "distributed sort" but most distributed sort
>> >> >> algorithms I know of would produce multiple sorted streams that need to be
>> >> >> merged together. For example
>> >> >>
>> >> >> Endpoint 1: (B, C, D)
>> >> >> Endpoint 2: (A, E, F)
>> >> >>
>> >> >> It is not clear how the "ordered" flag would help here
>> >> >>
>> >> >> If the intent is somehow to signal the client it doesn't have to merge
>> >> >> (e.g. with data like)
>> >> >>
>> >> >> Endpoint 1: (A, B, C)
>> >> >> Endpoint 2: (D, E, F)
>> >> >>
>> >> >> This seems of very limited value if, for example, if the user desired DESC
>> >> >> order, then the endpoint would return
>> >> >>
>> >> >> Endpoint 1: (C, B, A)
>> >> >> Endpoint 2: (F, E, D)
>> >> >>
>> >> >> Which doesn't seem to conform to the updated definition
>> >> >>
>> >> >> Andrew
>> >> >>
>> >> >>
>> >> >> On Tue, Apr 25, 2023 at 8:56 PM Sutou Kouhei <k...@clear-code.com> wrote:
>> >> >>
>> >> >>> Hi,
>> >> >>>
>> >> >>> I would like to propose adding support for ordered data to
>> >> >>> Apache Arrow Flight.
>> >> >>> If anyone has comments for this
>> >> >>> proposal, please share them here or on the issue for this
>> >> >>> proposal: https://github.com/apache/arrow/issues/34852
>> >> >>>
>> >> >>> This is one of the proposals in "[DISCUSS] Flight RPC/Flight
>> >> >>> SQL/ADBC enhancements":
>> >> >>>
>> >> >>>   https://lists.apache.org/thread/247z3t06mf132nocngc1jkp3oqglz7jp
>> >> >>>
>> >> >>> See also the "Flight RPC: Ordered Data" section in the
>> >> >>> design document for the proposals:
>> >> >>>
>> >> >>>   https://docs.google.com/document/d/1jhPyPZSOo2iy0LqIJVUs9KWPyFULVFJXTILDfkadx2g/edit#
>> >> >>>
>> >> >>> Background:
>> >> >>>
>> >> >>> Currently, the endpoints within a FlightInfo explicitly have
>> >> >>> no ordering.
>> >> >>>
>> >> >>> This is unnecessarily limiting. Systems can and do implement
>> >> >>> distributed sorts, but they can't reflect this in the
>> >> >>> current specification.
>> >> >>>
>> >> >>> Proposal:
>> >> >>>
>> >> >>> Add a flag to FlightInfo. If the flag is set, the client may
>> >> >>> assume that the data is sorted in the same order as the
>> >> >>> endpoints. Otherwise, the client cannot make any assumptions
>> >> >>> (as before).
>> >> >>>
>> >> >>> This is a compatible change because the client can just
>> >> >>> ignore the flag.
>> >> >>>
>> >> >>> Implementation:
>> >> >>>
>> >> >>> https://github.com/apache/arrow/pull/35178 is an
>> >> >>> implementation of this proposal. The pull request has the
>> >> >>> following:
>> >> >>>
>> >> >>> 1. Format changes:
>> >> >>>    https://github.com/apache/arrow/pull/35178/files#diff-53b6c132dcc789483c879f667a1c675792b77aae9a056b257d6b20287bb09dba
>> >> >>>    * format/Flight.proto
>> >> >>>
>> >> >>> 2. Documentation changes:
>> >> >>>    https://github.com/apache/arrow/pull/35178/files#diff-839518fb41e923de682e8587f0b6fdb00eb8f3361d360c2f7249284a136a7d89
>> >> >>>    * docs/source/format/Flight.rst
>> >> >>>
>> >> >>> 3. The C++ implementation and an integration test:
>> >> >>>    * cpp/src/arrow/flight/
>> >> >>>
>> >> >>> 4. The Java implementation and an integration test (thanks to David Li!):
>> >> >>>    * java/flight/
>> >> >>>
>> >> >>> 5. The Go implementation and an integration test:
>> >> >>>    * go/arrow/flight/
>> >> >>>    * go/arrow/internal/flight_integration/
>> >> >>>
>> >> >>> Next:
>> >> >>>
>> >> >>> I'll start a vote for this proposal after we reach a consensus
>> >> >>> on this proposal.
>> >> >>>
>> >> >>> It's the standard process for format changes.
>> >> >>> See also:
>> >> >>>
>> >> >>> * [VOTE] Formalize how to change format
>> >> >>>   https://lists.apache.org/thread/jlc4wtt09rfszlzqdl55vrc4dxzscr4c
>> >> >>> * GH-35084: [Docs][Format] Add how to change format specification
>> >> >>>   https://github.com/apache/arrow/pull/35174
>> >> >>>
>> >> >>>
>> >> >>> Thanks,
>> >> >>> --
>> >> >>> kou
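
For reference, here is a minimal Ruby sketch of how a general purpose client might branch on the proposed flag instead of on server_type, as argued earlier in this thread. Everything named here is a hypothetical stand-in for illustration and is not the Arrow Flight API: Endpoint, FlightInfo, ordered?, read_all and n_workers are all assumptions. The unordered branch loosely follows the "small number of threads consuming a queue of partitions" idea mentioned above; it buffers all results before yielding, which a real client would likely avoid.

```ruby
# Hypothetical stand-ins for illustration only (NOT the Arrow Flight API).
Endpoint = Struct.new(:name, :batches) do
  # Pretend to fetch this endpoint's stream; here it is just stored data.
  def read
    batches
  end
end

FlightInfo = Struct.new(:endpoints, :ordered) do
  # Assumed accessor for the proposed "ordered" flag.
  def ordered?
    ordered
  end
end

# Yields each endpoint's data to the given block.
def read_all(info, n_workers: 2)
  if info.ordered?
    # Endpoint order is the data order, so read sequentially.
    info.endpoints.each do |endpoint|
      yield(endpoint.read)
    end
  else
    # No ordering is promised: a few worker threads drain a queue of endpoints.
    pending = Queue.new
    info.endpoints.each { |endpoint| pending << endpoint }
    pending.close # pop returns nil once the closed queue is empty

    results = Queue.new
    workers = Array.new(n_workers) do
      Thread.new do
        while (endpoint = pending.pop)
          results << endpoint.read
        end
      end
    end
    workers.each(&:join)
    results.close

    # Yield on the caller's thread, in whatever order the reads finished.
    while (data = results.pop)
      yield(data)
    end
  end
end

info = FlightInfo.new(
  [Endpoint.new("endpoint2", %w[F E D]), Endpoint.new("endpoint1", %w[C B A])],
  true
)
rows = []
read_all(info) { |data| rows.concat(data) }
# rows == ["F", "E", "D", "C", "B", "A"]
```

With the flag, the client needs no per-server special cases: a server that cannot promise an order simply leaves the flag unset and the client takes the parallel path.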