Hi,

I'm trying to parallelize reading columns from a Parquet file, to serialize it
back to standard format as quickly as possible.

I'm currently using pool.map(), such that a top-level reader/binarizer function
accepts a tuple of the file path, the column to be read, and some other data.

It looks something like this:
(prettier coloring on Stack Overflow -
https://stackoverflow.com/questions/48177823/using-multiprocessing-with-pyarrows-hdfsclient)

    import pyarrow as pa
    import pyarrow.parquet as pq
    from multiprocessing import Pool

    def binarizer(file_data_tuple):
        ''' Read a Parquet column from a file, binarize and return '''
        path, col_name, col_meta, native = file_data_tuple
        if not native:
            # Either this or using a top level hdfs_con
            hdfs_con = pa.hdfs.connect(*hadoop_params)
        read_pq = pq.read_table if native else hdfs_con.read_parquet

        # Read the column, serialize and return
        arrow_col = read_pq(path, columns=(col_name,))
        bin_col = imported_binarizing_function(arrow_col)
        return bin_col

    def read_binarize_parallel(filepaths):
        ''' Set up parallel reading and binarizing of a parquet file '''
        # list of tuples containing the filepath, column name, meta, and mode
        pool_params = [(), ..]
        pool = Pool()
        for file in filepaths:
            binary_columns = pool.map(binarizer, pool_params)
            chunk = b''.join(binary_columns)
            send_over_socket(chunk)
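For reference, the alternative hinted at in the comment ("Either this or using a top level hdfs_con") would be to open one connection per worker process via the Pool's initializer, instead of inside every binarizer call. Below is only a sketch of that multiprocessing pattern, not my actual code: make_connection is a hypothetical stand-in for pa.hdfs.connect(*hadoop_params), since the real call needs a live cluster.

```python
from multiprocessing import Pool

def make_connection():
    # Hypothetical stand-in for pa.hdfs.connect(*hadoop_params);
    # the real call requires a reachable HDFS namenode.
    return {"connected": True}

_conn = None  # one connection per worker process

def init_worker():
    # Runs once inside each worker process after it starts, so the
    # connection (and its sockets) is never shared across processes.
    global _conn
    _conn = make_connection()

def binarize(col_name):
    # _conn was created in *this* worker by init_worker
    assert _conn is not None
    return ("%s" % col_name).encode()

if __name__ == "__main__":
    pool = Pool(initializer=init_worker)
    results = pool.map(binarize, ["a", "b", "c"])
    pool.close()
    pool.join()
    print(b"".join(results))  # prints b'abc'
```

The point of the pattern is that each worker builds its own connection after the fork, rather than inheriting one from the parent.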

When this is used in "native" mode, reading with pyarrow from the local disk,
this works fine and saves a nice amount of time (a little less than 4x on my
machine) for the binarizing part.

However, when I try to do the same but read from HDFS, I get the following
error stream (also less un-pretty on Stack Overflow):

    [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
    (the libprotobuf error above is repeated four times)

    2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot find pending call: id = 3.
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
        @ Unknown
    2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"

    2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
    (the "cannot parse response header" error and stack trace above repeat, each followed by an INFO "Retry idempotent RPC call" line, for processes p10008, p10009, p10012, p10013 and p10014)

    binarizing process filepath: /parquet_430mb/5e6.parquet

    2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000"
    Caused by
    TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken pipe
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
    2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"

    ---------------------------------------------------------------------------
    ArrowIOError                              Traceback (most recent call last)

    /home/parquet_sender.pyc in insert_files_parallel(self)
        374         # print ('372 sqparquet filepath:', filepath)
        375         params_with_path_and_mode = [col_params + (filepath, native) for col_params in pool_params]
    --> 376         bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
        377         got('map complete')
        378         num_rows = bin_col[0][2]

    /usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
        249         '''
        250         assert self._state == RUN
    --> 251         return self.map_async(func, iterable, chunksize).get()
        252
        253     def imap(self, func, iterable, chunksize=1):

    /usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
        556             return self._value
        557         else:
    --> 558             raise self._value
        559
        560     def _set(self, i, obj):

    ArrowIOError: HDFS: GetPathInfo failed

I'd be happy for any feedback on how to proceed to achieve some gains over the
Hadoop read as well.
I'm not a Python expert, so pardon any noobness I may have incurred with this
question :)

Thanks!
Eli
