joao-r-reis commented on code in PR #1946:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1946#discussion_r3273902227


##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx 
context.Context, timeout time
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// segmentWriter allows batching multiple frames into a signle segment before 
flushing them to the connection.
+type segmentWriter struct {
+       w    contextWriter
+       quit <-chan struct{}
+
+       // Holds write requests for the current segment.
+       writeRequests     []writeRequest
+       totalFramesLength int
+       writeCh           chan writeRequest
+
+       segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit 
<-chan struct{}, compressor Compressor) *segmentWriter {
+       sw := &segmentWriter{
+               w:            w,
+               quit:         quit,
+               writeCh:      make(chan writeRequest),
+               segmentCodec: newSegmentCodec(compressor),
+       }
+
+       go sw.runFlusher(writeInterval)
+
+       return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int, 
error) {
+       resultChan := make(chan writeResult, 1)
+       req := writeRequest{
+               resultChan: resultChan,
+               data:       frame,
+       }
+
+       select {
+       case <-ctx.Done():
+               return 0, ctx.Err()
+       case <-sw.quit:
+               return 0, ErrConnectionClosed
+       case sw.writeCh <- req:
+               // Enqueued for writing
+       }
+
+       result := <-resultChan
+       return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+       timer := time.NewTimer(interval)
+       defer timer.Stop()
+
+       if !timer.Stop() {
+               <-timer.C
+       }
+
+       // Indicates whether the flush timer is running
+       running := false
+
+       for {
+               select {
+               case <-sw.quit:
+                       return
+               case req := <-sw.writeCh:
+                       frame := req.data
+                       if len(frame) > maxSegmentPayloadSize {
+                               sw.flushBigFrameImmediately(req)

Review Comment:
   Interesting that the java driver behaves that way but I personally think 
it's better to preserve the order. In a regular application you would probably 
not notice it ever but if an app is sending requests in a sequential order 
(without waiting for the response) and the server receives the requests in a 
different order I'd consider that odd even if it's technically not against the 
protocol.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to