HyukjinKwon commented on code in PR #50466:
URL: https://github.com/apache/spark/pull/50466#discussion_r2027863851
##########
python/pyspark/sql/streaming/stateful_processor_api_client.py:
##########
@@ -49,22 +49,26 @@ class StatefulProcessorHandleState(Enum):
 class StatefulProcessorApiClient:
     def __init__(
-        self, state_server_port: int, key_schema: StructType, is_driver: bool = False
+        self, state_server_port: Union[int, str], key_schema: StructType, is_driver: bool = False
     ) -> None:
         self.key_schema = key_schema
-        self._client_socket = socket.socket()
-        self._client_socket.connect(("localhost", state_server_port))
-
-        # SPARK-51667: We have a pattern of sending messages continuously from one side
-        # (Python -> JVM, and vice versa) before getting response from other side. Since most
-        # messages we are sending are small, this triggers the bad combination of Nagle's algorithm
-        # and delayed ACKs, which can cause a significant delay on the latency.
-        # See SPARK-51667 for more details on how this can be a problem.
-        #
-        # Disabling either would work, but it's more common to disable Nagle's algorithm; there is
-        # lot less reference to disabling delayed ACKs, while there are lots of resources to
-        # disable Nagle's algorithm.
-        self._client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        if isinstance(state_server_port, str):
+            self._client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            self._client_socket.connect(state_server_port)
+        else:
+            self._client_socket = socket.socket()
+            self._client_socket.connect(("localhost", state_server_port))
+
+            # SPARK-51667: We have a pattern of sending messages continuously from one side
+            # (Python -> JVM, and vice versa) before getting response from other side. Since most
+            # messages we are sending are small, this triggers the bad combination of Nagle's
+            # algorithm and delayed ACKs, which can cause a significant delay on the latency.
+            # See SPARK-51667 for more details on how this can be a problem.
+            #
+            # Disabling either would work, but it's more common to disable Nagle's algorithm; there
+            # is lot less reference to disabling delayed ACKs, while there are lots of resources to
+            # disable Nagle's algorithm.
+            self._client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

Review Comment:
   TCP_NODELAY is unsupported by UDS (Unix domain sockets). It was more of a performance improvement than a requirement, so I think it's fine to set it only on the TCP path. cc @HeartSaVioR FYI

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
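
For readers following the thread, the branching discussed in the review comment can be sketched in isolation. This is a minimal illustration, not the code from the PR: `connect_state_server` is a hypothetical helper name, and the rule that a `str` means a UDS path while an `int` means a local TCP port is taken from the diff above. `TCP_NODELAY` is a TCP-level option, so the sketch sets it only on the TCP branch.

```python
import socket
from typing import Union


def connect_state_server(address: Union[int, str]) -> socket.socket:
    """Hypothetical helper: a str is treated as a UDS path, an int as a TCP port."""
    if isinstance(address, str):
        # Unix domain socket: there is no TCP layer underneath, so
        # TCP_NODELAY does not apply and is not set here.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(address)
        except OSError:
            sock.close()
            raise
        return sock

    # TCP socket on localhost: disable Nagle's algorithm so that many small
    # messages sent back to back are not held up waiting for delayed ACKs.
    sock = socket.socket()
    try:
        sock.connect(("localhost", address))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    except OSError:
        sock.close()
        raise
    return sock
```

Calling the helper with a port number would yield an `AF_INET` socket with `TCP_NODELAY` enabled, while calling it with a filesystem path would yield an `AF_UNIX` socket with no TCP options touched.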