caldempsey commented on code in PR #152:
URL: https://github.com/apache/spark-connect-go/pull/152#discussion_r2869423875
##########
spark/client/client.go:
##########
@@ -434,6 +443,151 @@ func (c *ExecutePlanClient) ToTable() (*types.StructType, arrow.Table, error) {
}
}
+func (c *ExecutePlanClient) ToRecordBatches(ctx context.Context) (<-chan arrow.Record, <-chan error, *types.StructType) {
+	recordChan := make(chan arrow.Record, 10)
+	errorChan := make(chan error, 1)
+
+	go func() {
+		defer func() {
+			// Ensure channels are always closed to prevent goroutine leaks
+			close(recordChan)
+			close(errorChan)
+		}()
+
+		// Explicitly needed when tracking re-attachable execution.
+		c.done = false
Review Comment:
Taking another look at this. I've spliced in the approach from ToTable(),
originally written by @grundprinzip. Instead of writing to the shared c.done
field, ToRecordSequence now tracks completion with a local done variable inside
the closure.
After EOF, it checks if c.opts.ReattachExecution && !done and yields an
error, the same way ToTable() does. This removes the race condition because
nothing shared is being mutated, and it behaves correctly in both reattachable
and non-reattachable modes.
I do think we should DRY this up eventually, but judiciously. I've kept the
code WET for now intentionally. I want both code paths to remain directly
comparable until I fully understand the domain. Having @grundprinzip's original
logic in ToTable() sitting side by side against my equivalent in
ToRecordSequence makes it much easier to reason about correctness and spot
differences.
So rather than DRY it up in this PR, I'd like to take ownership of a fast
follow-up where I consolidate ToRecordSequence and ToTable once this gets
merged.
That said, I'm happy to take a crack at it in this PR. I'm being cautious
since this is an open source project, and I'd rather not introduce a negative
diff that risks breaking users when the current approach is correct,
comparable, and maintains parity between the two separate critical paths.
##########
spark/client/client.go:
##########
@@ -434,6 +443,151 @@ func (c *ExecutePlanClient) ToTable() (*types.StructType, arrow.Table, error) {
}
}
+func (c *ExecutePlanClient) ToRecordBatches(ctx context.Context) (<-chan arrow.Record, <-chan error, *types.StructType) {
+	recordChan := make(chan arrow.Record, 10)
+	errorChan := make(chan error, 1)
+
+	go func() {
+		defer func() {
+			// Ensure channels are always closed to prevent goroutine leaks
+			close(recordChan)
+			close(errorChan)
+		}()
+
+		// Explicitly needed when tracking re-attachable execution.
+		c.done = false
Review Comment:
Yes, that's right, good catch. I'm still not sure how to test that it works
with both modes, other than by setting the flag to true and false, so I will
do at least that.