The HadoopFileSystem appears to not be safe for multiprocessing. I'm
opening a bug report here:

https://issues.apache.org/jira/browse/ARROW-1986

In general, when something doesn't work as you expect and seems like it may
be a bug, please open a JIRA issue.
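Until that is fixed, one workaround worth trying is to give every worker process its own connection instead of using one opened in the parent. The sketch below does this with a Pool initializer and assumes Python 3.4+ for `multiprocessing.get_context("spawn")`. The traceback in this thread is from Python 2.7, which only has `fork`. `init_worker`, `read_column` and `read_columns_parallel` are hypothetical names. `connect` stands in for a factory such as `pa.hdfs.connect`, and `read_parquet(path, columns=...)` mirrors the `HadoopFileSystem` call in the quoted code.

```python
import multiprocessing

# Connection owned by the current worker process (set by init_worker).
_worker_conn = None


def init_worker(connect, connect_args):
    """Pool initializer: runs once in each freshly started worker process.

    `connect` is a connection factory (for example pa.hdfs.connect) and
    `connect_args` is the tuple of positional arguments passed to it.
    """
    global _worker_conn
    _worker_conn = connect(*connect_args)


def read_column(task):
    """Read one column using this worker's own connection."""
    path, col_name = task
    return _worker_conn.read_parquet(path, columns=[col_name])


def read_columns_parallel(connect, connect_args, path, col_names, processes=None):
    """Read each column of `path` in a separate worker process."""
    # "spawn" starts clean interpreters, so no connection or native client
    # state is inherited from the parent through fork().
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes, initializer=init_worker,
                  initargs=(connect, connect_args)) as pool:
        return pool.map(read_column, [(path, name) for name in col_names])
```

With "spawn" the calling script needs the usual `if __name__ == "__main__":` guard, and `connect` must be picklable (a module-level function such as `pa.hdfs.connect` is). Whatever the worker returns must be picklable too. In the quoted code that is the binarized bytes, so in practice the binarizing step would also go inside `read_column`.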

Thanks,
Wes

On Wed, Jan 10, 2018 at 3:42 AM, Eli <h5r...@protonmail.ch> wrote:
> Hi,
>
> I'm trying to parallelize reading columns from a Parquet file, to serialize
> it back to standard format in the quickest way.
>
> I'm currently using pool.map() with a top-level reader/binarizer function
> that accepts the file path, the column to read and some other data as a tuple.
>
> It looks something like this (with syntax highlighting on Stack Overflow:
> https://stackoverflow.com/questions/48177823/using-multiprocessing-with-pyarrows-hdfsclient):
>
>     import pyarrow as pa
>     import pyarrow.parquet as pq
>     from multiprocessing import Pool
>
>     # hadoop_params, imported_binarizng_function and send_over_socket are
>     # defined elsewhere in my code.
>
>     def binarizer(file_data_tuple):
>         '''Read a Parquet column from a file, binarize it and return it.'''
>         path, col_name, col_meta, native = file_data_tuple
>         if not native:
>             # Either this or using a top level hdfs_con
>             hdfs_con = pa.hdfs.connect(*hadoop_params)
>         read_pq = pq.read_table if native else hdfs_con.read_parquet
>         # Read the column, serialize and return
>         arrow_col = read_pq(path, columns=[col_name])
>         bin_col = imported_binarizng_function(arrow_col)
>         return bin_col
>
>     def read_binarize_parallel(filepaths):
>         '''Set up parallel reading and binarizing of a Parquet file.'''
>         # List of tuples containing the filepath, column name, meta, and mode
>         pool_params = [(), ...]
>         pool = Pool()
>         for file in filepaths:
>             # Each worker reads and binarizes one column
>             binary_columns = pool.map(binarizer, pool_params)
>             chunk = b''.join(binary_columns)
>             send_over_socket(chunk)
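The comment in `binarizer` weighs connecting on every call against a single top-level `hdfs_con`. A middle ground, sketched here under the assumption that the failures come from a connection being used by a process other than the one that opened it, is to cache one connection per process ID. `get_connection` and `conn_state` are hypothetical helpers, not pyarrow API. If the native client also keeps process-global state (the `RpcChannel.cpp` frames in the log suggest libhdfs3), a Python-level guard like this may not be enough. In that case, starting workers with the "spawn" method or creating the pool before the parent opens any connection may be needed.

```python
import os

# Connection owned by this process, plus the PID of the process that made it.
conn_state = {"pid": None, "conn": None}


def get_connection(connect, *args):
    """Return a connection created by the *current* process.

    `connect` is a connection factory (e.g. pa.hdfs.connect in the quoted
    code); `args` are passed through to it unchanged.
    """
    pid = os.getpid()
    if conn_state["conn"] is None or conn_state["pid"] != pid:
        # Either nothing is cached yet, or the cached object was inherited
        # from the parent through fork(). An inherited handle is not closed
        # here: it shares the parent's socket, so tearing it down could
        # interfere with the parent. It is simply no longer used.
        conn_state["conn"] = connect(*args)
        conn_state["pid"] = pid
    return conn_state["conn"]
```

Inside `binarizer`, the connect line would then become `hdfs_con = get_connection(pa.hdfs.connect, *hadoop_params)`, so each worker connects once and reuses that connection for every column it handles.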
>
> When this is used in "native" mode, reading with pyarrow from the local disk,
> it works fine and saves a nice amount of time (a little less than 4x on my
> machine) for the binarizing part.
>
> However, when I try to do the same but read from HDFS, I get the following
> error stream (also more readable on Stack Overflow):
>
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
> 2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot find pending call: id = 3.
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
> @ Unknown
> @ Unknown
> @ Unknown
> 2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> @ Unknown
> @
> 2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> 2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> @
> 2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> @
> @ Unknown
> Unknown
> 2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> @ Unknown
> @ Unknown
> Unknown
> @ Unknown
> binarizing process filepath: /parquet_430mb/5e6.parquet
> @ Unknown
> Unknown
> @ Unknown
> @ Unknown
> @ Unknown
> 2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939864, p10013, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939866, p10008, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939868, p10012, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939868, p10009, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.940813, p10014, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> 2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000"
> @ Unknown
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> Caused by
> TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken pipe
> @ Unknown
> @ Unknown
> @ Unknown
> @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
> @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
> @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
> @ Unknown
> @ Unknown
> @ Unknown
> 2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
>
> ---------------------------------------------------------------------------
> ArrowIOError                              Traceback (most recent call last)
>
> /home/parquet_sender.pyc in insert_files_parallel(self)
>     374 # print ('372 sqparquet filepath:', filepath)
>     375 params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
> --> 376 bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
>     377 got ('map complete')
>     378 num_rows = bin_col[0][2]
>
> /usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
>     249         '''
>     250         assert self._state == RUN
> --> 251         return self.map_async(func, iterable, chunksize).get()
>     252
>     253     def imap(self, func, iterable, chunksize=1):
>
> /usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
>     556             return self._value
>     557         else:
> --> 558             raise self._value
>     559
>     560     def _set(self, i, obj):
>
> ArrowIOError: HDFS: GetPathInfo failed
>
> I'd also be happy for any feedback on how to get some speedup for the Hadoop
> read as well.
> I'm not a Python expert, so pardon any noobness in this question :)
>
> Thanks!
> Eli
>
