diff --git a/transfer.go b/transfer.go index 2b9abea..c00969c 100644 --- a/transfer.go +++ b/transfer.go @@ -2377,9 +2377,18 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool { endTime := time.Now().Add(self.sendBufferSettings.CreateContractTimeout) + // Back off contract retries when the backend is unreachable. The normal + // retry interval against a dead control API is one of the main bandwidth-leak + // sources during an outage; 30s throttles that while still recovering promptly + // once the API returns. + contractRetryInterval := self.sendBufferSettings.CreateContractRetryInterval + if isBackendDegraded() { + contractRetryInterval = 30 * time.Second + } + if self.sendContract != nil { // there should be a queued up contract - if traceNextContract(min(self.sendBufferSettings.CreateContractTimeout, self.sendBufferSettings.CreateContractRetryInterval)) { + if traceNextContract(min(self.sendBufferSettings.CreateContractTimeout, contractRetryInterval)) { return true } } @@ -2405,13 +2414,17 @@ func (self *SendSequence) updateContract(messageByteCount ByteCount) bool { EncryptionRole: self.encryptionRole, EncryptionCompanion: self.encryptionCompanion, } - self.client.ContractManager().CreateContract( - contractKey, - self.contractSeqIndex, - ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction), - ) - if traceNextContract(min(timeout, self.sendBufferSettings.CreateContractRetryInterval)) { + // Gate contract creation when backend is degraded to prevent bandwidth leak + if !isBackendDegraded() { + self.client.ContractManager().CreateContract( + contractKey, + self.contractSeqIndex, + ByteCount(32+float32(messageByteCount+messageByteCount+self.sendBufferSettings.MinMessageByteCount)/self.sendBufferSettings.ContractFillFraction), + ) + } + + if traceNextContract(min(timeout, contractRetryInterval)) { return true } } diff --git a/transfer_contract_manager.go b/transfer_contract_manager.go index 648cf6a..7a3c723 100644 --- a/transfer_contract_manager.go +++ b/transfer_contract_manager.go @@ -1084,6 +1084,8 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq []*protocol.Frame{frame}, func(resultFrames []*protocol.Frame, err error) { if err == nil { + // OOB round-trip succeeded: backend is reachable, clear degradation counter + consecutiveBackendFails.Store(0) for _, resultFrame := range resultFrames { self.HandleControlFrame(contractKey, resultFrame) } @@ -1092,7 +1094,15 @@ func (self *ContractManager) CreateContract(contractKey ContractKey, contractSeq case <-self.client.Done(): // no need to log warnings when the client closes default: - self.client.log.Infof("[contract]oob err = %s\n", err) + lastBackendFailNano.Store(time.Now().UnixNano()) + consecutiveBackendFails.Add(1) + if ok, suppressed := shouldLogOobErr(); ok { + if suppressed > 0 { + self.client.log.Infof("[contract]oob err = %s (%d suppressed)\n", err, suppressed) + } else { + self.client.log.Infof("[contract]oob err = %s\n", err) + } + } } } }, diff --git a/transport.go b/transport.go index d885ffa..37461a5 100644 --- a/transport.go +++ b/transport.go @@ -92,6 +92,87 @@ func (self *ClientAuth) ClientId() (Id, error) { return byJwt.ClientId, nil } +// Package-level atomics shared across all PlatformTransport instances, +// rate-limiting auth error log lines to at most once per minute and tracking +// how many were suppressed in the interval. +var lastAuthErrLogNano atomic.Int64 +var suppressedAuthErrCount atomic.Int64 + +// Package-level atomics for OOB error rate-limiting. +var lastOobErrLogNano atomic.Int64 +var suppressedOobErrCount atomic.Int64 + +// lastBackendFailNano is updated on every backend failure (auth or OOB), not +// rate-limited. Used by isBackendDegraded() as the recency guard. +var lastBackendFailNano atomic.Int64 + +// consecutiveBackendFails counts backend failures (auth or OOB) since the last +// success. Any successful connect or OOB result resets it to 0. A real platform +// outage drives this up fast because every attempt fails with nothing to reset +// it; isolated transient timeouts never accumulate because an interleaved +// success clears the count. isBackendDegraded() requires this to cross a +// threshold so one or two stray failures are not mistaken for an outage. +var consecutiveBackendFails atomic.Int64 + +// backendDegradedFailThreshold is the number of consecutive backend failures +// (with no intervening success) required before the backend is considered +// degraded. Set above the level of normal transient churn. +const backendDegradedFailThreshold = 3 + +// backendDegradedWindow is how recent the last failure must be for the counter +// to be trusted. Comfortably larger than the 60s reconnect-backoff cap so a real +// outage's retry attempts always read as recent, while a stale count left by an +// old blip on an idle provider does not. +const backendDegradedWindow = 2 * time.Minute + +// shouldLogAuthErr returns (true, suppressedCount) if a log line should be +// emitted, resetting the suppressed counter. Returns (false, 0) if suppressed. +func shouldLogAuthErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastAuthErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedAuthErrCount.Add(1) + return false, 0 + } + if !lastAuthErrLogNano.CompareAndSwap(last, now) { + suppressedAuthErrCount.Add(1) + return false, 0 + } + suppressed := suppressedAuthErrCount.Swap(0) + return true, suppressed +} + +// shouldLogOobErr returns (true, suppressedCount) if a [contract]oob err line +// should be emitted, resetting the suppressed counter. Returns (false, 0) if +// suppressed. Defined here so this PR is self-contained. +func shouldLogOobErr() (bool, int64) { + now := time.Now().UnixNano() + last := lastOobErrLogNano.Load() + if now-last < int64(time.Minute) { + suppressedOobErrCount.Add(1) + return false, 0 + } + if !lastOobErrLogNano.CompareAndSwap(last, now) { + suppressedOobErrCount.Add(1) + return false, 0 + } + suppressed := suppressedOobErrCount.Swap(0) + return true, suppressed +} + +// isBackendDegraded returns true when backend failures have accumulated past the +// threshold with no intervening success and the last failure is recent. This +// distinguishes a sustained, broad outage (every attempt failing) from the +// isolated single-connection timeouts that are normal churn on a busy provider. +// Contract creation and resend pacing consult this to avoid leaking bandwidth +// against an unreachable platform. +func isBackendDegraded() bool { + if consecutiveBackendFails.Load() < backendDegradedFailThreshold { + return false + } + return time.Now().UnixNano()-lastBackendFailNano.Load() < int64(backendDegradedWindow) +} + // (ctx, network, address) // type DialContextFunc func(ctx context.Context, network string, address string) (net.Conn, error) @@ -494,7 +575,15 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { ws, err = connect() } if err != nil { - self.log.Infof("[t]auth error %s = %s\n", clientId, err) + lastBackendFailNano.Store(time.Now().UnixNano()) + consecutiveBackendFails.Add(1) + if ok, suppressed := shouldLogAuthErr(); ok { + if suppressed > 0 { + self.log.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed) + } else { + self.log.Infof("[t]auth error %s = %s\n", clientId, err) + } + } select { case <-self.ctx.Done(): return @@ -503,6 +592,10 @@ func (self *PlatformTransport) runH1(initialTimeout time.Duration) { } } + // auth succeeded: backend is reachable, clear degradation counter + lastBackendFailNano.Store(0) + consecutiveBackendFails.Store(0) + c := func() { defer ws.Close() @@ -1079,7 +1172,15 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D connStream, err = connect() } if err != nil { - self.log.Infof("[t]auth error %s = %s\n", clientId, err) + lastBackendFailNano.Store(time.Now().UnixNano()) + consecutiveBackendFails.Add(1) + if ok, suppressed := shouldLogAuthErr(); ok { + if suppressed > 0 { + self.log.Infof("[t]auth error %s = %s (%d suppressed)\n", clientId, err, suppressed) + } else { + self.log.Infof("[t]auth error %s = %s\n", clientId, err) + } + } select { case <-self.ctx.Done(): return @@ -1087,6 +1188,11 @@ func (self *PlatformTransport) runH3(ptMode TransportMode, initialTimeout time.D continue } } + + // auth succeeded: backend is reachable, clear degradation counter + lastBackendFailNano.Store(0) + consecutiveBackendFails.Store(0) + conn := connStream.conn stream := connStream.stream