Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2377,9 +2377,18 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool {

endTime := time.Now().Add(self.sendBufferSettings.CreateContractTimeout)

// Back off contract retries when the backend is unreachable. The normal
// retry interval against a dead control API is one of the main bandwidth-leak
// sources during an outage; 30s throttles that while still recovering promptly
// once the API returns.
contractRetryInterval := self.sendBufferSettings.CreateContractRetryInterval
if isBackendDegraded() {
contractRetryInterval = 30 * time.Second
}

if self.sendContract != nil {
// there should be a queued up contract
if traceNextContract(min(self.sendBufferSettings.CreateContractTimeout, self.sendBufferSettings.CreateContractRetryInterval)) {
if traceNextContract(min(self.sendBufferSettings.CreateContractTimeout, contractRetryInterval)) {
return true
}
}
Expand All @@ -2405,13 +2414,17 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool {
EncryptionRole: self.encryptionRole,
EncryptionCompanion: self.encryptionCompanion,
}
self.client.ContractManager().CreateContract(
contractKey,
self.contractSeqIndex,
ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
)

if traceNextContract(min(timeout, self.sendBufferSettings.CreateContractRetryInterval)) {
// Gate contract creation when backend is degraded to prevent bandwidth leak
if !isBackendDegraded() {
self.client.ContractManager().CreateContract(
contractKey,
self.contractSeqIndex,
ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction),
)
}

if traceNextContract(min(timeout, contractRetryInterval)) {
return true
}
}
Expand Down
12 changes: 11 additions & 1 deletion transfer_contract_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,8 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq
[]*protocol.Frame{frame},
func(resultFrames []*protocol.Frame, err error) {
if err == nil {
// OOB round-trip succeeded: backend is reachable, clear degradation counter
consecutiveBackendFails.Store(0)
for _, resultFrame := range resultFrames {
self.HandleControlFrame(contractKey, resultFrame)
}
Expand All @@ -1092,7 +1094,15 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq
case <-self.client.Done():
// no need to log warnings when the client closes
default:
self.client.log.Infof("[contract]oob err = %s\n", err)
lastBackendFailNano.Store(time.Now().UnixNano())
consecutiveBackendFails.Add(1)
if ok, suppressed := shouldLogOobErr(); ok {
if suppressed > 0 {
self.client.log.Infof("[contract]oob err = %s (%d suppressed)\n", err, suppressed)
} else {
self.client.log.Infof("[contract]oob err = %s\n", err)
}
}
}
}
},
Expand Down
110 changes: 108 additions & 2 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,87 @@ func (self *ClientAuth) ClientId() (Id, error) {
return byJwt.ClientId, nil
}

// Package-level state shared by everything in this package (and therefore by
// every PlatformTransport instance). All values are atomics so they can be
// read and written without additional locking.
var (
	// lastAuthErrLogNano holds the UnixNano time at which an auth error log
	// line was last emitted, and suppressedAuthErrCount holds how many auth
	// errors were dropped since then. shouldLogAuthErr uses the pair to emit
	// at most one auth error line per minute.
	lastAuthErrLogNano     atomic.Int64
	suppressedAuthErrCount atomic.Int64

	// lastOobErrLogNano and suppressedOobErrCount are the same pair for OOB
	// error log lines, used by shouldLogOobErr.
	lastOobErrLogNano     atomic.Int64
	suppressedOobErrCount atomic.Int64

	// lastBackendFailNano is the UnixNano time of the most recent backend
	// failure (auth or OOB). It is written on every failure, without rate
	// limiting, and isBackendDegraded() reads it as its recency guard.
	lastBackendFailNano atomic.Int64

	// consecutiveBackendFails is the number of backend failures (auth or OOB)
	// seen since the last success. A successful connect or OOB result sets it
	// back to 0. During a real platform outage every attempt fails and nothing
	// resets the count, so it climbs quickly; isolated transient timeouts do
	// not accumulate because an interleaved success clears it.
	// isBackendDegraded() requires the count to reach a threshold so that one
	// or two stray failures are not mistaken for an outage.
	consecutiveBackendFails atomic.Int64
)

const (
	// backendDegradedFailThreshold is how many consecutive backend failures,
	// with no success in between, are needed before the backend is treated as
	// degraded. It is set above the level of normal transient churn.
	backendDegradedFailThreshold = 3

	// backendDegradedWindow bounds how old the last failure may be for the
	// failure count to be trusted. It is chosen to be comfortably larger than
	// the 60s reconnect-backoff cap, so retry attempts during a real outage
	// always read as recent, while a stale count left behind by an old blip on
	// an idle provider does not.
	backendDegradedWindow = 2 * time.Minute
)

// shouldLogRateLimited is the shared rate limiter behind shouldLogAuthErr and
// shouldLogOobErr.
//
// lastLogNano holds the UnixNano time of the last emitted line and
// suppressedCount the number of lines dropped since then. A line is allowed
// when at least interval has elapsed since lastLogNano and this caller wins
// the compare-and-swap that advances lastLogNano to now; in that case it
// returns (true, n), where n is the suppressed count, which is reset to 0.
// Otherwise the suppressed count is incremented and it returns (false, 0).
func shouldLogRateLimited(lastLogNano *atomic.Int64, suppressedCount *atomic.Int64, interval time.Duration) (bool, int64) {
	now := time.Now().UnixNano()
	last := lastLogNano.Load()
	// The compare-and-swap is only attempted once the interval has elapsed.
	// Losing it means a concurrent caller claimed this slot, so this call
	// counts as suppressed as well.
	if now-last < int64(interval) || !lastLogNano.CompareAndSwap(last, now) {
		suppressedCount.Add(1)
		return false, 0
	}
	return true, suppressedCount.Swap(0)
}

// shouldLogAuthErr returns (true, suppressedCount) if an auth error log line
// should be emitted, resetting the suppressed counter. Returns (false, 0) if
// the line is suppressed. At most one line per minute is allowed.
func shouldLogAuthErr() (bool, int64) {
	return shouldLogRateLimited(&lastAuthErrLogNano, &suppressedAuthErrCount, time.Minute)
}

// shouldLogOobErr returns (true, suppressedCount) if a [contract]oob err line
// should be emitted, resetting the suppressed counter. Returns (false, 0) if
// the line is suppressed. At most one line per minute is allowed.
func shouldLogOobErr() (bool, int64) {
	return shouldLogRateLimited(&lastOobErrLogNano, &suppressedOobErrCount, time.Minute)
}

// isBackendDegraded reports whether the backend currently looks unreachable.
//
// It returns true only when both conditions hold:
//   - consecutiveBackendFails has reached backendDegradedFailThreshold, i.e.
//     that many failures were recorded with no success in between, and
//   - the most recent failure (lastBackendFailNano) is less than
//     backendDegradedWindow old.
//
// Together these separate a sustained, broad outage (every attempt failing)
// from the isolated single-connection timeouts that are normal churn on a busy
// provider. SendSequence.updateContract consults this to lengthen its contract
// retry interval and to skip contract creation, so bandwidth is not leaked
// against an unreachable platform.
func isBackendDegraded() bool {
	if failCount := consecutiveBackendFails.Load(); backendDegradedFailThreshold <= failCount {
		// The count alone can be stale; only trust it while the last failure
		// is still recent.
		sinceLastFailNano := time.Now().UnixNano() - lastBackendFailNano.Load()
		return sinceLastFailNano < int64(backendDegradedWindow)
	}
	return false
}

// (ctx, network, address)
// type DialContextFunc func(ctx context.Context, network string, address string) (net.Conn, error)

Expand Down Expand Up @@ -494,7 +575,15 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) {
ws, err = connect()
}
if err != nil {
self.log.Infof("[t]auth error %s = %s\n", clientId, err)
lastBackendFailNano.Store(time.Now().UnixNano())
consecutiveBackendFails.Add(1)
if ok, suppressed := shouldLogAuthErr(); ok {
if suppressed > 0 {
self.log.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed)
} else {
self.log.Infof("[t]auth error %s = %s\n", clientId, err)
}
}
select {
case <-self.ctx.Done():
return
Expand All @@ -503,6 +592,10 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) {
}
}

// auth succeeded: backend is reachable, clear degradation counter
lastBackendFailNano.Store(0)
consecutiveBackendFails.Store(0)

c := func() {
defer ws.Close()

Expand Down Expand Up @@ -1079,14 +1172,27 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D
connStream, err = connect()
}
if err != nil {
self.log.Infof("[t]auth error %s = %s\n", clientId, err)
lastBackendFailNano.Store(time.Now().UnixNano())
consecutiveBackendFails.Add(1)
if ok, suppressed := shouldLogAuthErr(); ok {
if suppressed > 0 {
self.log.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed)
} else {
self.log.Infof("[t]auth error %s = %s\n", clientId, err)
}
}
select {
case <-self.ctx.Done():
return
case <-reconnect.After():
continue
}
}

// auth succeeded: backend is reachable, clear degradation counter
lastBackendFailNano.Store(0)
consecutiveBackendFails.Store(0)

conn := connStream.conn
stream := connStream.stream

Expand Down