ntjohnson1 opened a new issue, #1339:
URL: https://github.com/apache/datafusion-python/issues/1339
**Describe the bug**
I'm trying to generate a udaf that returns multiple timestamps for each
partition id.
**To Reproduce**
```python
import datafusion as dfn
from datafusion import udf, udaf, Accumulator, col
import pyarrow as pa
import pyarrow.compute as pc
import numpy as np
class ResampleAccumulator(Accumulator):
def __init__(self):
self._min = float('inf')
self._max = 0
# 10 Hz
self._timestep = 100 # ms
def update(self, array):
# Logic to update the sum and count from an input array
# In a real implementation, you would process the pyarrow array
efficiently
print("Enter update")
local_min, local_max = pc.min_max(array).values()
local_min_ns = local_min.cast(pa.timestamp('ns')).value
local_max_ns = local_max.cast(pa.timestamp('ns')).value
self._min = min(local_min_ns, self._min)
self._max = max(local_max_ns, self._max)
print(f"update {self._min=}, {self._max=}")
def merge(self, states_array):
print("Enter merge")
# Is there a better way to do this with pc?
# or maybe just throw it into numpy
self._min = min(states_array[0][0].as_py(), self._min)
self._max = max(states_array[1][0].as_py(), self._max)
print(f"merge {self._min=}, {self._max=}")
def state(self):
print("Enter state")
# Return the current state as a list of scalars
return pa.array([self._min, self._max], type=pa.int64())
def evaluate(self):
print("Enter evaluate")
desired_timestamps = np.arange(np.datetime64(self._min, 'ns'),
np.datetime64(self._max, 'ns'), np.timedelta64(self._timestep, "ms"))
print(f"{len(desired_timestamps)=}")
array_result = pa.array(desired_timestamps, type=pa.timestamp('ns'))
print(array_result)
return array_result
resample_udaf = udaf(ResampleAccumulator, [pa.timestamp('ns')],
pa.list_(pa.timestamp('ns')), [pa.int64(), pa.int64()], volatility="stable")
ctx = dfn.SessionContext()
df = ctx.from_pydict({"id": [0,1], "time": [np.datetime64(0, 'ns'),
np.datetime64(1_000_000_000, 'ns')]})
print(df)
result = df.aggregate(
"id",
[resample_udaf(col("time"))]
)
print(result.schema())
result.collect()
```
Output
```bash
Traceback (most recent call last):
File "<path>/<file>.py", line 60, in <module>
result.collect()
File "<path>/.venv/lib/python3.12/site-packages/datafusion/dataframe.py",
line 729, in collect
return self.df.collect()
^^^^^^^^^^^^^^^^^
Exception: DataFusion error: Execution("ArrowTypeError: object of type
<class 'pyarrow.lib.TimestampArray'> cannot be converted to int")
```
**Expected behavior**
This works or provides a clearer error.
**Additional context**
Fails on datafusion 51. If I return just a single timestamp and update the
udaf call then this works.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]