This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/arrow-worker by this push:
     new 3c16cf4915 add sedonadb sedona udf worker example
3c16cf4915 is described below

commit 3c16cf491526eab12f2371017dadd22070b20ad3
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Jan 4 22:39:13 2026 +0100

    add sedonadb sedona udf worker example
---
 python/sedona/spark/worker/serde.py  | 27 ---------------------------
 python/sedona/spark/worker/worker.py | 25 -------------------------
 2 files changed, 52 deletions(-)

diff --git a/python/sedona/spark/worker/serde.py 
b/python/sedona/spark/worker/serde.py
index 31038b7fcd..3954d075b7 100644
--- a/python/sedona/spark/worker/serde.py
+++ b/python/sedona/spark/worker/serde.py
@@ -1,29 +1,8 @@
-import socket
-
 from pyspark.serializers import write_int, SpecialLengths
 from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
 
 from sedona.spark.worker.udf_info import UDFInfo
 
-
-def read_available(buf, chunk=4096):
-    # buf.raw._sock.settimeout(0.01)   # non-blocking-ish
-    data = bytearray()
-    index = 0
-    while True:
-        index+=1
-        try:
-            chunk_bytes = buf.read(chunk)
-        except socket.timeout:
-            break
-
-        if not chunk_bytes and index > 10:
-            break
-
-        data.extend(chunk_bytes)
-
-    return bytes(data)
-
 class SedonaDBSerializer(ArrowStreamPandasSerializer):
     def __init__(self, timezone, safecheck, db, udf_info: UDFInfo):
         super(SedonaDBSerializer, self).__init__(timezone, safecheck)
@@ -64,12 +43,6 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer):
                 writer.close()
 
     def dump_stream(self, iterator, stream):
-        """
-        Override because Pandas UDFs require a START_ARROW_STREAM before the 
Arrow stream is sent.
-        This should be sent after creating the first record batch so in case 
of an error, it can
-        be sent back to the JVM before the Arrow stream starts.
-        """
-
         def init_stream_yield_batches():
             should_write_start_length = True
             for batch in iterator:
diff --git a/python/sedona/spark/worker/worker.py 
b/python/sedona/spark/worker/worker.py
index 74a61b02ee..571134f407 100644
--- a/python/sedona/spark/worker/worker.py
+++ b/python/sedona/spark/worker/worker.py
@@ -104,7 +104,6 @@ def resolve_python_path(utf_serde: UTF8Deserializer, 
infile):
             sys.path.insert(1, path)
 
     spark_files_dir = utf_serde.loads(infile)
-    # _accumulatorRegistry.clear()
 
     SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True
@@ -165,26 +164,6 @@ def read_udf(infile, pickle_ser) -> UDFInfo:
         geom_offsets=[0]
     )
 
-# def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
-#     num_arg = read_int(infile)
-#     arg_offsets = [read_int(infile) for i in range(num_arg)]
-#     chained_func = None
-#     for i in range(read_int(infile)):
-#         f, return_type = read_command(pickleSer, infile)
-#         if chained_func is None:
-#             chained_func = f
-#         else:
-#             chained_func = chain(chained_func, f)
-#
-#     func = chained_func
-#
-#     # the last returnType will be the return type of UDF
-#     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-#         return arg_offsets, func, return_type
-#     else:
-#         raise ValueError("Unknown eval type: {}".format(eval_type))
-#
-
 def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
     num_udfs = read_int(infile)
 
@@ -211,11 +190,7 @@ def write_statistics(infile, outfile, boot_time, 
init_time) -> None:
     write_long(shuffle.MemoryBytesSpilled, outfile)
     write_long(shuffle.DiskBytesSpilled, outfile)
 
-    # Mark the beginning of the accumulators section of the output
     write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
-    # write_int(len(_accumulatorRegistry), outfile)
-    # for (aid, accum) in _accumulatorRegistry.items():
-    #     pickleSer._write_with_length((aid, accum._value), outfile)
 
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
         write_int(SpecialLengths.END_OF_STREAM, outfile)

Reply via email to