The HadoopFileSystem appears not to be safe for multiprocessing. I'm opening a bug report here:
https://issues.apache.org/jira/browse/ARROW-1986
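In the meantime, one thing that may be worth experimenting with (an untested
sketch, not a confirmed fix for the issue above) is keeping all HDFS state
inside each worker: open one connection per process in a Pool initializer, so
nothing connection-related is inherited across the fork or re-created per
task. The NameNode host/port, file path and column name below are
placeholders:

    import pyarrow as pa
    import pyarrow.parquet as pq
    from multiprocessing import Pool

    HDFS_HOST = 'namenode'   # placeholder
    HDFS_PORT = 9000         # placeholder

    _hdfs = None  # one connection per worker process

    def init_worker():
        # Runs once in each child process after the fork, so the connection
        # (and any client state behind it) is never shared with the parent.
        global _hdfs
        _hdfs = pa.hdfs.connect(HDFS_HOST, HDFS_PORT)

    def read_column(task):
        path, col_name = task
        # Read a single Parquet column through this worker's own connection.
        with _hdfs.open(path, 'rb') as f:
            table = pq.read_table(f, columns=[col_name])
        return table.num_rows  # stand-in for the real binarize/serialize step

    if __name__ == '__main__':
        pool = Pool(initializer=init_worker)
        tasks = [('/parquet_430mb/5e6.parquet', 'some_column')]  # placeholder
        print(pool.map(read_column, tasks))

Whether that actually sidesteps the failure below is exactly what needs to be
established on the JIRA.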
In general, when things don't work as you expect and it seems like it may be a bug, please open a JIRA.

Thanks,
Wes

On Wed, Jan 10, 2018 at 3:42 AM, Eli <h5r...@protonmail.ch> wrote:
> Hi,
>
> I'm trying to parallelize reading columns from a Parquet file, to serialize
> it back to standard format in the quickest way.
>
> I'm currently using pool.map(), such that a top-level reader/binarizer
> function accepts the filepath, the column to be read and some other data as
> a tuple.
>
> It looks something like this:
> (prettier coloring on Stack Overflow -
> https://stackoverflow.com/questions/48177823/using-multiprocessing-with-pyarrows-hdfsclient)
>
> import pyarrow as pa
> import pyarrow.parquet as pq
> from multiprocessing import Pool
>
> def binarizer(file_data_tuple):
>     ''' Read a Parquet column from a file, binarize and return '''
>     path, col_name, col_meta, native = file_data_tuple
>     if not native:
>         # Either this or using a top level hdfs_con
>         hdfs_con = pa.hdfs.connect(*hadoop_params)
>     read_pq = pq.read_table if native else hdfs_con.read_parquet
>     # Read the column, serialize and return
>     arrow_col = read_pq(path, columns=(col_name,))
>     bin_col = imported_binarizng_function(arrow_col)
>     return bin_col
>
> def read_binarize_parallel(filepaths):
>     ''' Set up parallel reading and binarizing of a parquet file '''
>     # list of tuples containing the filepath, column name, meta, and mode
>     pool_params = [(), ..]
>     pool = Pool()
>     for file in filepaths:
>         binary_columns = pool.map(binarizer, pool_params)
>         chunk = b''.join(binary_columns)
>         send_over_socket(chunk)
>
> When this is used in "native" mode, reading with pyarrow from the local
> disk, this works fine and saves a nice amount of time (a little less than
> x4 on my machine) for the binarizing part.
>
> However, when I try to do the same but read from hdfs, I get the following
> error stream:
> (also formatted more readably on Stack Overflow)
>
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message
> of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing
> required fields: callId, status
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message
> of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing
> required fields: callId, status
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message
> of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing
> required fields: callId, status
> [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message
> of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing
> required fields: callId, status
> 2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to
> invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
> RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000"
> got protocol mismatch: RPC channel cannot find pending call: id = 3.
>     @ Unknown
>     @ Unknown
>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
>     @ Unknown
>     @ Unknown
> 2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to
> invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
> RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000"
> got protocol mismatch: RPC channel cannot parse response header.
>     @ Unknown
>     @ Unknown
>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
>     @ Unknown
>     @ Unknown
> [... the same "cannot parse response header" error and GetPathInfo stack
> trace repeated for worker processes p10008, p10009, p10012, p10013 and
> p10014, interleaved with the script's own output
> "binarizing process filepath: /parquet_430mb/5e6.parquet" ...]
> 2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939864, p10013, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939866, p10008, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939868, p10012, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.939868, p10009, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
> 2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to
> invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
> RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call
> "getFileInfo" on server "192.168.0.101:9000"
>     @ Unknown
>     @ Unknown
>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
>     @ Unknown
>     @ Unknown
> Caused by
> TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to
> "192.168.0.101:9000": (errno: 32) Broken pipe
>     @ Unknown
>     @ Unknown
>     @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
>     @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
>     @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
>     @ Unknown
>     @ Unknown
> 2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent
> RPC call "getFileInfo" on server "192.168.0.101:9000"
>
> ---------------------------------------------------------------------------
> ArrowIOError                              Traceback (most recent call last)
>
> /home/parquet_sender.pyc in insert_files_parallel(self)
>     374                 # print ('372 sqparquet filepath:', filepath)
>     375                 params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
> --> 376                 bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
>     377                 got ('map complete')
>     378                 num_rows = bin_col[0][2]
>
> /usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
>     249         '''
>     250         assert self._state == RUN
> --> 251         return self.map_async(func, iterable, chunksize).get()
>     252
>     253     def imap(self, func, iterable, chunksize=1):
>
> /usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
>     556             return self._value
>     557         else:
> --> 558             raise self._value
>     559
>     560     def _set(self, i, obj):
>
> ArrowIOError: HDFS: GetPathInfo failed
>
> I'd be happy for any feedback on how to proceed to achieve some gains over
> the hadoop read as well.
> I'm not a Python expert, so pardon any noobness that I may have incurred
> with this question :)
>
> Thanks!
> Eli
>
> Sent with ProtonMail (https://protonmail.com) Secure Email.
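On the closing question above about still getting some speedup over the HDFS
read in the meantime: one thing that may be worth experimenting with (an
untested sketch, not something I can promise works around the issue) is a
thread pool inside a single process, so that no HDFS client state ever
crosses a fork. It only pays off if the reads spend most of their time in
I/O, and whether the client tolerates concurrent calls from multiple threads
on one connection would also need checking. The NameNode host/port, file path
and column names below are placeholders:

    import pyarrow as pa
    from multiprocessing.dummy import Pool as ThreadPool  # threads, not processes

    hdfs = pa.hdfs.connect('namenode', 9000)  # placeholder host/port

    def read_column(col_name):
        # Same per-column read the binarizer does, but from a thread in this process
        return hdfs.read_parquet('/parquet_430mb/5e6.parquet', columns=[col_name])

    pool = ThreadPool(4)
    tables = pool.map(read_column, ['col_a', 'col_b'])  # placeholder column names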