jingz-db commented on code in PR #51036:
URL: https://github.com/apache/spark/pull/51036#discussion_r2112753141
##########
python/pyspark/sql/streaming/stateful_processor_api_client.py:
##########
@@ -222,76 +224,96 @@ def delete_timer(self, expiry_time_stamp_ms: int) -> None:
             # TODO(SPARK-49233): Classify user facing errors.
             raise PySparkRuntimeError(f"Error deleting timer: " f"{response_message[1]}")

-    def get_list_timer_row(self, iterator_id: str) -> int:
+    def get_list_timer_row(self, iterator_id: str) -> Tuple[int, bool]:
         import pyspark.sql.streaming.proto.StateMessage_pb2 as stateMessage

         if iterator_id in self.list_timer_iterator_cursors:
             # if the iterator is already in the dictionary, return the next row
-            pandas_df, index = self.list_timer_iterator_cursors[iterator_id]
+            data_batch, index, require_next_fetch = self.list_timer_iterator_cursors[iterator_id]
         else:
             list_call = stateMessage.ListTimers(iteratorId=iterator_id)
             state_call_command = stateMessage.TimerStateCallCommand(list=list_call)
             call = stateMessage.StatefulProcessorCall(timerStateCall=state_call_command)
             message = stateMessage.StateRequest(statefulProcessorCall=call)
             self._send_proto_message(message.SerializeToString())
-            response_message = self._receive_proto_message()
+            response_message = self._receive_proto_message_with_timers()
             status = response_message[0]
             if status == 0:
-                iterator = self._read_arrow_state()
-                # We need to exhaust the iterator here to make sure all the arrow batches are read,
-                # even though there is only one batch in the iterator. Otherwise, the stream might
-                # block further reads since it thinks there might still be some arrow batches left.
-                # We only need to read the first batch in the iterator because it's guaranteed that
-                # there would only be one batch sent from the JVM side.
-                data_batch = None
-                for batch in iterator:
-                    if data_batch is None:
-                        data_batch = batch
-                if data_batch is None:
-                    # TODO(SPARK-49233): Classify user facing errors.
-                    raise PySparkRuntimeError("Error getting map state entry.")
-                pandas_df = data_batch.to_pandas()
+                data_batch = list(

Review Comment:
   Correct me if I am wrong: we are now no longer sending expired timers via an Arrow batch, but sending them as a batched list in the proto response?
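   For framing the question, here is a minimal, hypothetical sketch of the cursor pattern the new hunk seems to implement. The class and helper names (TimerCursorSketch, _fetch_batch) are invented for illustration and are not the actual client API; only the (data_batch, index, require_next_fetch) bookkeeping is taken from the diff, so treat the exact semantics of the returned boolean as an assumption.

   from typing import Dict, List, Tuple

   class TimerCursorSketch:
       def __init__(self, server_rows: List[int], batch_size: int = 2) -> None:
           # Toy stand-in for the JVM side: all expired-timer timestamps,
           # pre-batched into plain lists (no Arrow involved).
           self._batches = [
               server_rows[i : i + batch_size]
               for i in range(0, len(server_rows), batch_size)
           ]
           self._next_batch = 0
           # iterator_id -> (cached batch, read position, more batches pending?)
           self.cursors: Dict[str, Tuple[List[int], int, bool]] = {}

       def _fetch_batch(self) -> Tuple[List[int], bool]:
           # Stand-in for the proto round trip (_send_proto_message /
           # _receive_proto_message_with_timers in the real client).
           batch = self._batches[self._next_batch]
           self._next_batch += 1
           return batch, self._next_batch < len(self._batches)

       def get_list_timer_row(self, iterator_id: str) -> Tuple[int, bool]:
           if iterator_id in self.cursors:
               batch, index, require_next_fetch = self.cursors[iterator_id]
           else:
               batch, require_next_fetch = self._fetch_batch()
               index = 0
           row = batch[index]
           if index + 1 < len(batch):
               # More rows are cached locally: advance the cursor, no round trip.
               self.cursors[iterator_id] = (batch, index + 1, require_next_fetch)
               return row, True
           # Cached batch is exhausted: drop the cursor; another row exists only
           # if the server signalled that more batches are pending.
           self.cursors.pop(iterator_id)
           return row, require_next_fetch

   # Usage: page through five timers in batches of two.
   client = TimerCursorSketch([100, 200, 300, 400, 500])
   row, has_more = client.get_list_timer_row("it-1")
   timers = [row]
   while has_more:
       row, has_more = client.get_list_timer_row("it-1")
       timers.append(row)
   assert timers == [100, 200, 300, 400, 500]

   Under these assumptions, only one batch at a time crosses the JVM/Python boundary, and the second element of the returned tuple tells the caller whether another row can still be obtained, either from the cached list or via a further fetch.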