Skip to content
Draft
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ a huge list of updates and fixes.
- SN no longer accepts objects with invocation or verification script bigger than 1KiB (#3887)
- Every object with non-zero payload is now paid, not only regular ones (#3856)
- Separate policer placement state from replica shortage (#3901)
- SN now forwards remote SN's response to the client as is (#3877)

### Removed
- `node.persistent_sessions.path` config option from SN config (#3846)
Expand Down
25 changes: 25 additions & 0 deletions internal/protobuf/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
1 + 3 + object.MaxHeaderLen
)

// Common response field numbers.
const (
FieldResponseBody = 1
FieldResponseMetaHeader = 2
FieldResponseVerificationHeader = 3
)

// ParseAPIVersionField parses version.Version from the next field with known
// number and type at given offset. Also returns field length.
func ParseAPIVersionField(buf []byte, fNum protowire.Number, fTyp protowire.Type) (version.Version, int, error) {
Expand Down Expand Up @@ -254,3 +261,21 @@ func ParseAttribute(buf []byte, fNum protowire.Number, fTyp protowire.Type) ([]b

return k, v, nf + lnf, nil
}

// // VerifyObjectSplitInfo checks whether buf is a valid object split info
// // protobuf.
// //
// // Absense of any fields is ignored. Unknown fields are allowed and checked.
// // Repeating fields is allowed.
// func VerifyObjectSplitInfo(buf []byte) error {
// return verifyMessage(buf, objectSplitInfoMessageScheme)
// }
//
// // VerifyObjectHeaderWithOrder checks whether buf is a valid object header
// // protobuf. If so, direct field order flag is returned.
// //
// // Absense of any fields is ignored. Unknown fields are allowed and checked.
// // Repeating fields is allowed.
// func VerifyObjectHeaderWithOrder(buf []byte) (bool, error) {
// return verifyMessageWithOrder(buf, objectHeaderScheme, true, interceptors{})
// }
187 changes: 187 additions & 0 deletions internal/protobuf/buffers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package protobuf

import (
"hash"
"io"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -95,3 +97,188 @@ func (x *MemBufferPool) Get() *MemBuffer {
item.refs.Store(1)
return item
}

// TODO: docs.
type BuffersSlice struct {
buffers mem.BufferSlice
curOff int
lastTo int
}

func (x *BuffersSlice) Reset(buffers mem.BufferSlice) {
x.buffers = buffers
x.curOff = 0
x.lastTo = buffers[len(buffers)-1].Len()
}

// TODO: docs.
func NewBuffersSlice(buffers mem.BufferSlice) BuffersSlice {
if len(buffers) == 0 {
return BuffersSlice{}
}

return BuffersSlice{
buffers: buffers,
curOff: 0,
lastTo: buffers[len(buffers)-1].Len(),
}
}

func (x BuffersSlice) IsEmpty() bool {
return x.Len() == 0
}

func (x *BuffersSlice) buffersSeq(yield func([]byte) bool) {
if len(x.buffers) == 0 {
return
}

if len(x.buffers) == 1 {
yield(x.buffers[0].ReadOnlyData()[x.curOff:x.lastTo])
return
}

if !yield(x.buffers[0].ReadOnlyData()[x.curOff:]) {
return
}

for i := range len(x.buffers) - 2 {
if !yield(x.buffers[i+1].ReadOnlyData()) {
return
}
}

yield(x.buffers[len(x.buffers)-1].ReadOnlyData()[:x.lastTo])
}

func (x *BuffersSlice) bytesSeq(yield func(byte) bool) {
var buf []byte
for {
buf = x.buffers[0].ReadOnlyData()
if len(x.buffers) == 1 {
buf = buf[:x.lastTo]
}

for x.curOff < len(buf) {
cnt := yield(buf[x.curOff])
x.curOff++
if !cnt {
if x.curOff == len(buf) {
x.buffers = x.buffers[1:]
x.curOff = 0
}
return
}
}

x.buffers = x.buffers[1:]
x.curOff = 0

if x.IsEmpty() {
return
}
}
}

func (x *BuffersSlice) MoveNext(n int) (BuffersSlice, error) {
if len(x.buffers) == 0 {
if n > 0 {
return BuffersSlice{}, io.ErrUnexpectedEOF
}
return BuffersSlice{}, nil
}

sub := *x
var ln int

for i := 0; ; i++ {
if i == 0 {
if len(x.buffers) == 1 {
ln = x.lastTo - x.curOff
} else {
ln = x.buffers[0].Len() - x.curOff
}
} else if i < len(x.buffers)-1 {
ln = x.buffers[i].Len()
} else {
ln = x.lastTo
}

if n > ln {
if i == len(x.buffers)-1 {
break
}
n -= ln
continue
}

if n < ln {
x.buffers = x.buffers[i:]
if i == 0 {
x.curOff += n
} else {
x.curOff = 0
}
} else {
x.buffers = x.buffers[i+1:]
x.curOff = 0
}

sub.buffers = sub.buffers[:i+1]
sub.lastTo = n
if i == 0 {
sub.lastTo += sub.curOff
}
return sub, nil
}

return BuffersSlice{}, io.ErrUnexpectedEOF
}

// TODO: docs.
func (x *BuffersSlice) ReadOnlyData() []byte {
if len(x.buffers) == 0 {
return nil
}

if len(x.buffers) == 1 {
return x.buffers[0].ReadOnlyData()[x.curOff:x.lastTo]
}

buf := make([]byte, x.Len())
return buf[:x.CopyTo(buf)]
}

func (x BuffersSlice) Len() int {
if len(x.buffers) == 0 {
return 0
}

var ln int
for buf := range x.buffersSeq {
ln += len(buf)
}

return ln
}

func (x BuffersSlice) HashTo(h hash.Hash) {
if len(x.buffers) == 0 {
return
}

for buf := range x.buffersSeq {
h.Write(buf)
}
}

func (x BuffersSlice) CopyTo(dst []byte) int {
if len(x.buffers) == 0 {
return 0
}
var n int
for buf := range x.buffersSeq {
n += copy(dst[n:], buf)
}
return n
}
19 changes: 15 additions & 4 deletions internal/protobuf/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,26 @@ type BufferedCodec struct{}

// Marshal implements [encoding.CodecV2].
func (BufferedCodec) Marshal(msg any) (mem.BufferSlice, error) {
if bs, ok := msg.(mem.Buffer); ok {
return mem.BufferSlice{bs}, nil
switch v := msg.(type) {
case mem.BufferSlice:
return v, nil
case mem.Buffer:
return mem.BufferSlice{v}, nil
default:
return encoding.GetCodecV2(proto.Name).Marshal(msg)
}
return encoding.GetCodecV2(proto.Name).Marshal(msg)
}

// Unmarshal implements [encoding.CodecV2].
func (BufferedCodec) Unmarshal(data mem.BufferSlice, msg any) error {
return encoding.GetCodecV2(proto.Name).Unmarshal(data, msg)
switch v := msg.(type) {
case *mem.BufferSlice:
data.Ref()
*v = data
return nil
default:
return encoding.GetCodecV2(proto.Name).Unmarshal(data, msg)
}
}

// Name implements [encoding.CodecV2].
Expand Down
6 changes: 6 additions & 0 deletions internal/protobuf/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ func NewRepeatedFieldError(n protowire.Number) error {
func NewUnsupportedFieldError(n protowire.Number, t protowire.Type) error {
return fmt.Errorf("unsupported field #%d of type %v", n, t)
}

// NewInvalidUTF8Error returns common error for string field #n containing
// invalid UTF-8.
func NewInvalidUTF8Error(n protowire.Number) error {
return fmt.Errorf("string field #%d contains invalid UTF-8", n)
}
Loading
Loading