joao-r-reis commented on code in PR #1946:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1946#discussion_r3267933871
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
Review Comment:
If I'm reading this correctly, this will ignore the order in which the frames
come in. If there are already write requests that were appended, then we should
flush them first.
##########
conn.go:
##########
@@ -795,91 +774,14 @@ func (c *Conn) releaseStream(call *callReq) {
}
}
-func (c *Conn) recvSegment(ctx context.Context) error {
- var (
- frame []byte
- isSelfContained bool
- err error
- )
-
- // Read frame based on compression
- if c.compressor != nil {
- frame, isSelfContained, err = readCompressedSegment(c.r,
c.compressor)
- } else {
- frame, isSelfContained, err = readUncompressedSegment(c.r)
- }
- if err != nil {
- return err
- }
-
- if isSelfContained {
- return c.processAllFramesInSegment(ctx, bytes.NewReader(frame))
- }
-
- head, err := readHeader(bytes.NewReader(frame), c.headerBuf[:])
- if err != nil {
- return err
- }
-
- buf := bytes.NewBuffer(make([]byte, 0, head.length+frameHeadSize))
- buf.Write(frame)
-
- // Computing how many bytes of message left to read
- bytesToRead := head.length - len(frame) + frameHeadSize
-
- err = c.recvPartialFrames(buf, bytesToRead)
- if err != nil {
- return err
+func (c *Conn) maybeSwitchToSegments() {
+ if c.version >= protoVersion5 {
+ // Use segments writter which basically batches multiple frames
into a single segment before flushing them to the connection.
Review Comment:
writer*
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
Review Comment:
single*
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
Review Comment:
Can we not just reuse a single instance of `bytes.Buffer`? If I'm not
mistaken, this slice is only needed until the `encodeAndWrite` call below.
##########
segment_codec.go:
##########
@@ -0,0 +1,277 @@
+// segment_codec.go
Review Comment:
Missing license header.
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
+ for _, req := range sw.writeRequests {
Review Comment:
Do we need `writeRequests`? Can we not just append to the buffer directly
when the request comes in?
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
+ for _, req := range sw.writeRequests {
+ // TODO: interesting if compiler optimizes this
+ framesBuf = append(framesBuf, req.data...)
+ }
+
+ err := sw.encodeAndWrite(framesBuf, true)
+ if err != nil {
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: 0,
+ err: err,
+ }
+ }
+ return
+ }
+
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: len(req.data),
+ err: nil,
+ }
+ }
+}
+
+func (sw *segmentWriter) reset() {
+ sw.writeRequests = nil
+ sw.totalFramesLength = 0
+}
+
+// Encodes a big frame which size is larger than maxSegmentPayloadSize
+// into multiple non self-contained segments and flushes them immediately
+func (sw *segmentWriter) flushBigFrameImmediately(req writeRequest) {
+ // Calculate the number of segment the frame will be split into
+ segmentsCount := 0
+ frame := req.data
+ frameLength := len(frame)
+ exactFit := frameLength%maxSegmentPayloadSize == 0
+ if exactFit {
+ segmentsCount = frameLength / maxSegmentPayloadSize
+ } else {
+ // An extra segment for the remainder of the frame
+ segmentsCount = frameLength/maxSegmentPayloadSize + 1
+ }
+
+ var flushErr error
+
+ for i := 0; i < segmentsCount; i++ {
+ // Calculate the length of the current frame part which will be
encoded into a segment
+ partialFrameLength := 0
+ if i < segmentsCount-1 || exactFit {
+ partialFrameLength = maxSegmentPayloadSize
+ } else {
+ partialFrameLength = frameLength % maxSegmentPayloadSize
+ }
+ err := sw.encodeAndWrite(frame[:partialFrameLength], false)
+ if err != nil {
+ flushErr = err
+ break
+ }
+ frame = frame[partialFrameLength:]
+ }
+
+ written := len(req.data)
+ if flushErr != nil {
+ written = 0
+ }
+
+ req.resultChan <- writeResult{
+ n: written,
+ err: flushErr,
+ }
+}
+
+// Encodes a frame into a segment and writes it to the underlying connection
+func (sw *segmentWriter) encodeAndWrite(frame []byte, isSelfContained bool)
error {
+ segmentBuf, err := sw.segmentCodec.encode(frame, isSelfContained)
+ if err != nil {
+ return err
+ }
+ _, err = sw.w.writeContext(context.Background(), segmentBuf)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// segmentReader allows reading segments from the underlying connection.
+// Implements ConnReader interface.
+type segmentReader struct {
+ r ConnReader
+
+ segmentCodec segmentCodec
+
+ // Reusable buffer for decoded frames
+ // This buffer might have multiple frames inside if self-contained
segment is decoded
+ readBufferDecoded bytes.Reader
+ // Reusable buffer for reading frame header
+ frameHeaderBuf [frameHeadSize]byte
+}
+
+func newSegmentReader(r ConnReader, segmentCodec segmentCodec) *segmentReader {
+ return &segmentReader{
+ r: r,
+ segmentCodec: segmentCodec,
+ }
+}
+
+// why do we have a write method for reader lol
Review Comment:
Unfortunately, `ConnReader` embeds `net.Conn`, which doesn't make sense. I
actually tried to find out whether `gocql.Conn.Write` is used anywhere, and I
believe it's only used in test code, but that's slightly unrelated to this.
This could definitely be improved, as it makes the code harder to read, but oh
well. Maybe you can replace this comment with a TODO about refactoring the code
to remove the need to embed `net.Conn` in `ConnReader` and to remove unused
methods like `gocql.Conn.Write()`.
##########
segment_codec.go:
##########
@@ -0,0 +1,277 @@
+// segment_codec.go
+
+package gocql
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+)
+
+const (
+ maxSegmentPayloadSize = 1<<17 - 1
+
+ compressedHeaderSize = 5 + crc24Size
+ uncompressedHeaderSize = 3 + crc24Size
+
+ crc24Size = 3
+ crc32Size = 4
+)
+
+// segmentHeader represents the header information of a segment.
+type segmentHeader struct {
+ // payload length is the length of the segment payload
+ payloadLength int
+ // uncompressedPayloadLength is the length of the uncompressed payload
(only for compressed segments)
+ uncompressedPayloadLength int
+ // indicates whether the segment contains only completed frames
+ isSelfContained bool
+}
+
+func (segment *segmentHeader) String() string {
+ return fmt.Sprintf("segmentHeader(len=%d, uncompressedLen=%d,
isSelfContained=%v)",
+ segment.payloadLength,
+ segment.uncompressedPayloadLength,
+ segment.isSelfContained)
+}
+
+type segmentCodec struct {
+ compressor Compressor
+ compressed bool
+}
+
+func newSegmentCodec(compressor Compressor) segmentCodec {
+ return segmentCodec{
+ compressed: compressor != nil,
+ compressor: compressor,
+ }
+}
+
+func (sc *segmentCodec) encode(payload []byte, isSelfContained bool) ([]byte,
error) {
+ if len(payload) > maxSegmentPayloadSize {
+ return nil, fmt.Errorf("gocql: payload length (%d) exceeds
maximum segment size of %d", len(payload), maxSegmentPayloadSize)
+ }
+
+ if sc.compressed {
+ return sc.encodeCompressedSegment(payload, isSelfContained)
+ }
+ return sc.encodeUncompressedSegment(payload, isSelfContained)
+}
+
+func (sc *segmentCodec) encodeCompressedSegment(payload []byte,
isSelfContained bool) ([]byte, error) {
+ uncompressedLen := len(payload)
+
+ compressed, err := sc.compressor.AppendCompressed(nil, payload)
+ if err != nil {
+ return nil, err
+ }
+
+ compressedLen := len(compressed)
+
+ // If compression is not worth it, we should send uncompressed data
+ // following the next logic:
+ if uncompressedLen < compressedLen {
+ compressed = payload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ segmentBuf := make([]byte, compressedHeaderSize+compressedLen+crc32Size)
+
+ sc.encodeCompressedSegmentHeader(compressedLen, uncompressedLen,
isSelfContained, segmentBuf)
+ sc.encodePayloadAndChecksum(compressed,
segmentBuf[compressedHeaderSize:])
+
+ return segmentBuf, nil
+}
+
+// encodeCompressedSegmentHeader encodes the compressed segment header into
the provided destination slice.
+// It assumes that dest has enough space to hold the header.
+func (sc *segmentCodec) encodeCompressedSegmentHeader(compressedLen,
uncompressedLen int, isSelfContained bool, dest []byte) {
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ binary.LittleEndian.PutUint64(dest[:], combined)
+
+ headerCRC24 := Crc24(dest[:5])
+ dest[5] = byte(headerCRC24)
+ dest[6] = byte(headerCRC24 >> 8)
+ dest[7] = byte(headerCRC24 >> 16)
+}
+
+func (sc *segmentCodec) encodeUncompressedSegment(payload []byte,
isSelfContained bool) ([]byte, error) {
+ payloadLen := len(payload)
+
+ segmentBuf := make([]byte, uncompressedHeaderSize+payloadLen+crc32Size)
+
+ sc.encodeUncompressedSegmentHeader(payloadLen, isSelfContained,
segmentBuf)
+ sc.encodePayloadAndChecksum(payload,
segmentBuf[uncompressedHeaderSize:])
+
+ return segmentBuf, nil
+}
+
+// encodeUncompressedSegmentHeader encodes the uncompressed segment header
into the provided destination slice.
+// It assumes that dest has enough space to hold the header.
+func (sc *segmentCodec) encodeUncompressedSegmentHeader(payloadLen int,
isSelfContained bool, dest []byte) {
+ headerInt := uint32(payloadLen)
+ if isSelfContained {
+ headerInt |= 1 << 17
+ }
+
+ dest[0] = byte(headerInt)
+ dest[1] = byte(headerInt >> 8)
+ dest[2] = byte(headerInt >> 16)
+
+ crc := Crc24(dest[:3])
+ dest[3] = byte(crc)
+ dest[4] = byte(crc >> 8)
+ dest[5] = byte(crc >> 16)
+}
+
+// encodePayloadAndChecksum encodes the payload and its CRC32 checksum into
the provided destination slice.
+// It assumes that dest has enough space to hold the payload and checksum.
+// Starting from dest[0], it writes the payload followed by its CRC32 checksum.
+func (sc *segmentCodec) encodePayloadAndChecksum(payload []byte, dest []byte) {
+ payloadCRC32 := Crc32(payload)
+ copy(dest, payload)
+ binary.LittleEndian.PutUint32(dest[len(payload):], payloadCRC32)
+}
+
+func (sc *segmentCodec) decode(r io.Reader) ([]byte, bool, error) {
+ if sc.compressed {
+ return sc.decodeCompressedSegment(r)
+ }
+ return sc.decodeUncompressedSegment(r)
+}
+
+func (sc *segmentCodec) decodeCompressedSegment(r io.Reader) ([]byte, bool,
error) {
+ header, err := sc.decodeCompressedSegmentHeader(r)
+ if err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read compressed
segment header, err: %w", err)
+ }
+
+ compressedPayload, err := sc.decodePayload(r, header)
+ if err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read compressed
segment payload, err: %w", err)
+ }
+
+ var uncompressedPayload []byte
+ if header.uncompressedPayloadLength > 0 {
+ uncompressedPayload, err =
sc.compressor.AppendDecompressed(nil, compressedPayload,
uint32(header.uncompressedPayloadLength))
+ if err != nil {
+ return nil, false, err
+ }
+ // Verify that the decompressed length matches the expected
length
+ if uint32(len(uncompressedPayload)) !=
uint32(header.uncompressedPayloadLength) {
+ return nil, false, fmt.Errorf("gocql: length mismatch
after payload decompressing, got %d, expected %d", len(uncompressedPayload),
header.uncompressedPayloadLength)
+ }
+ } else {
+ // in case when the segment was not compressed because
compression was not worth it
+ uncompressedPayload = compressedPayload
+ }
+
+ return uncompressedPayload, header.isSelfContained, nil
+}
+
+func (sc *segmentCodec) decodeUncompressedSegment(r io.Reader) ([]byte, bool,
error) {
+ header, err := sc.decodeUncompressedSegmentHeader(r)
+ if err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed segment header, err: %w", err)
+ }
+
+ payload, err := sc.decodePayload(r, header)
+ if err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed segment payload, err: %w", err)
+ }
+
+ return payload, header.isSelfContained, nil
+}
+
+// verifySegmentHeaderChecksum verifies the CRC24 checksum of the segment
header.
+func (sc *segmentCodec) verifySegmentHeaderChecksum(data []byte, expected
uint32) error {
+ computed := Crc24(data)
+ if computed != expected {
+ return fmt.Errorf("gocql: crc24 mismatch in segment header:
expected %d, got %d", expected, computed)
+ }
+ return nil
+}
+
+// verifySegmentPayloadChecksum verifies the CRC32 checksum of the segment
payload.
+func (sc *segmentCodec) verifySegmentPayloadChecksum(data []byte, expected
uint32) error {
+ computed := Crc32(data)
+ if computed != expected {
+ return fmt.Errorf("gocql: payload crc32 mismatch in segment
payload: expected %d, got %d", expected, computed)
+ }
+ return nil
+}
+
+// decodeCompressedSegmentHeader reads and verifies the header of a compressed
segment from the given reader.
+func (sc *segmentCodec) decodeCompressedSegmentHeader(r io.Reader)
(*segmentHeader, error) {
+ var headerBuf [8]byte // TODO: potentially optimize allocation, could
be stored in segmentCodec and reused if the codec is a specific for each Conn
Review Comment:
I agree with the TODO. It's the same with the read and write buffers: we can
just reuse them if each connection has its own.
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
+ for _, req := range sw.writeRequests {
+ // TODO: interesting if compiler optimizes this
+ framesBuf = append(framesBuf, req.data...)
+ }
+
+ err := sw.encodeAndWrite(framesBuf, true)
+ if err != nil {
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: 0,
+ err: err,
+ }
+ }
+ return
+ }
+
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: len(req.data),
+ err: nil,
+ }
+ }
+}
+
+func (sw *segmentWriter) reset() {
+ sw.writeRequests = nil
+ sw.totalFramesLength = 0
+}
+
+// Encodes a big frame which size is larger than maxSegmentPayloadSize
+// into multiple non self-contained segments and flushes them immediately
+func (sw *segmentWriter) flushBigFrameImmediately(req writeRequest) {
+ // Calculate the number of segment the frame will be split into
+ segmentsCount := 0
+ frame := req.data
+ frameLength := len(frame)
+ exactFit := frameLength%maxSegmentPayloadSize == 0
+ if exactFit {
+ segmentsCount = frameLength / maxSegmentPayloadSize
+ } else {
+ // An extra segment for the remainder of the frame
+ segmentsCount = frameLength/maxSegmentPayloadSize + 1
+ }
+
+ var flushErr error
+
+ for i := 0; i < segmentsCount; i++ {
+ // Calculate the length of the current frame part which will be
encoded into a segment
+ partialFrameLength := 0
+ if i < segmentsCount-1 || exactFit {
+ partialFrameLength = maxSegmentPayloadSize
+ } else {
+ partialFrameLength = frameLength % maxSegmentPayloadSize
+ }
+ err := sw.encodeAndWrite(frame[:partialFrameLength], false)
+ if err != nil {
+ flushErr = err
+ break
+ }
+ frame = frame[partialFrameLength:]
+ }
+
+ written := len(req.data)
+ if flushErr != nil {
+ written = 0
+ }
+
+ req.resultChan <- writeResult{
+ n: written,
+ err: flushErr,
+ }
+}
+
+// Encodes a frame into a segment and writes it to the underlying connection
+func (sw *segmentWriter) encodeAndWrite(frame []byte, isSelfContained bool)
error {
+ segmentBuf, err := sw.segmentCodec.encode(frame, isSelfContained)
+ if err != nil {
+ return err
+ }
+ _, err = sw.w.writeContext(context.Background(), segmentBuf)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// segmentReader allows reading segments from the underlying connection.
+// Implements ConnReader interface.
+type segmentReader struct {
+ r ConnReader
+
+ segmentCodec segmentCodec
+
+ // Reusable buffer for decoded frames
+ // This buffer might have multiple frames inside if self-contained
segment is decoded
+ readBufferDecoded bytes.Reader
+ // Reusable buffer for reading frame header
+ frameHeaderBuf [frameHeadSize]byte
+}
+
+func newSegmentReader(r ConnReader, segmentCodec segmentCodec) *segmentReader {
+ return &segmentReader{
+ r: r,
+ segmentCodec: segmentCodec,
+ }
+}
+
+// why do we have a write method for reader lol
+func (sr *segmentReader) Write(b []byte) (n int, err error) {
+ return sr.r.Write(b)
+}
+
+func (sr *segmentReader) Close() error {
+ return sr.r.Close()
+}
+
+func (sr *segmentReader) LocalAddr() net.Addr {
+ return sr.r.LocalAddr()
+}
+
+func (sr *segmentReader) RemoteAddr() net.Addr {
+ return sr.r.RemoteAddr()
+}
+
+func (sr *segmentReader) SetDeadline(t time.Time) error {
+ return sr.r.SetDeadline(t)
+}
+
+func (sr *segmentReader) SetReadDeadline(t time.Time) error {
+ return sr.r.SetReadDeadline(t)
+}
+
+func (sr *segmentReader) SetWriteDeadline(t time.Time) error {
+ return sr.r.SetWriteDeadline(t)
+}
+
+func (sr *segmentReader) SetTimeout(timeout time.Duration) {
+ sr.r.SetTimeout(timeout)
+}
+
+func (sr *segmentReader) GetTimeout() time.Duration {
+ return sr.r.GetTimeout()
+}
+
+func (sr *segmentReader) Read(p []byte) (n int, err error) {
+ // If we don't have a read buffer, or it's empty, read the first
segment.
+ // If we have read all the frames from the current segment, read the
next segment.
+ // If segment is non self-container, it will read all segments and read
buffer will hold the full frame.
+ if sr.readBufferDecoded.Len() == 0 {
+ err = sr.readSegment()
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return sr.readBufferDecoded.Read(p)
+}
+
+func (sr *segmentReader) readSegment() error {
+ segment, isSelfContained, err := sr.segmentCodec.decode(sr.r)
+ if err != nil {
+ // TODO: does only network related errors should result in
connection closure?
+ // var verr net.Error
+ // if errors.As(err, &verr) {
+ // return nil, false, verr
+ // }
+ return err
+ }
+
+ if isSelfContained {
+ // Reset the buffer to the new segment
+ // It might contain multiple frames so Read should be called
mutiple times to read all of them
+ sr.readBufferDecoded.Reset(segment)
+ return nil
+ }
+
+ frame, err := sr.readNonSelfContainedSegment(segment)
+ if err != nil {
+ return err
+ }
+
+ // Contains a single frame so we can read it all at once
+ sr.readBufferDecoded.Reset(frame)
+ return nil
+}
+
+// Non self-contained segment contains only part of a bigger frame that is
split into multiple segments.
+// Calling it results in a full frame being read into a single buffer.
+func (sr *segmentReader) readNonSelfContainedSegment(segment []byte) ([]byte,
error) {
+ frameHeader, err := readHeader(bytes.NewBuffer(segment),
sr.frameHeaderBuf[:])
+ if err != nil {
+ return nil, err
+ }
+
+ // Allocate a buffer to read the rest of the segment into
+ buf := bytes.NewBuffer(make([]byte, 0,
frameHeader.length+frameHeadSize))
+ buf.Write(segment)
+
+ // Computing how many bytes of message left to read
+ // len(segment) is the length of the first frame we already read
+ bytesToRead := frameHeader.length - len(segment) + frameHeadSize
+ err = sr.readPartialFrames(buf, bytesToRead)
+ if err != nil {
+ return nil, err
+ }
+
+ return buf.Bytes(), nil
+}
+
+// Reads parts of a bigger frame that is split into multiple segments into a
single buffer.
+// bytesToRead is the number of bytes left to read from the frame.
+// Called by readNonSelfContainedSegment.
+func (sr *segmentReader) readPartialFrames(dstBuf *bytes.Buffer, bytesToRead
int) error {
+ for bytesToRead > 0 {
+ frame, isSelfContained, err := sr.segmentCodec.decode(sr.r)
+ if err != nil {
+ return err
+ }
+ // Expected to receive only non self-contained segments
+ if isSelfContained {
+ return errUnexpectedSelfcontainedSegment
+ }
+ if totalLength := dstBuf.Len() + len(frame); totalLength >
dstBuf.Cap() {
+ return fmt.Errorf("gocql: expected partial frame of
length %d, got %d", dstBuf.Cap(), totalLength)
+ }
+ n, _ := dstBuf.Write(frame)
+ bytesToRead -= n
+ }
+
+ if bytesToRead < 0 {
+ // This should never happen actually
+ panic("gocql: something went wrong while reading partial
frames")
Review Comment:
We can return an error here just in case. It would lead to the connection
closing instead of crashing the application, which is better imo.
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
+ for _, req := range sw.writeRequests {
+ // TODO: interesting if compiler optimizes this
+ framesBuf = append(framesBuf, req.data...)
+ }
+
+ err := sw.encodeAndWrite(framesBuf, true)
+ if err != nil {
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: 0,
+ err: err,
+ }
+ }
+ return
+ }
+
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: len(req.data),
+ err: nil,
+ }
+ }
+}
+
+func (sw *segmentWriter) reset() {
+ sw.writeRequests = nil
+ sw.totalFramesLength = 0
+}
+
+// Encodes a big frame which size is larger than maxSegmentPayloadSize
+// into multiple non self-contained segments and flushes them immediately
+func (sw *segmentWriter) flushBigFrameImmediately(req writeRequest) {
+ // Calculate the number of segment the frame will be split into
+ segmentsCount := 0
+ frame := req.data
+ frameLength := len(frame)
+ exactFit := frameLength%maxSegmentPayloadSize == 0
+ if exactFit {
+ segmentsCount = frameLength / maxSegmentPayloadSize
+ } else {
+ // An extra segment for the remainder of the frame
+ segmentsCount = frameLength/maxSegmentPayloadSize + 1
+ }
+
+ var flushErr error
+
+ for i := 0; i < segmentsCount; i++ {
+ // Calculate the length of the current frame part which will be
encoded into a segment
+ partialFrameLength := 0
+ if i < segmentsCount-1 || exactFit {
+ partialFrameLength = maxSegmentPayloadSize
+ } else {
+ partialFrameLength = frameLength % maxSegmentPayloadSize
+ }
+ err := sw.encodeAndWrite(frame[:partialFrameLength], false)
+ if err != nil {
+ flushErr = err
+ break
+ }
+ frame = frame[partialFrameLength:]
+ }
+
+ written := len(req.data)
+ if flushErr != nil {
+ written = 0
+ }
+
+ req.resultChan <- writeResult{
+ n: written,
+ err: flushErr,
+ }
+}
+
+// Encodes a frame into a segment and writes it to the underlying connection
+func (sw *segmentWriter) encodeAndWrite(frame []byte, isSelfContained bool)
error {
+ segmentBuf, err := sw.segmentCodec.encode(frame, isSelfContained)
+ if err != nil {
+ return err
+ }
+ _, err = sw.w.writeContext(context.Background(), segmentBuf)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// segmentReader allows reading segments from the underlying connection.
+// Implements ConnReader interface.
+type segmentReader struct {
+ r ConnReader
+
+ segmentCodec segmentCodec
+
+ // Reusable buffer for decoded frames
+ // This buffer might have multiple frames inside if self-contained
segment is decoded
+ readBufferDecoded bytes.Reader
+ // Reusable buffer for reading frame header
+ frameHeaderBuf [frameHeadSize]byte
+}
+
+func newSegmentReader(r ConnReader, segmentCodec segmentCodec) *segmentReader {
+ return &segmentReader{
+ r: r,
+ segmentCodec: segmentCodec,
+ }
+}
+
+// why do we have a write method for reader lol
+func (sr *segmentReader) Write(b []byte) (n int, err error) {
+ return sr.r.Write(b)
+}
+
+func (sr *segmentReader) Close() error {
+ return sr.r.Close()
+}
+
+func (sr *segmentReader) LocalAddr() net.Addr {
+ return sr.r.LocalAddr()
+}
+
+func (sr *segmentReader) RemoteAddr() net.Addr {
+ return sr.r.RemoteAddr()
+}
+
+func (sr *segmentReader) SetDeadline(t time.Time) error {
+ return sr.r.SetDeadline(t)
+}
+
+func (sr *segmentReader) SetReadDeadline(t time.Time) error {
+ return sr.r.SetReadDeadline(t)
+}
+
+func (sr *segmentReader) SetWriteDeadline(t time.Time) error {
+ return sr.r.SetWriteDeadline(t)
+}
+
+func (sr *segmentReader) SetTimeout(timeout time.Duration) {
+ sr.r.SetTimeout(timeout)
+}
+
+func (sr *segmentReader) GetTimeout() time.Duration {
+ return sr.r.GetTimeout()
+}
+
+func (sr *segmentReader) Read(p []byte) (n int, err error) {
+ // If we don't have a read buffer, or it's empty, read the first
segment.
+ // If we have read all the frames from the current segment, read the
next segment.
+ // If segment is non self-container, it will read all segments and read
buffer will hold the full frame.
+ if sr.readBufferDecoded.Len() == 0 {
+ err = sr.readSegment()
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return sr.readBufferDecoded.Read(p)
+}
+
+func (sr *segmentReader) readSegment() error {
+ segment, isSelfContained, err := sr.segmentCodec.decode(sr.r)
+ if err != nil {
+ // TODO: does only network related errors should result in
connection closure?
+ // var verr net.Error
+ // if errors.As(err, &verr) {
+ // return nil, false, verr
+ // }
+ return err
+ }
+
+ if isSelfContained {
+ // Reset the buffer to the new segment
+ // It might contain multiple frames so Read should be called
mutiple times to read all of them
Review Comment:
multiple*
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
+ for _, req := range sw.writeRequests {
+ // TODO: interesting if compiler optimizes this
+ framesBuf = append(framesBuf, req.data...)
+ }
+
+ err := sw.encodeAndWrite(framesBuf, true)
+ if err != nil {
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: 0,
+ err: err,
+ }
+ }
+ return
+ }
+
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: len(req.data),
+ err: nil,
+ }
+ }
+}
+
+func (sw *segmentWriter) reset() {
+ sw.writeRequests = nil
+ sw.totalFramesLength = 0
+}
+
+// Encodes a big frame which size is larger than maxSegmentPayloadSize
+// into multiple non self-contained segments and flushes them immediately
+func (sw *segmentWriter) flushBigFrameImmediately(req writeRequest) {
+ // Calculate the number of segment the frame will be split into
+ segmentsCount := 0
+ frame := req.data
+ frameLength := len(frame)
+ exactFit := frameLength%maxSegmentPayloadSize == 0
+ if exactFit {
+ segmentsCount = frameLength / maxSegmentPayloadSize
+ } else {
+ // An extra segment for the remainder of the frame
+ segmentsCount = frameLength/maxSegmentPayloadSize + 1
+ }
+
+ var flushErr error
+
+ for i := 0; i < segmentsCount; i++ {
+ // Calculate the length of the current frame part which will be
encoded into a segment
+ partialFrameLength := 0
+ if i < segmentsCount-1 || exactFit {
+ partialFrameLength = maxSegmentPayloadSize
+ } else {
+ partialFrameLength = frameLength % maxSegmentPayloadSize
+ }
+ err := sw.encodeAndWrite(frame[:partialFrameLength], false)
+ if err != nil {
+ flushErr = err
+ break
+ }
+ frame = frame[partialFrameLength:]
+ }
+
+ written := len(req.data)
+ if flushErr != nil {
+ written = 0
+ }
+
+ req.resultChan <- writeResult{
+ n: written,
+ err: flushErr,
+ }
+}
+
+// Encodes a frame into a segment and writes it to the underlying connection
+func (sw *segmentWriter) encodeAndWrite(frame []byte, isSelfContained bool)
error {
+ segmentBuf, err := sw.segmentCodec.encode(frame, isSelfContained)
+ if err != nil {
+ return err
+ }
+ _, err = sw.w.writeContext(context.Background(), segmentBuf)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// segmentReader allows reading segments from the underlying connection.
+// Implements ConnReader interface.
+type segmentReader struct {
+ r ConnReader
+
+ segmentCodec segmentCodec
+
+ // Reusable buffer for decoded frames
+ // This buffer might have multiple frames inside if self-contained
segment is decoded
+ readBufferDecoded bytes.Reader
+ // Reusable buffer for reading frame header
+ frameHeaderBuf [frameHeadSize]byte
+}
+
+func newSegmentReader(r ConnReader, segmentCodec segmentCodec) *segmentReader {
+ return &segmentReader{
+ r: r,
+ segmentCodec: segmentCodec,
+ }
+}
+
+// why do we have a write method for reader lol
+func (sr *segmentReader) Write(b []byte) (n int, err error) {
+ return sr.r.Write(b)
+}
+
+func (sr *segmentReader) Close() error {
+ return sr.r.Close()
+}
+
+func (sr *segmentReader) LocalAddr() net.Addr {
+ return sr.r.LocalAddr()
+}
+
+func (sr *segmentReader) RemoteAddr() net.Addr {
+ return sr.r.RemoteAddr()
+}
+
+func (sr *segmentReader) SetDeadline(t time.Time) error {
+ return sr.r.SetDeadline(t)
+}
+
+func (sr *segmentReader) SetReadDeadline(t time.Time) error {
+ return sr.r.SetReadDeadline(t)
+}
+
+func (sr *segmentReader) SetWriteDeadline(t time.Time) error {
+ return sr.r.SetWriteDeadline(t)
+}
+
+func (sr *segmentReader) SetTimeout(timeout time.Duration) {
+ sr.r.SetTimeout(timeout)
+}
+
+func (sr *segmentReader) GetTimeout() time.Duration {
+ return sr.r.GetTimeout()
+}
+
+func (sr *segmentReader) Read(p []byte) (n int, err error) {
+ // If we don't have a read buffer, or it's empty, read the first
segment.
+ // If we have read all the frames from the current segment, read the
next segment.
+ // If segment is non self-container, it will read all segments and read
buffer will hold the full frame.
+ if sr.readBufferDecoded.Len() == 0 {
+ err = sr.readSegment()
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return sr.readBufferDecoded.Read(p)
+}
+
+func (sr *segmentReader) readSegment() error {
+ segment, isSelfContained, err := sr.segmentCodec.decode(sr.r)
+ if err != nil {
+ // TODO: does only network related errors should result in
connection closure?
+ // var verr net.Error
+ // if errors.As(err, &verr) {
+ // return nil, false, verr
+ // }
Review Comment:
This TODO needs to be addressed. Personally I think it's fine to close the
connection when the driver fails to encode/decode something here, because the
failure is either an error in the encode/decode code (most likely a driver
bug) or a connection error. A connection error should lead to connection
closure, and an encode/decode error should never happen, so I don't see a lot
of benefit in trying to handle it.
##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx
context.Context, timeout time
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// segmentWriter allows batching multiple frames into a signle segment before
flushing them to the connection.
+type segmentWriter struct {
+ w contextWriter
+ quit <-chan struct{}
+
+ // Holds write requests for the current segment.
+ writeRequests []writeRequest
+ totalFramesLength int
+ writeCh chan writeRequest
+
+ segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit
<-chan struct{}, compressor Compressor) *segmentWriter {
+ sw := &segmentWriter{
+ w: w,
+ quit: quit,
+ writeCh: make(chan writeRequest),
+ segmentCodec: newSegmentCodec(compressor),
+ }
+
+ go sw.runFlusher(writeInterval)
+
+ return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int,
error) {
+ resultChan := make(chan writeResult, 1)
+ req := writeRequest{
+ resultChan: resultChan,
+ data: frame,
+ }
+
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-sw.quit:
+ return 0, ErrConnectionClosed
+ case sw.writeCh <- req:
+ // Enqueued for writing
+ }
+
+ result := <-resultChan
+ return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ if !timer.Stop() {
+ <-timer.C
+ }
+
+ // Indicates whether the flush timer is running
+ running := false
+
+ for {
+ select {
+ case <-sw.quit:
+ return
+ case req := <-sw.writeCh:
+ frame := req.data
+ if len(frame) > maxSegmentPayloadSize {
+ sw.flushBigFrameImmediately(req)
+ } else if sw.fitsSegment(frame) {
+ sw.appendWriteRequest(req)
+ if !running {
+ running = true
+ timer.Reset(interval)
+ }
+ } else {
+ // Frame doesn't fit into current segment,
+ // so we need to flush the current one and
start a new one
+ sw.flushCurrentSegment()
+ sw.reset()
+ sw.appendWriteRequest(req)
+ timer.Reset(interval)
+ }
+ case <-timer.C:
+ running = false
+ sw.flushCurrentSegment()
+ sw.reset()
+ }
+ }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+ sw.writeRequests = append(sw.writeRequests, req)
+ sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+ return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+ framesBuf := make([]byte, 0, sw.totalFramesLength)
+ for _, req := range sw.writeRequests {
+ // TODO: interesting if compiler optimizes this
+ framesBuf = append(framesBuf, req.data...)
+ }
+
+ err := sw.encodeAndWrite(framesBuf, true)
+ if err != nil {
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: 0,
+ err: err,
+ }
+ }
+ return
+ }
+
+ for _, req := range sw.writeRequests {
+ req.resultChan <- writeResult{
+ n: len(req.data),
+ err: nil,
+ }
+ }
+}
+
+func (sw *segmentWriter) reset() {
+ sw.writeRequests = nil
+ sw.totalFramesLength = 0
+}
+
+// Encodes a big frame which size is larger than maxSegmentPayloadSize
+// into multiple non self-contained segments and flushes them immediately
+func (sw *segmentWriter) flushBigFrameImmediately(req writeRequest) {
+ // Calculate the number of segment the frame will be split into
+ segmentsCount := 0
+ frame := req.data
+ frameLength := len(frame)
+ exactFit := frameLength%maxSegmentPayloadSize == 0
+ if exactFit {
+ segmentsCount = frameLength / maxSegmentPayloadSize
+ } else {
+ // An extra segment for the remainder of the frame
+ segmentsCount = frameLength/maxSegmentPayloadSize + 1
+ }
+
+ var flushErr error
+
+ for i := 0; i < segmentsCount; i++ {
+ // Calculate the length of the current frame part which will be
encoded into a segment
+ partialFrameLength := 0
+ if i < segmentsCount-1 || exactFit {
+ partialFrameLength = maxSegmentPayloadSize
+ } else {
+ partialFrameLength = frameLength % maxSegmentPayloadSize
+ }
+ err := sw.encodeAndWrite(frame[:partialFrameLength], false)
+ if err != nil {
+ flushErr = err
+ break
+ }
+ frame = frame[partialFrameLength:]
+ }
+
+ written := len(req.data)
+ if flushErr != nil {
+ written = 0
+ }
+
+ req.resultChan <- writeResult{
+ n: written,
+ err: flushErr,
+ }
+}
+
+// Encodes a frame into a segment and writes it to the underlying connection
+func (sw *segmentWriter) encodeAndWrite(frame []byte, isSelfContained bool)
error {
+ segmentBuf, err := sw.segmentCodec.encode(frame, isSelfContained)
+ if err != nil {
+ return err
+ }
+ _, err = sw.w.writeContext(context.Background(), segmentBuf)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// segmentReader allows reading segments from the underlying connection.
+// Implements ConnReader interface.
+type segmentReader struct {
+ r ConnReader
+
+ segmentCodec segmentCodec
+
+ // Reusable buffer for decoded frames
+ // This buffer might have multiple frames inside if self-contained
segment is decoded
+ readBufferDecoded bytes.Reader
+ // Reusable buffer for reading frame header
+ frameHeaderBuf [frameHeadSize]byte
+}
+
+func newSegmentReader(r ConnReader, segmentCodec segmentCodec) *segmentReader {
+ return &segmentReader{
+ r: r,
+ segmentCodec: segmentCodec,
+ }
+}
+
+// why do we have a write method for reader lol
+func (sr *segmentReader) Write(b []byte) (n int, err error) {
+ return sr.r.Write(b)
+}
+
+func (sr *segmentReader) Close() error {
+ return sr.r.Close()
+}
+
+func (sr *segmentReader) LocalAddr() net.Addr {
+ return sr.r.LocalAddr()
+}
+
+func (sr *segmentReader) RemoteAddr() net.Addr {
+ return sr.r.RemoteAddr()
+}
+
+func (sr *segmentReader) SetDeadline(t time.Time) error {
+ return sr.r.SetDeadline(t)
+}
+
+func (sr *segmentReader) SetReadDeadline(t time.Time) error {
+ return sr.r.SetReadDeadline(t)
+}
+
+func (sr *segmentReader) SetWriteDeadline(t time.Time) error {
+ return sr.r.SetWriteDeadline(t)
+}
+
+func (sr *segmentReader) SetTimeout(timeout time.Duration) {
+ sr.r.SetTimeout(timeout)
+}
+
+func (sr *segmentReader) GetTimeout() time.Duration {
+ return sr.r.GetTimeout()
+}
+
+func (sr *segmentReader) Read(p []byte) (n int, err error) {
+ // If we don't have a read buffer, or it's empty, read the first
segment.
+ // If we have read all the frames from the current segment, read the
next segment.
+ // If segment is non self-container, it will read all segments and read
buffer will hold the full frame.
+ if sr.readBufferDecoded.Len() == 0 {
+ err = sr.readSegment()
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return sr.readBufferDecoded.Read(p)
+}
+
+func (sr *segmentReader) readSegment() error {
+ segment, isSelfContained, err := sr.segmentCodec.decode(sr.r)
+ if err != nil {
+ // TODO: does only network related errors should result in
connection closure?
+ // var verr net.Error
+ // if errors.As(err, &verr) {
+ // return nil, false, verr
+ // }
+ return err
+ }
+
+ if isSelfContained {
+ // Reset the buffer to the new segment
+ // It might contain multiple frames so Read should be called
mutiple times to read all of them
+ sr.readBufferDecoded.Reset(segment)
+ return nil
+ }
+
+ frame, err := sr.readNonSelfContainedSegment(segment)
+ if err != nil {
+ return err
+ }
+
+ // Contains a single frame so we can read it all at once
+ sr.readBufferDecoded.Reset(frame)
+ return nil
+}
+
+// Non self-contained segment contains only part of a bigger frame that is
split into multiple segments.
+// Calling it results in a full frame being read into a single buffer.
+func (sr *segmentReader) readNonSelfContainedSegment(segment []byte) ([]byte,
error) {
+ frameHeader, err := readHeader(bytes.NewBuffer(segment),
sr.frameHeaderBuf[:])
+ if err != nil {
+ return nil, err
+ }
+
+ // Allocate a buffer to read the rest of the segment into
+ buf := bytes.NewBuffer(make([]byte, 0,
frameHeader.length+frameHeadSize))
+ buf.Write(segment)
+
+ // Computing how many bytes of message left to read
+ // len(segment) is the length of the first frame we already read
+ bytesToRead := frameHeader.length - len(segment) + frameHeadSize
+ err = sr.readPartialFrames(buf, bytesToRead)
+ if err != nil {
+ return nil, err
+ }
+
+ return buf.Bytes(), nil
+}
+
+// Reads parts of a bigger frame that is split into multiple segments into a
single buffer.
+// bytesToRead is the number of bytes left to read from the frame.
+// Called by readNonSelfContainedSegment.
+func (sr *segmentReader) readPartialFrames(dstBuf *bytes.Buffer, bytesToRead
int) error {
+ for bytesToRead > 0 {
+ frame, isSelfContained, err := sr.segmentCodec.decode(sr.r)
+ if err != nil {
+ return err
+ }
+ // Expected to receive only non self-contained segments
+ if isSelfContained {
+ return errUnexpectedSelfcontainedSegment
Review Comment:
errUnexpectedSelfContainedSegment*
Also, why does this one deserve an actual error type and not the other errors?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]