HeartSaVioR commented on code in PR #50466:
URL: https://github.com/apache/spark/pull/50466#discussion_r2027946678


##########
python/pyspark/sql/streaming/stateful_processor_api_client.py:
##########
@@ -49,22 +49,26 @@ class StatefulProcessorHandleState(Enum):
 
 class StatefulProcessorApiClient:
     def __init__(
-        self, state_server_port: int, key_schema: StructType, is_driver: bool = False
+        self, state_server_port: Union[int, str], key_schema: StructType, is_driver: bool = False
     ) -> None:
         self.key_schema = key_schema
-        self._client_socket = socket.socket()
-        self._client_socket.connect(("localhost", state_server_port))
-
-        # SPARK-51667: We have a pattern of sending messages continuously from one side
-        # (Python -> JVM, and vice versa) before getting response from other side. Since most
-        # messages we are sending are small, this triggers the bad combination of Nagle's algorithm
-        # and delayed ACKs, which can cause a significant delay on the latency.
-        # See SPARK-51667 for more details on how this can be a problem.
-        #
-        # Disabling either would work, but it's more common to disable Nagle's algorithm; there is
-        # lot less reference to disabling delayed ACKs, while there are lots of resources to
-        # disable Nagle's algorithm.
-        self._client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        if isinstance(state_server_port, str):
+            self._client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            self._client_socket.connect(state_server_port)
+        else:
+            self._client_socket = socket.socket()
+            self._client_socket.connect(("localhost", state_server_port))
+
+            # SPARK-51667: We have a pattern of sending messages continuously from one side
+            # (Python -> JVM, and vice versa) before getting response from other side. Since most
+            # messages we are sending are small, this triggers the bad combination of Nagle's
+            # algorithm and delayed ACKs, which can cause a significant delay on the latency.
+            # See SPARK-51667 for more details on how this can be a problem.
+            #
+            # Disabling either would work, but it's more common to disable Nagle's algorithm; there
+            # is lot less reference to disabling delayed ACKs, while there are lots of resources to
+            # disable Nagle's algorithm.
+            self._client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

Review Comment:
   TCP_NODELAY only applies to TCP sockets. As long as we keep this call on the
   TCP socket path, it should be fine.
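
   For illustration, here is a minimal standalone sketch of the branching in the
   diff above. The helper name `connect_state_server` is hypothetical and not part
   of the PR; it only mirrors the logic shown: a `str` address is treated as a
   Unix domain socket path, an `int` as a localhost TCP port, and TCP_NODELAY is
   set only on the TCP branch, since a Unix domain socket has no TCP layer and
   therefore no Nagle's algorithm to disable.

```python
import socket
from typing import Union


def connect_state_server(address: Union[int, str]) -> socket.socket:
    """Hypothetical helper mirroring the branching in the diff above."""
    if isinstance(address, str):
        # Unix domain socket path: there is no TCP layer here, so
        # TCP_NODELAY is intentionally not set on this branch.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.connect(address)
    else:
        # TCP path: disable Nagle's algorithm so small messages are not
        # held back while waiting for delayed ACKs from the peer.
        sock = socket.socket()
        sock.connect(("localhost", address))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    return sock
```

   With this shape, the option is applied at the only place where it is
   meaningful, which is the point of keeping it inside the `else` branch.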



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
