[ https://issues.apache.org/jira/browse/SPARK-51690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-51690. ---------------------------------- Fix Version/s: 4.1.0 Resolution: Fixed Issue resolved by pull request 50488 [https://github.com/apache/spark/pull/50488] > Change the protocol of ListState.put()/get()/appendList() from Arrow to > simple custom protocol > ---------------------------------------------------------------------------------------------- > > Key: SPARK-51690 > URL: https://issues.apache.org/jira/browse/SPARK-51690 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming > Affects Versions: 4.1.0 > Reporter: Jungtaek Lim > Assignee: Jungtaek Lim > Priority: Major > Labels: pull-request-available > Fix For: 4.1.0 > > > We figured out Arrow codepath to send the multiple elements for ListState is > introducing some overhead. > The conversion in ListState.put()/ListState.appendList() is, List[Tuple] -> > Arrow RecordBatch from Python worker to send to JVM, and Arrow RecordBatch -> > List[Row] in JVM to save them to state store. It happens opposite way for > ListState.get(). > We use Arrow for intercommunication between Python worker and JVM, but that > is based on the fact there are large amount of data to send, and the batch > size for Arrow is 10000 by default. It might not be impossible to have such a > huge list, but given the performance of UDF being used for PySpark, I > wouldn't anticipate that the massive workload will work in transformWithState > in PySpark anyway. > Since this is just to send multiple elements, there is simple protocol to > achieve similar thing. From the benchmark, the simple protocol cuts the > measured time for ListState.put() / ListState.get() by 1/3. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org