Sure, I'm kinda new to multiprocessing, so I thought it was perhaps something 
I'd missed on my own part.

Thanks,
Eli

Sent with [ProtonMail](https://protonmail.com) Secure Email.

> -------- Original Message --------
> Subject: Re: Trying to figure out how to use multiprocessing with HdfsClient
> Local Time: January 11, 2018 3:21 AM
> UTC Time: January 11, 2018 1:21 AM
> From: wesmck...@gmail.com
> To: dev@arrow.apache.org, Eli <h5r...@protonmail.ch>
>
> The HadoopFileSystem appears to not be safe for multiprocessing. I'm
> opening a bug report here:
>
> https://issues.apache.org/jira/browse/ARROW-1986
>
> In general, when things don't work as you expect and it seems like it may
> be a bug, please open a JIRA.
>
> Thanks,
> Wes
>
> On Wed, Jan 10, 2018 at 3:42 AM, Eli h5r...@protonmail.ch wrote:
>
>> Hi,
>> I'm trying to parallelize reading columns from a Parquet file, to serialize 
>> them back to a standard format in the quickest way.
>> I'm currently using pool.map(), such that a top-level reader/binarizer 
>> function accepts the file path, the column to be read, and some other data as a tuple.
>> It looks something like this (prettier coloring on Stack Overflow: 
>> https://stackoverflow.com/questions/48177823/using-multiprocessing-with-pyarrows-hdfsclient):
>> import pyarrow as pa
>> import pyarrow.parquet as pq
>> from multiprocessing import Pool
>>
>> def binarizer(file_data_tuple):
>>     ''' Read a Parquet column from a file, binarize and return '''
>>     path, col_name, col_meta, native = file_data_tuple
>>     if not native:
>>         # Either this or using a top level hdfs_con
>>         hdfs_con = pa.hdfs.connect(*hadoop_params)
>>     read_pq = pq.read_table if native else hdfs_con.read_parquet
>>     # Read the column, serialize and return
>>     arrow_col = read_pq(path, columns=(col_name,))
>>     bin_col = imported_binarizng_function(arrow_col)
>>     return bin_col
>>
>> def read_binarize_parallel(filepaths):
>>     ''' Setup parallel reading and binarizing of a parquet file '''
>>     # list of tuples containing the filepath, column name, meta, and mode
>>     pool_params = [(),..]
>>     pool = Pool()
>>     for file in filepaths:
>>         binary_columns = pool.map(binarizer, pool_params)
>>         chunk = b''.join(binary_columns)
>>         send_over_socket(chunk)
>> When this is used in "native" mode, reading with pyarrow from the local 
>> disk, it works fine and saves a nice amount of time (a little less than 
>> 4x on this machine) for the binarizing part.
>> However, when I try to do the same but read from HDFS, I get the following 
>> error stream:
>> (Also rendered more readably on Stack Overflow)
>> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message 
>> of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing 
>> required fields: callId, status
>> (the same libprotobuf error is printed four times)
>> 2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to 
>> invoke RPC call "getFileInfo" on server "192.168.0.101:9000": 
>> RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000" 
>> got protocol mismatch: RPC channel cannot find pending call: id = 3.
>> @ Unknown
>> @ Unknown
>> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
>> @ Unknown
>> @ Unknown
>> @ Unknown
>> 2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry 
>> idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
>> 2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to 
>> invoke RPC call "getFileInfo" on server "192.168.0.101:9000": 
>> RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" 
>> got protocol mismatch: RPC channel cannot parse response header.
>> (followed by the same stack trace, this time ending in 
>> __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*); 
>> workers p10008, p10013, p10009 and p10012 print the identical ERROR and 
>> stack trace at 21:41:47.939406, .939422, .939431 and .939457)
>> binarizing process filepath: /parquet_430mb/5e6.parquet
>> 2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Retry 
>> idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
>> (matching INFO retries follow for p10013, p10008, p10012 and p10009)
>> 2018-01-09 21:41:47.940813, p10014, th139965275871040, ERROR Failed to 
>> invoke RPC call "getFileInfo" on server "192.168.0.101:9000": 
>> RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" 
>> got protocol mismatch: RPC channel cannot parse response header.
>> (same stack trace)
>> 2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Retry 
>> idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
>> 2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to 
>> invoke RPC call "getFileInfo" on server "192.168.0.101:9000": 
>> RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call 
>> "getFileInfo" on server "192.168.0.101:9000"
>> (same stack trace)
>> Caused by: TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes 
>> failed to "192.168.0.101:9000": (errno: 32) Broken pipe
>> (same stack trace)
>> 2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry 
>> idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
>> ---------------------------------------------------------------------------
>> ArrowIOError                              Traceback (most recent call last)
>> /home/parquet_sender.pyc in insert_files_parallel(self)
>>     374         # print ('372 sqparquet filepath:', filepath)
>>     375         params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
>> --> 376         bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
>>     377         got ('map complete')
>>     378         num_rows = bin_col[0][2]
>>
>> /usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
>>     249         '''
>>     250         assert self._state == RUN
>> --> 251         return self.map_async(func, iterable, chunksize).get()
>>     252
>>     253     def imap(self, func, iterable, chunksize=1):
>>
>> /usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
>>     556             return self._value
>>     557         else:
>> --> 558             raise self._value
>>     559
>>     560     def _set(self, i, obj):
>>
>> ArrowIOError: HDFS: GetPathInfo failed
>>
>> I'd be happy for any feedback on how to proceed to achieve some gains over 
>> the HDFS read as well.
>> I'm not a Python expert, so pardon any noobness I may have incurred with 
>> this question :)
>> Thanks!
>> Eli
>> Sent with [ProtonMail](https://protonmail.com) Secure Email.
