caldempsey commented on code in PR #152:
URL: https://github.com/apache/spark-connect-go/pull/152#discussion_r2869423875


##########
spark/client/client.go:
##########
@@ -434,6 +443,151 @@ func (c *ExecutePlanClient) ToTable() (*types.StructType, arrow.Table, error) {
        }
 }
 
+func (c *ExecutePlanClient) ToRecordBatches(ctx context.Context) (<-chan arrow.Record, <-chan error, *types.StructType) {
+       recordChan := make(chan arrow.Record, 10)
+       errorChan := make(chan error, 1)
+
+       go func() {
+               defer func() {
+                       // Ensure channels are always closed to prevent goroutine leaks
+                       close(recordChan)
+                       close(errorChan)
+               }()
+
+               // Explicitly needed when tracking re-attachable execution.
+               c.done = false

Review Comment:
   Taking another look at this. I've spliced in the approach from ToTable(), 
originally written by @grundprinzip. Instead of writing to the shared c.done 
field, ToRecordSequence now tracks completion with a local done variable inside 
the closure.
   
   After EOF, it checks whether c.opts.ReattachExecution && !done holds and, if so, yields an error, the same way ToTable() does. This removes the race condition because nothing shared is mutated, and it behaves correctly in both reattachable and non-reattachable modes.
   
   I do think we should DRY this up eventually, but judiciously. I've intentionally kept the code WET for now: I want both code paths to remain directly comparable until I fully understand the domain. Having @grundprinzip's original logic in ToTable() side by side with my equivalent in ToRecordSequence makes it much easier to reason about correctness and spot differences.
   
   So rather than DRY it up in this PR, I'd like to take ownership of a fast 
follow-up where I consolidate ToRecordSequence and ToTable once this gets 
merged.
   
   That said, I'm happy to take a crack at consolidating them in this PR if you'd prefer. I'm being cautious because this is an open source project, and I'd rather not introduce a negative diff that risks breaking users when the current approach is correct, comparable, and keeps the two separate critical paths at parity.




Review Comment:
   Yes, that's right, good catch. I'm still not sure how to test that it "works with both modes" other than running it with the flag set to true and then to false, so I'll do at least that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

