Sure; I'm fairly new to multiprocessing, so I thought it was perhaps something I had missed on my part.
Thanks,
Eli

Sent with [ProtonMail](https://protonmail.com) Secure Email.

> -------- Original Message --------
> Subject: Re: Trying to figure out how to use multiprocessing with HdfsClient
> Local Time: January 11, 2018 3:21 AM
> UTC Time: January 11, 2018 1:21 AM
> From: wesmck...@gmail.com
> To: dev@arrow.apache.org, Eli <h5r...@protonmail.ch>
>
> The HadoopFileSystem appears not to be safe for multiprocessing. I'm
> opening a bug report here:
>
> https://issues.apache.org/jira/browse/ARROW-1986
>
> In general, when things don't work as you expect and it seems like it may
> be a bug, please open a JIRA.
>
> Thanks,
> Wes
>
> On Wed, Jan 10, 2018 at 3:42 AM, Eli <h5r...@protonmail.ch> wrote:
>
>> Hi,
>> I'm trying to parallelize reading columns from a Parquet file, in order to
>> serialize them back to a standard format in the quickest way.
>> I'm currently using pool.map(), such that a top-level reader/binarizer
>> accepts the filepath, the column to be read, and some other data as a tuple.
>> It looks something like this (prettier coloring on Stack Overflow:
>> https://stackoverflow.com/questions/48177823/using-multiprocessing-with-pyarrows-hdfsclient):
>>
>> import pyarrow as pa
>> import pyarrow.parquet as pq
>> from multiprocessing import Pool
>>
>> def binarizer(file_data_tuple):
>>     '''Read a Parquet column from a file, binarize and return'''
>>     path, col_name, col_meta, native = file_data_tuple
>>     if not native:
>>         # Either this or using a top-level hdfs_con
>>         hdfs_con = pa.hdfs.connect(*hadoop_params)
>>     read_pq = pq.read_table if native else hdfs_con.read_parquet
>>     # Read the column, serialize and return
>>     arrow_col = read_pq(path, columns=(col_name,))
>>     bin_col = imported_binarizng_function(arrow_col)
>>     return bin_col
>>
>> def read_binarize_parallel(filepaths):
>>     '''Set up parallel reading and binarizing of a parquet file'''
>>     # list of tuples containing the filepath, column name, meta, and mode
>>     pool_params = [(),..]
>>     pool = Pool()
>>     for file in filepaths:
>>         binary_columns = pool.map(binarizer, pool_params)
>>         chunk = b''.join(binary_columns)
>>         send_over_socket(chunk)
>>
>> When this is used in "native" mode, reading with pyarrow from the local
>> disk, it works fine and saves a nice amount of time (a little less than
>> 4x on my machine) for the binarizing part.
>> However, when I try to do the same but read from HDFS, I get the following
>> error stream (also less un-pretty on Stack Overflow):
>>
>>> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
>>> (the libprotobuf error above is printed four times)
>>> 2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot find pending call: id = 3.
>>>     @ Unknown
>>>     @ Unknown
>>>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
>>>     @ Unknown
>>>     @ Unknown
>>>     @ Unknown
>>> 2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
>>> 2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
>>>     @ Unknown
>>>     @ Unknown
>>>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
>>>     @ Unknown
>>>     @ Unknown
>>>     @ Unknown
>>> (the same "cannot parse response header" error and stack trace then repeat for worker processes p10008, p10009, p10012, p10013 and p10014, each followed by an INFO "Retry idempotent RPC call" line; one worker's "binarizing process filepath: /parquet_430mb/5e6.parquet" print is interleaved with the traces)
>>> 2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000"
>>>     @ Unknown
>>>     @ Unknown
>>>     @ Unknown
>>>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
>>>     @ Unknown
>>>     @ Unknown
>>> Caused by TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken pipe
>>>     @ Unknown
>>>     @ Unknown
>>>     @ Unknown
>>>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>>>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
>>>     @ Unknown
>>>     @ Unknown
>>>     @ Unknown
>>> 2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
>>>
>>> ---------------------------------------------------------------------------
>>> ArrowIOError                              Traceback (most recent call last)
>>> /home/parquet_sender.pyc in insert_files_parallel(self)
>>>     374         # print ('372 sqparquet filepath:', filepath)
>>>     375         params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
>>> --> 376         bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
>>>     377         got('map complete')
>>>     378         num_rows = bin_col[0][2]
>>>
>>> /usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
>>>     249         '''
>>>     250         assert self._state == RUN
>>> --> 251         return self.map_async(func, iterable, chunksize).get()
>>>     252
>>>     253     def imap(self, func, iterable, chunksize=1):
>>>
>>> /usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
>>>     556             return self._value
>>>     557         else:
>>> --> 558             raise self._value
>>>     559
>>>     560     def _set(self, i, obj):
>>>
>>> ArrowIOError: HDFS: GetPathInfo failed
>>
>> I'd be happy for any feedback on how to proceed to achieve some gains over
>> the Hadoop read as well.
>> I'm not a Python expert, so pardon any noobness I may have incurred with
>> this question :)
>> Thanks!
>> Eli
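
A side note on the traceback in the quoted message: `Pool.map` delegates to `map_async(...).get()`, which re-raises the first worker exception in the parent process; that is why the per-worker `ArrowIOError` surfaces through `multiprocessing/pool.py` in the parent. A minimal, self-contained illustration of this behavior (the failing `work` function is hypothetical and has nothing Arrow-specific about it):

```python
from multiprocessing import Pool

def work(x):
    # Hypothetical worker that always fails, standing in for the reader
    # processes that raised ArrowIOError.
    raise IOError("simulated failure for input %d" % x)

def run_pool():
    # Pool.map calls map_async(...).get(), so the first worker exception
    # is pickled, sent back, and re-raised here in the parent process.
    with Pool(2) as pool:
        try:
            pool.map(work, range(4))
        except IOError as exc:
            return str(exc)
    return None

if __name__ == "__main__":
    print(run_pool())
```

This is why the parent's traceback runs through `pool.pyc` rather than through the worker's own stack.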
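
For anyone hitting ARROW-1986: the usual way around a client that is not fork-safe is to open one connection per worker process, for example via a `Pool` initializer, rather than sharing a connection across the fork. Below is a runnable sketch of just the pattern; in the real case the initializer would call `pa.hdfs.connect(...)`, but since that needs a live cluster, a stand-in string built from the worker's pid takes its place, and `init_worker` / `read_column` are hypothetical names:

```python
import os
from multiprocessing import Pool

# One "connection" per worker process, created after the fork.
_conn = None

def init_worker():
    # In the real case this would be: _conn = pa.hdfs.connect(...)
    # Here the worker's pid stands in for a per-process connection.
    global _conn
    _conn = "conn-for-pid-%d" % os.getpid()

def read_column(col_name):
    # Each task uses the connection owned by whichever worker runs it.
    return (col_name, _conn)

def run():
    # initializer runs once in each worker, after the fork, so no
    # connection state is ever shared between processes.
    with Pool(2, initializer=init_worker) as pool:
        return pool.map(read_column, ["a", "b", "c", "d"])

if __name__ == "__main__":
    for col, conn in run():
        print(col, conn)
```

Because the initializer runs after the fork, each worker's connection never crosses a process boundary, which avoids the interleaved-RPC corruption seen in the error log.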