[ https://issues.apache.org/jira/browse/ARROW-10957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255968#comment-17255968 ]
Liya Fan commented on ARROW-10957:
----------------------------------

Sure. Please find the test case here:
https://github.com/apache/arrow/blob/master/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java#L254
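For readers coming from the Python side: the JVM-to-Python exchange behind a pandas_udf uses the Arrow IPC stream format, which is the same kind of round trip the Java test linked above exercises with large buffers. The sketch below is a minimal, illustrative pyarrow version of that round trip, not the linked Java test; the data is deliberately tiny, the column names are borrowed from the reproduction quoted below, and only standard pyarrow calls (RecordBatch.from_arrays, BufferOutputStream, ipc.new_stream, ipc.open_stream) are used.

{code:python}
# Minimal pyarrow sketch (illustrative only): write one record batch to an
# in-memory Arrow IPC stream and read it back. The problem described in this
# issue appears when a single batch on this kind of path grows past 2GB on
# the Java side; the data here is deliberately tiny.
import pyarrow as pa
import pyarrow.ipc as ipc

batch = pa.RecordBatch.from_arrays(
    [pa.array([1234567] * 1000), pa.array(["abcdefghij"] * 1000)],
    names=["df1_c1", "df1_c3"],  # column names borrowed from the reproduction below
)

# Serialize the batch with the IPC stream format.
sink = pa.BufferOutputStream()
writer = ipc.new_stream(sink, batch.schema)
writer.write_batch(batch)
writer.close()
buf = sink.getvalue()

# Read it back and confirm the round trip.
reader = ipc.open_stream(buf)
roundtripped = reader.read_all()
print("serialized bytes:", buf.size)
print("rows after round trip:", roundtripped.num_rows)
{code}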
> Expanding pyarrow buffer size more than 2GB for pandas_udf functions
> ---------------------------------------------------------------------
>
>                 Key: ARROW-10957
>                 URL: https://issues.apache.org/jira/browse/ARROW-10957
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++, Java, Python
>    Affects Versions: 2.0.0
>         Environment: Spark: 2.4.4
> Python:
> cycler (0.10.0)
> glmnet-py (0.1.0b2)
> joblib (1.0.0)
> kiwisolver (1.3.1)
> lightgbm (3.1.1)
> matplotlib (3.0.3)
> numpy (1.19.4)
> pandas (1.1.5)
> pip (9.0.3)
> pyarrow (2.0.0)
> pyparsing (2.4.7)
> python-dateutil (2.8.1)
> pytz (2020.4)
> scikit-learn (0.23.2)
> scipy (1.5.4)
> setuptools (51.0.0)
> six (1.15.0)
> sklearn (0.0)
> threadpoolctl (2.1.0)
> venv-pack (0.2.0)
> wheel (0.36.2)
>            Reporter: Dmitry Kravchuk
>            Priority: Major
>              Labels: features, patch, performance
>             Fix For: 2.0.1
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> There is a 2GB limit on the amount of data that can be passed to any pandas_udf function, and the aim of this issue is to expand that limit. This buffer is far too small when we use pyspark and our goal is to fit machine learning models.
> Steps to reproduce: run the spark-submit command below; it executes the Python function that follows.
>
> {code:java}
> %sh
> cd /home/zeppelin/code && \
> export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env3/bin/python && \
> export PYSPARK_PYTHON=./env3/bin/python && \
> export ARROW_PRE_0_15_IPC_FORMAT=1 && \
> spark-submit \
> --master yarn \
> --deploy-mode client \
> --num-executors 5 \
> --executor-cores 5 \
> --driver-memory 8G \
> --executor-memory 8G \
> --conf spark.executor.memoryOverhead=4G \
> --conf spark.driver.memoryOverhead=4G \
> --archives /home/zeppelin/env3.tar.gz#env3 \
> --jars "/opt/deltalake/delta-core_2.11-0.5.0.jar" \
> --py-files jobs.zip,"/opt/deltalake/delta-core_2.11-0.5.0.jar" main.py \
> --job temp
> {code}
>
> {code:java|title=Bar.Python|borderStyle=solid}
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
>
> def analyze(spark):
>     pdf1 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>         columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
>     )
>     df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
>
>     pdf2 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
>         columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
>     )
>     df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
>
>     df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
>
>     def myudf(df):
>         import os
>         os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
>         return df
>
>     df4 = df3 \
>         .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
>         .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
>         .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
>         .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
>         .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
>         .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
>         .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
>         .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
>         .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
>         .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
>
>     df4.printSchema()
>
>     udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
>     df5 = df4.groupBy('df1_c1').apply(udf)
>     print('df5.count()', df5.count())
> {code}
>
> If you need more details please let me know.
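A rough back-of-envelope estimate suggests why this reproduction crosses 2GB: every row of df1 and df2 carries the same join key (1234567), so the inner join yields 429 × 48,993 ≈ 21 million rows that all fall into a single groupBy('df1_c1') group, and a GROUPED_MAP pandas_udf ships each group to the Python worker as Arrow data, so one oversized group is exactly where a per-batch ceiling bites. The sketch below is illustrative arithmetic over the literal values in the reproduction, not a measurement, and it ignores validity bitmaps and padding.

{code:python}
# Back-of-envelope size estimate for the reproduction above (illustrative
# arithmetic only; actual Arrow buffer sizes will differ somewhat).
rows = 429 * 48993                      # every row shares the key 1234567, so the join is a full cross of the two sides
string_bytes = len("abcdefghij") + len("2000-01-01T00:00:00.000Z") \
    + 3 * len("abcdefghijklmno") + len("2000-01-01")   # df1_c3, df1_c4, df2_c3..df2_c6
fixed_bytes = 2 * 4 + 2 * 8             # two int columns + two double columns
offset_bytes = 6 * 4                    # one 32-bit offset per string column per row
per_row = string_bytes + fixed_bytes + offset_bytes

print("rows:", rows)                                            # 21,017,997
print("estimated bytes per row:", per_row)                      # 137
print("estimated group size (GiB):", rows * per_row / 2**30)    # roughly 2.7, above the 2GB limit
{code}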