joao-r-reis commented on code in PR #1946:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1946#discussion_r3273822955
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a single segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
+ for _, req := range sw.writeRequests {
Review Comment:
The code in `writeFlusherImpl` uses fewer loops to do essentially the same
thing: as soon as a write request comes in, its buffer is appended to a
`net.Buffers` object and its result channel is appended to a `[]chan<-` slice.
The only loop that remains is the one that sends the result to each channel
after the write succeeds.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]