Hi Shawn,

I'm not sure what you mean concretely by "realtime streaming data".  If
the data is in Arrow format, using Arrow IPC sounds most appropriate.
If the data is made of Python objects of various types, I'm not sure how
you're supposed to interpret and consume them on the Java side.
If the data has a deterministic structure, perhaps you can use something
such as FlatBuffers?

(of course, perhaps you need to combine Arrow IPC and FlatBuffers if
your data is "mixed"... I don't know)

In any case, I don't think it's reasonable to expect that there exists a
framework to magically transfer and (most importantly) convert arbitrary
data from Python to Java.

Regards

Antoine.


On 25/04/2019 at 13:56, Shawn Yang wrote:
> Hi Antoine,
> Thanks, I'll try PEP 574 for Python worker to Python worker data transfer.
> But I have another question. In my scenario, the data comes from a Java
> worker, and the Python worker receives streaming data from Java. So pickle5
> is a great solution for Python-to-Python data transfer. But from Java to
> Python, we still need a framework such as Arrow to enable
> cross-language serialization of realtime streaming data. From the
> benchmark, it seems Arrow is not appropriate
> for realtime streaming data. Is there a better solution for this? Or do I
> need to build my own with something such as FlatBuffers?
> 
> On Thu, Apr 25, 2019 at 5:57 PM Antoine Pitrou <anto...@python.org> wrote:
> 
>>
>> Hi Shawn,
>>
>> So it seems that RecordBatch serialization is able to avoid copies,
>> otherwise there's no benefit to using Arrow over pickle.
>>
>> Perhaps you would like to try pickle5 with out-of-band buffers
>> in your benchmark.  See https://pypi.org/project/pickle5/
>>
>> Regards
>>
>> Antoine.
>>
>>
>> On 25/04/2019 at 11:23, Shawn Yang wrote:
>>> Hi Antoine,
>>> Here are the images:
>>> 1. |UnionArray| benchmark:
>>>
>>> https://user-images.githubusercontent.com/12445254/56651475-aaaea300-66bb-11e9-8b4f-4632e96bd079.png
>>> https://user-images.githubusercontent.com/12445254/56651484-b5693800-66bb-11e9-9b1f-d004212e6aac.png
>>> https://user-images.githubusercontent.com/12445254/56651490-b8fcbf00-66bb-11e9-8f01-ef4919b6af8b.png
>>>
>>> 2. |RecordBatch| benchmark:
>>>
>>> https://user-images.githubusercontent.com/12445254/56629689-c9437880-6680-11e9-8756-02acb47fdb30.png
>>>
>>> Regards
>>> Shawn.
>>>
>>> On Thu, Apr 25, 2019 at 4:03 PM Antoine Pitrou <anto...@python.org> wrote:
>>>
>>>
>>>     Hi Shawn,
>>>
>>>     Your images don't appear here.  It seems they weren't attached to
>>>     your e-mail?
>>>
>>>     About serialization: I am still working on PEP 574 (*), which I hope
>>>     will be integrated in Python 3.8.  The standalone "pickle5" module is
>>>     also available as a backport.  Both Arrow and Numpy support it.  You
>>>     may get different pickle performance using it, especially on large
>>>     data.
>>>
>>>     (*) https://www.python.org/dev/peps/pep-0574/
>>>
>>>     Regards
>>>
>>>     Antoine.
>>>
>>>
>>>     On 25/04/2019 at 05:19, Shawn Yang wrote:
>>>     >
>>>     >     Motivation
>>>     >
>>>     > We want to use Arrow as a general data serialization framework in
>>>     > distributed stream data processing. We are working on Ray
>>>     > <https://github.com/ray-project/ray>, which is written in C++ at the
>>>     > low level and Java/Python at the high level. We want to transfer
>>>     > streaming data between Java/Python/C++ efficiently. Arrow is a great
>>>     > framework for cross-language data transfer, but it seems more
>>>     > appropriate for batch columnar data. Is it appropriate for
>>>     > distributed stream data processing? If not, will there be more
>>>     > support for stream data processing? Or is there something I am
>>>     > missing?
>>>     >
>>>     >
>>>     >     Benchmark
>>>     >
>>>     > 1. If we use |UnionArray|:
>>>     > image.png
>>>     > image.png
>>>     > image.png
>>>     > 2. If we use |RecordBatch|, the batch size needs to be greater than
>>>     > 50~200 to get better deserialization performance than pickle. But
>>>     > the resulting latency won't be acceptable in streaming.
>>>     > image.png
>>>     >
>>>     > It seems neither is appropriate. Is there a better way?
>>>     >
>>>     >
>>>     >     Benchmark code
>>>     >
>>>     > '''
>>>     > test arrow/pickle performance
>>>     > '''
>>>     > import pickle
>>>     > import pyarrow as pa
>>>     > import matplotlib.pyplot as plt
>>>     > import numpy as np
>>>     > import timeit
>>>     > import datetime
>>>     > import copy
>>>     > import os
>>>     > from collections import OrderedDict
>>>     > dir_path = os.path.dirname(os.path.realpath(__file__))
>>>     >
>>>     > def benchmark_ser(batches, number=10):
>>>     >     pickle_results = []
>>>     >     arrow_results = []
>>>     >     pickle_sizes = []
>>>     >     arrow_sizes = []
>>>     >     for obj_batch in batches:
>>>     >         pickle_serialize = timeit.timeit(
>>>     >             lambda: pickle.dumps(obj_batch,
>>>     >                                  protocol=pickle.HIGHEST_PROTOCOL),
>>>     >             number=number)
>>>     >         pickle_results.append(pickle_serialize)
>>>     >         pickle_sizes.append(len(pickle.dumps(
>>>     >             obj_batch, protocol=pickle.HIGHEST_PROTOCOL)))
>>>     >         arrow_serialize = timeit.timeit(
>>>     >             lambda: serialize_by_arrow_array(obj_batch),
>>>     >             number=number)
>>>     >         arrow_results.append(arrow_serialize)
>>>     >         arrow_sizes.append(serialize_by_arrow_array(obj_batch).size)
>>>     >     return [pickle_results, arrow_results, pickle_sizes,
>>>     >             arrow_sizes]
>>>     >
>>>     > def benchmark_deser(batches, number=10):
>>>     >     pickle_results = []
>>>     >     arrow_results = []
>>>     >     for obj_batch in batches:
>>>     >         serialized_obj = pickle.dumps(obj_batch,
>>>     >                                       pickle.HIGHEST_PROTOCOL)
>>>     >         pickle_deserialize = timeit.timeit(
>>>     >             lambda: pickle.loads(serialized_obj), number=number)
>>>     >         pickle_results.append(pickle_deserialize)
>>>     >         serialized_obj = serialize_by_arrow_array(obj_batch)
>>>     >         arrow_deserialize = timeit.timeit(
>>>     >             lambda: pa.deserialize(serialized_obj), number=number)
>>>     >         arrow_results.append(arrow_deserialize)
>>>     >     return [pickle_results, arrow_results]
>>>     >
>>>     > def serialize_by_arrow_array(obj_batch):
>>>     >     arrow_arrays = [
>>>     >         record if isinstance(record, pa.Array) else pa.array(record)
>>>     >         for record in obj_batch]
>>>     >     return pa.serialize(arrow_arrays).to_buffer()
>>>     >
>>>     >
>>>     > plot_dir = '{}/{}'.format(
>>>     >     dir_path, datetime.datetime.now().strftime('%m%d_%H%M_%S'))
>>>     > if not os.path.exists(plot_dir):
>>>     >     os.makedirs(plot_dir)
>>>     >
>>>     > def plot_time(pickle_times, arrow_times, batch_sizes, title,
>>>     >               filename):
>>>     >     fig, ax = plt.subplots()
>>>     >     fig.set_size_inches(10, 8)
>>>     >
>>>     >     bar_width = 0.35
>>>     >     n_groups = len(batch_sizes)
>>>     >     index = np.arange(n_groups)
>>>     >     opacity = 0.6
>>>     >
>>>     >     plt.bar(index, pickle_times, bar_width,
>>>     >             alpha=opacity, color='r', label='Pickle')
>>>     >
>>>     >     plt.bar(index + bar_width, arrow_times, bar_width,
>>>     >             alpha=opacity, color='c', label='Arrow')
>>>     >
>>>     >     plt.title(title, fontweight='bold')
>>>     >     plt.ylabel('Time (seconds)', fontsize=10)
>>>     >     plt.xticks(index + bar_width / 2, batch_sizes, fontsize=10)
>>>     >     plt.legend(fontsize=10, bbox_to_anchor=(1, 1))
>>>     >     plt.tight_layout()
>>>     >     plt.yticks(fontsize=10)
>>>     >     plt.savefig(plot_dir + '/plot-' + filename + '.png',
>>>     >                 format='png')
>>>     >
>>>     >
>>>     > def plot_size(pickle_sizes, arrow_sizes, batch_sizes, title,
>>>     >               filename):
>>>     >     fig, ax = plt.subplots()
>>>     >     fig.set_size_inches(10, 8)
>>>     >
>>>     >     bar_width = 0.35
>>>     >     n_groups = len(batch_sizes)
>>>     >     index = np.arange(n_groups)
>>>     >     opacity = 0.6
>>>     >
>>>     >     plt.bar(index, pickle_sizes, bar_width,
>>>     >             alpha=opacity, color='r', label='Pickle')
>>>     >
>>>     >     plt.bar(index + bar_width, arrow_sizes, bar_width,
>>>     >             alpha=opacity, color='c', label='Arrow')
>>>     >
>>>     >     plt.title(title, fontweight='bold')
>>>     >     plt.ylabel('Space (Byte)', fontsize=10)
>>>     >     plt.xticks(index + bar_width / 2, batch_sizes, fontsize=10)
>>>     >     plt.legend(fontsize=10, bbox_to_anchor=(1, 1))
>>>     >     plt.tight_layout()
>>>     >     plt.yticks(fontsize=10)
>>>     >     plt.savefig(plot_dir + '/plot-' + filename + '.png',
>>>     >                 format='png')
>>>     >
>>>     > def get_union_obj():
>>>     >     size = 200
>>>     >     str_array = pa.array(['str-' + str(i) for i in range(size)])
>>>     >     int_array = pa.array(np.random.randn(size).tolist())
>>>     >     types = pa.array([0 for _ in range(size)] +
>>>     >                      [1 for _ in range(size)], type=pa.int8())
>>>     >     offsets = pa.array(list(range(size)) + list(range(size)),
>>>     >                        type=pa.int32())
>>>     >     union_arr = pa.UnionArray.from_dense(
>>>     >         types, offsets, [str_array, int_array])
>>>     >     return union_arr
>>>     >
>>>     > test_objects_generater = [
>>>     >     lambda: np.random.randn(500),
>>>     >     lambda: np.random.randn(500).tolist(),
>>>     >     lambda: get_union_obj()
>>>     > ]
>>>     >
>>>     > titles = [
>>>     >     'numpy arrays',
>>>     >     'list of ints',
>>>     >     'union array of strings and ints'
>>>     > ]
>>>     >
>>>     > def plot_benchmark():
>>>     >     batch_sizes = list(OrderedDict.fromkeys(
>>>     >         int(i) for i in np.geomspace(1, 1000, num=25)))
>>>     >     for i in range(len(test_objects_generater)):
>>>     >         batches = [[test_objects_generater[i]() for _ in
>>>     >                     range(batch_size)] for batch_size in batch_sizes]
>>>     >         ser_result = benchmark_ser(batches=batches)
>>>     >         plot_time(*ser_result[0:2], batch_sizes,
>>>     >                   'serialization: ' + titles[i], 'ser_time' + str(i))
>>>     >         plot_size(*ser_result[2:], batch_sizes,
>>>     >                   'serialization byte size: ' + titles[i],
>>>     >                   'ser_size' + str(i))
>>>     >         deser = benchmark_deser(batches=batches)
>>>     >         plot_time(*deser, batch_sizes,
>>>     >                   'deserialization: ' + titles[i],
>>>     >                   'deser_time-' + str(i))
>>>     >
>>>     >
>>>     > if __name__ == "__main__":
>>>     >     plot_benchmark()
>>>     >
>>>     >
>>>     >     Question
>>>     >
>>>     > So if I want to use Arrow as a data serialization framework in
>>>     > distributed stream data processing, what's the right way?
>>>     > Stream processing is a widespread scenario in data processing, and
>>>     > frameworks such as Flink and Spark Structured Streaming are becoming
>>>     > more and more popular. Is there a possibility of adding special
>>>     > support for stream processing in Arrow, so that we can also benefit
>>>     > from its cross-language support and efficient memory layout?
>>>     >
>>>     >
>>>     >
>>>     >
>>>
>>
> 
