Hi Hongze,

I am not too familiar with distributed systems in general, but I did
work on using the Arrow Dataset API in the Python Dask library
(https://dask.org/), which can run computations in a distributed way.

For Dask, we used the second idea of sending serialized objects to the
workers, but at the level of the dataset fragments, not scan tasks.
So here, I create a Dataset object on machine 1, get the Dataset's
fragments (potentially already filtered), and send those fragments
to the different workers (in the pyarrow interface, fragments are
already serializable, in contrast to scan tasks). Then on each
machine a scan is done for its fragment, and afterwards the results
are combined, although in this case that is handled by Dask's
scheduler and dataframe objects.
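
To make that concrete, here is a minimal, untested sketch of the
pattern using the pyarrow.dataset API (the path and filter below are
made up for illustration; in Dask the scatter/scan/combine is of
course done by the scheduler rather than a plain loop):

    # Rough sketch of the fragment-based approach (untested; path and
    # filter are made up for illustration).
    import pickle
    import pyarrow.dataset as ds

    # --- on machine 1: build the dataset and collect its fragments ---
    dataset = ds.dataset("/path/to/data", format="parquet",
                         partitioning="hive")
    filter_expr = ds.field("year") == 2020

    # Fragments are picklable, so they can be shipped to workers as-is.
    serialized = [pickle.dumps(frag)
                  for frag in dataset.get_fragments(filter=filter_expr)]
    schema = dataset.schema

    # --- on each worker: deserialize a fragment and scan it ---
    def scan_fragment(payload, schema, filter_expr):
        fragment = pickle.loads(payload)
        # Pass the dataset-level schema and filter so each worker reads
        # consistent columns/rows; returns a pyarrow.Table.
        return fragment.to_table(schema=schema, filter=filter_expr)

    tables = [scan_fragment(p, schema, filter_expr) for p in serialized]
    # Combining the per-fragment results is what Dask's scheduler and
    # dataframe machinery take care of in our case.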

See https://github.com/dask/dask/issues/6174,
https://github.com/dask/dask/pull/6172 and
https://github.com/dask/dask/pull/6346 for some related github issues
and PRs about this.

I took the route of using fragments, as this seemed easier to achieve
(it was relatively straightforward to make fragments serializable; as
you note, for ScanTasks this might be more difficult). But I suppose
this approach only works well if your fragments are small enough (e.g.
for Parquet, we can split a file into a separate fragment per row
group, in case you have huge Parquet files and want to distribute
reading one file over different workers; see the sketch below).
So it might also be interesting to explore the serialization of
ScanTasks (although I am not directly sure what the blockers would be
for this).
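
For reference, the per-row-group splitting mentioned above looks
roughly like this (an untested sketch with a made-up path, assuming
the split_by_row_group() method on Parquet fragments):

    # Rough illustration of per-row-group fragments (untested; the
    # path is made up).
    import pyarrow.dataset as ds

    dataset = ds.dataset("/path/to/huge_parquet_files", format="parquet")

    row_group_fragments = []
    for fragment in dataset.get_fragments():
        # Each resulting fragment covers a single row group, so reading
        # one big file can be spread over several workers.
        row_group_fragments.extend(fragment.split_by_row_group())

    # These smaller fragments can then be pickled and distributed
    # exactly like the whole-file fragments in the sketch above.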

Best,
Joris

On Tue, 21 Jul 2020 at 09:57, Hongze Zhang <notify...@126.com> wrote:
>
> Hi all,
>
> Has anyone ever tried using the Arrow Dataset API in a distributed system?
> E.g. create scan tasks on machine 1, then send and execute these tasks on
> machines 2, 3, 4.
>
> So far I think a possible workaround is to:
>
> 1. Create a Dataset on machine 1;
> 2. Call Scan() and collect all scan tasks from the scan task iterator;
> 3. Say we have 5 tasks numbered 1, 2, 3, 4, 5, and we decide to run tasks
> 1 and 2 on machine 2, and tasks 3, 4 and 5 on machine 3;
> 4. Send the target task numbers to machines 2 and 3 respectively;
> 5. Create a Dataset with the same configuration on machines 2 and 3, and
> call Scan() to create the 5 tasks on each machine;
> 6. On machine 2, run tasks 1 and 2, skip 3, 4, 5;
> 7. On machine 3, skip tasks 1 and 2, run 3, 4, 5.
>
> This works correctly only if we assume that the method
> `Dataset::Scan()` returns exactly the same task iterator on different
> machines. And I'm not sure whether this brings unnecessary overhead, after
> all we'll run the scan method N times for N machines.
>
> A better solution I can think of is to make scan tasks serializable so
> we could distribute them directly to the machines. Currently they don't
> seem to be designed that way, since we allow contextual state to be used to
> create the tasks, e.g. the open readers in ParquetScanTask [1]. At the same
> time a built-in ser/de mechanism would be needed. Anyway, a fair amount of
> work would have to be done.
>
> So far I am not sure which way is more reasonable, or whether there is a
> better option than both. If you have any thoughts, please let me know.
>
> Best,
> Hongze
>
> [1] 
> https://github.com/apache/arrow/blob/c09a82a388e79ddf4377f44ecfe515604f147270/cpp/src/arrow/dataset/file_parquet.cc#L52-L74
