Improve contiguous data P2P retrieval #612

@djwhitt

Improve Contiguous Data P2P Retrieval

Problem

The gateway's contiguous data retrieval from AR.IO peers and trusted gateways has several gaps that become more important as the network grows:

  1. No per-peer outbound concurrency limiting — There is no cap on concurrent outbound requests to a given peer or trusted gateway. Under load, many concurrent request handlers can all select the same high-weight peer, overwhelming it. This is bad neighbor behavior in a cooperative network and ties up local connections on slow peers.

  2. Weight system blind spot — The peer weight system (ArIOPeerManager) handles quality (which peers are fast/reliable) but not quantity (how much load is placed on any single peer). High-performing peers get higher weights, which means they get selected more often under load — the opposite of what a healthy network wants.

  3. No backpressure from saturated peers — When a peer is slow or overloaded, there's no mechanism to divert traffic to other peers until the per-request timeout fires (10 seconds). Concurrency limits would create natural backpressure: once a peer's slots fill, requests route elsewhere immediately.

  4. Sequential peer attempts with no hedging — ArIODataSource.getData() tries peers one at a time. If the first peer is slow but doesn't fail, the full 10s timeout must elapse before the next peer is tried. Worst-case tail latency = retryCount * requestTimeoutMs.

  5. No weight recovery from transient failures — Peers penalized to weight 1 after a brief outage recover only through successful requests. But at weight 1 they're rarely selected (weighted random), creating a negative feedback loop where recovery takes hours.

  6. All failures treated equally — reportFailure() applies a flat -temperatureDelta regardless of cause. A 404 (peer lacks data), 503 (temporarily overloaded), and connection refused (peer down) all get the same penalty, despite very different implications.

  7. Candidate pool tied to retry count — selectPeers(totalRetryCount) means the number of peers considered equals the max retry count (capped at 3). These are different concerns: you may want a larger candidate pool to draw from while still limiting total attempts.

  8. No local peer list cache — ArIOPeerManager fetches the peer list from the AO network process on startup (fire-and-forget, not awaited). If the AO process is unreachable, this.peers stays {} and all P2P retrieval is dead until the next hourly refresh succeeds. A single bad startup can leave the gateway peerless for up to an hour.

  9. No data locality in peer selection — Peer selection for contiguous data is purely weight-based with no consideration of which peers are likely to have the requested data. Two requests for the same ID may route to completely different peers, wasting cache potential. The chunk system has bucket-aware selection (selectBucketPeersForOffset), but contiguous data has no equivalent. This means every request is a cold miss from the peer's perspective unless it happens to have the data from an unrelated request.

  10. Registry metadata discarded — updatePeerList() receives rich per-gateway metadata from the AR.IO registry (composite weight, performance ratio, epoch pass/fail stats, stake, tenure) but discards everything except the URL. Initial peer weights default to 50 for all peers regardless of network-observed quality, ignoring free quality signals.

This is an umbrella issue for iterative improvements to contiguous data P2P retrieval. We'll add items here as we identify them.

Requirements

Must Have

  • Per-peer outbound concurrency limiter — Shared concurrency limiter that caps concurrent outbound requests to any single peer or trusted gateway

    • Configurable via PEER_MAX_CONCURRENT_OUTBOUND (default TBD, likely 5-10)
    • Configurable via TRUSTED_GATEWAY_MAX_CONCURRENT_OUTBOUND (default TBD, likely 10-20, or shared setting)
    • When a peer's slots are full, skip it and try the next peer (fail-fast, don't queue)
    • Single shared instance across ArIODataSource and GatewaysDataSource
    • Lazy per-peer initialization (don't pre-allocate for all known peers)
  • Hedged requests — Fire parallel requests after a configurable delay to eliminate tail latency from slow peers

    • Configurable via PEER_HEDGE_DELAY_MS (default: 500ms, 0 = disabled)
    • Configurable via PEER_MAX_HEDGED_REQUESTS (default: 3, caps concurrent fan-out)
    • First peer starts immediately; each subsequent peer starts after hedge delay if no success yet
    • First success aborts all other in-flight requests via AbortController
    • Hard failures immediately trigger the next peer (no wait for hedge timer)
  • Failure-differentiated weight adjustment — Replace flat penalty with failure-type-aware multipliers

    • 404 (not found): 1x temperatureDelta — peer may lack data but isn't broken
    • 429/503 (overloaded): 0.2x temperatureDelta — transient, minimal long-term penalty
    • Timeout: 1x temperatureDelta — could be transient or systemic
    • Connection refused/DNS failure: 3x temperatureDelta — peer likely down
    • Other 5xx: 2x temperatureDelta — something is wrong with the peer
    • New PeerFailureType enum and classifyPeerFailure() helper
  • Local peer list cache — Persist the peer list to disk after each successful fetch; load on startup as fallback when the AO process is unreachable

    • Cache file at data/peers/ar-io-peers.json (follows existing data/ directory convention)
    • Write atomically (write to tmp file, rename) to avoid corrupted reads
    • On startup: attempt network fetch → on failure, load cached list → log warning with cache age
    • Include a timestamp in the cache file so staleness can be assessed
    • Optionally configurable max cache age via PEER_LIST_CACHE_MAX_AGE_MS (default: 86400000 = 24h); if cache is older, log a more urgent warning but still use it (stale peers > no peers)

Should Have

  • Metrics for outbound concurrency: active connections per peer, skip count due to saturation
  • Log warnings when peers are consistently saturated (indicates too-low limits or too-few peers)
  • Weight decay toward default — Exponential decay of weights toward default (50) over time
    • Configurable via PEER_WEIGHT_DECAY_HALF_LIFE_MS (default: 600000 = 10 min)
    • Track last-update timestamp per peer per category
    • Apply decay lazily on read (during peer selection), not on a timer
    • Prevents negative feedback loop: penalized peers gradually recover selection probability
    • Also decays over-boosted peers, preventing permanently high weights from early luck
    • Example (half-life 10 min, default 50): weight 1 → 25.5 after 10 min → 37.75 after 20 min → ~43.9 after 30 min → ~49.2 after 60 min
  • Decouple candidate pool from retry count — Select a larger pool of candidate peers, attempt a smaller number
    • Configurable via PEER_CANDIDATE_COUNT (default: 5-10)
    • selectPeers(candidateCount) picks from a wider pool; maxAttempts caps actual requests
    • Enables hedged requests to draw from a deeper bench without increasing max retries
  • Consistent hash routing for cache locality — Use consistent hashing on data IDs to create stable peer affinity, so repeated requests for the same data route to the same peers and benefit from their cache
    • Hash each peer to positions on a ring (using URL or wallet address); for a given data ID, select the N closest peers as its "home set"
    • Try the home set first (ranked by weight within the set), fall back to general weighted selection on miss
    • Creates emergent cache specialization: each peer naturally accumulates a warm cache for "its" slice of the ID space
    • Stable mapping — same ID always tries the same peers first, which should give better cache hit rates than random weighted selection
    • Composes with weight system: home set provides candidate ordering, weights rank within the set
    • Graceful degradation: if a home-set peer goes down, the ring falls through to the next peer, which builds its own warm cache over time
    • IDs are SHA-256 hashes so distribution across the ring is inherently uniform
  • ArNS peer affinity — Track which peers have recently served specific ArNS names successfully, prefer them for subsequent requests to the same name
    • Small in-memory map: Map<arnsName, { peers: string[], lastUpdated: number }>
    • On successful ArNS data retrieval, record the peer for that name
    • On subsequent requests, try recently-successful peers first before falling back to consistent hash routing or general selection
    • requestAttributes.arnsName is already passed to getData(), so the name is available at selection time
    • Entries expire after a TTL (e.g., 30 min) to adapt to changing conditions
    • Targets highest-value requests: ArNS names are the front door for most user traffic (dApp homepages, frequently accessed content)
    • Bounded size: thousands of names at most, negligible memory
  • Seed initial weights from registry quality scores — Use compositeWeight / gatewayPerformanceRatio / epoch stats from the AR.IO gateway registry to seed initial per-peer weights instead of defaulting all peers to 50
    • Store additional gateway metadata alongside the URL in ArIOPeerManager
    • Map normalizedCompositeWeight (0–1) to the weight range (1–100) for initial weight
    • Local weight adjustments (success/failure) still apply on top, so the system adapts from an informed baseline
    • Avoids wasting early requests on peers the network already knows are poor performers

Could Have (Future)

  • Optional brief queueing before skip (wait N ms for a slot before skipping)
  • Integration with weight system: factor saturation into peer selection
  • Adaptive concurrency limits based on peer response characteristics
  • Latency variance tracking — Track p50/p95 TTFB per peer using circular buffers, factor stability into weight adjustments (stable peers get bonus, volatile peers get penalty)

Technical Notes

Implementation Approach: Per-Peer Concurrency Limiter

  1. New module src/data/outbound-request-limiter.ts — Wraps a Map<string, pLimit> with lazy per-peer initialization. Exposes tryAcquire(peerUrl) to check/reserve a slot and a release function.

  2. Modify ArIODataSource — In the peer retry loop, check if the peer has available slots before requesting. If saturated, skip to next peer.

  3. Modify GatewaysDataSource — Same pattern in the gateway tier loop. Skip saturated gateways, try next in tier.

  4. Wire in system.ts — Create one shared limiter instance, inject into both data sources.

  5. Config in src/config.ts — New env vars for concurrency limits.
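
The fail-fast behavior in step 1 can be sketched as follows. This is a minimal illustration, not the proposed module: it keeps a plain per-peer counter instead of wrapping p-limit (p-limit queues calls rather than rejecting them, so a non-blocking tryAcquire would otherwise have to inspect its activeCount). The class shape, the single maxConcurrent setting, and the activeCount() helper are assumptions made for the example.

```typescript
// Hypothetical sketch: counter-based per-peer limiter with fail-fast acquire.
class OutboundRequestLimiter {
  // Active request count per peer URL; entries are created lazily on first
  // acquire and removed again when the count drops back to zero.
  private active = new Map<string, number>();

  constructor(private maxConcurrent: number) {}

  // Returns a release function if a slot was reserved, or undefined if the
  // peer is saturated (the caller should skip to the next peer).
  tryAcquire(peerUrl: string): (() => void) | undefined {
    const current = this.active.get(peerUrl) ?? 0;
    if (current >= this.maxConcurrent) return undefined;
    this.active.set(peerUrl, current + 1);

    let released = false;
    return () => {
      if (released) return; // releasing twice must not free two slots
      released = true;
      const remaining = (this.active.get(peerUrl) ?? 1) - 1;
      if (remaining <= 0) this.active.delete(peerUrl);
      else this.active.set(peerUrl, remaining);
    };
  }

  // Exposed for metrics (active connections per peer).
  activeCount(peerUrl: string): number {
    return this.active.get(peerUrl) ?? 0;
  }
}
```

A caller would invoke the returned release function in a finally block so that slots are freed on both success and failure.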

Implementation Approach: Hedged Requests

Replace the sequential for-loop in ArIODataSource.getData() with a hedged request pattern:

Request timeline (illustrative hedge delay = 300ms; the proposed default is 500ms):

t=0ms    → Request to Peer A
t=300ms  → Peer A hasn't responded → Request to Peer B (hedge)
t=450ms  → Peer B responds first → abort Peer A, return Peer B's data

Key design decisions:

  • Use AbortController to cancel losing requests (existing abort signal pattern)
  • Hard failures short-circuit the hedge delay — next peer starts immediately
  • Cap max concurrent hedged requests to limit fan-out (default 3)
  • If hedging is disabled (PEER_HEDGE_DELAY_MS=0), fall back to existing sequential behavior
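
The design decisions above can be sketched as a small helper. This is an illustration under stated assumptions rather than the intended implementation: hedgedFetch and the PeerFetch signature are hypothetical names, maxHedged is simplified to cap the total number of peers tried (which also bounds fan-out), and the PEER_HEDGE_DELAY_MS=0 case is left to the caller, which would keep the existing sequential path.

```typescript
// Hypothetical signature: fetch data from one peer, honoring the abort signal.
type PeerFetch<T> = (peer: string, signal: AbortSignal) => Promise<T>;

async function hedgedFetch<T>(
  peers: string[],
  fetchFromPeer: PeerFetch<T>,
  hedgeDelayMs: number,
  maxHedged: number,
): Promise<T> {
  const controller = new AbortController();
  const candidates = peers.slice(0, maxHedged);

  return new Promise<T>((resolve, reject) => {
    let next = 0; // index of the next peer to launch
    let inFlight = 0;
    let settled = false;
    let timer: ReturnType<typeof setTimeout> | undefined;
    let lastError: unknown = new Error('no peers available');

    const finish = (settle: () => void) => {
      if (settled) return;
      settled = true;
      if (timer !== undefined) clearTimeout(timer);
      controller.abort(); // cancel every request still in flight
      settle();
    };

    const launch = () => {
      if (settled) return;
      if (timer !== undefined) clearTimeout(timer);
      if (next >= candidates.length) {
        // Nothing left to start; fail only once all requests have failed.
        if (inFlight === 0) finish(() => reject(lastError));
        return;
      }
      const peer = candidates[next++];
      inFlight++;
      fetchFromPeer(peer, controller.signal).then(
        (value) => finish(() => resolve(value)),
        (error) => {
          inFlight--;
          lastError = error;
          launch(); // hard failure: start the next peer without waiting
        },
      );
      // If this peer is merely slow, hedge with the next one after the delay.
      if (next < candidates.length) timer = setTimeout(launch, hedgeDelayMs);
    };

    launch();
  });
}
```

Because every request shares one AbortController, the first success cancels all losers in a single call. Rejections caused by that abort arrive after the promise has settled and are ignored, so they should not be reported to the weight system as failures.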

Implementation Approach: Failure Differentiation

Add a classifyPeerFailure(error) helper that maps errors to failure types:

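function classifyPeerFailure(error: any): PeerFailureType {
  // Network-level failures: Node.js sets error.code and there is no HTTP response.
  if (error.code === 'ECONNREFUSED' || error.code === 'ENOTFOUND') return 'connection_error';
  // AbortSignal.timeout() aborts with a DOMException named 'TimeoutError'.
  // Aborts issued to cancel a losing hedged request should be filtered out by
  // the caller before classification so they are not penalized as timeouts.
  if (['AbortError', 'TimeoutError'].includes(error.name) || error.code === 'ETIMEDOUT') return 'timeout';
  const status = error.response?.status;
  if (status === 404) return 'not_found';
  if (status === 429 || status === 503) return 'overloaded';
  if (status >= 500) return 'server_error';
  return 'client_error';
}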

Modify reportFailure() in ArIOPeerManager to accept the failure type and apply a multiplier to the temperature delta.
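
One way the multiplier lookup could look is sketched below. The multipliers come from the Must Have list; the string-union form of PeerFailureType, the penalizedWeight helper, the floor of 1, and the 1x value for client_error (which the requirements do not specify) are assumptions for illustration.

```typescript
// Assumption: failure types modeled as a string union matching the values
// returned by classifyPeerFailure().
type PeerFailureType =
  | 'not_found'
  | 'overloaded'
  | 'timeout'
  | 'connection_error'
  | 'server_error'
  | 'client_error';

const FAILURE_MULTIPLIERS: Record<PeerFailureType, number> = {
  not_found: 1, // peer may lack data but isn't broken
  overloaded: 0.2, // transient, minimal long-term penalty
  timeout: 1, // could be transient or systemic
  connection_error: 3, // peer likely down
  server_error: 2, // something is wrong with the peer
  client_error: 1, // assumption: not specified in the requirements
};

// Hypothetical helper: returns the new weight after a failure, never
// dropping below minWeight.
function penalizedWeight(
  currentWeight: number,
  temperatureDelta: number,
  failureType: PeerFailureType,
  minWeight = 1,
): number {
  const penalty = temperatureDelta * FAILURE_MULTIPLIERS[failureType];
  return Math.max(minWeight, currentWeight - penalty);
}
```

With a temperatureDelta of 10, a connection refusal would take a weight-50 peer to 20, while a 503 would only take it to 48.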

Implementation Approach: Weight Decay

Apply decay lazily in _selectPeersUncached() before building the weighted table:

// Exponential decay: newWeight = default + (current - default) * 0.5^(elapsed/halfLife)
const decayFactor = Math.pow(0.5, elapsed / halfLife);
const newWeight = defaultWeight + (weight - defaultWeight) * decayFactor;

Track timestamps alongside weights in a parallel Map<WeightCategory, Map<string, number>>. Update timestamps on every reportSuccess/reportFailure call.
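
As a self-contained illustration of the formula, the helper below applies the decay for a given elapsed time. The function name and the guard for non-positive inputs are assumptions; the arithmetic is the same as in the snippet above.

```typescript
// Hypothetical helper: decays a weight toward defaultWeight. After one
// half-life, half of the distance to the default has been recovered.
function decayWeight(
  weight: number,
  defaultWeight: number,
  elapsedMs: number,
  halfLifeMs: number,
): number {
  // Guard against a disabled or misconfigured half-life and clock skew.
  if (halfLifeMs <= 0 || elapsedMs <= 0) return weight;
  const decayFactor = Math.pow(0.5, elapsedMs / halfLifeMs);
  return defaultWeight + (weight - defaultWeight) * decayFactor;
}

// A penalized peer recovers toward 50 with a 10-minute half-life:
decayWeight(1, 50, 600_000, 600_000); // 25.5
decayWeight(1, 50, 1_200_000, 600_000); // 37.75
// An over-boosted peer decays downward by the same rule:
decayWeight(100, 50, 600_000, 600_000); // 75
```

When applied lazily, elapsedMs would be the time since the peer's last recorded update in that weight category, and the stored weight and timestamp would be refreshed together.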

Implementation Approach: Local Peer List Cache

Persist the peer list to disk after each successful updatePeerList() call. On startup, if the network fetch fails, load the cached list as a fallback.

Cache file format (data/peers/ar-io-peers.json):

{
  "updatedAt": 1707753600000,
  "peers": {
    "walletAddress1": "https://peer1.example.com",
    "walletAddress2": "https://peer2.example.com"
  }
}

Key changes to ArIOPeerManager:

  1. Constructor — Accept an optional peerCachePath parameter. On initialization (when initialPeers not provided), attempt network fetch, then fall back to cache:
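// In constructor, replace fire-and-forget updatePeerList() with:
void this.initializePeers();

// Assumes updatePeerList() rejects on failure; if it swallows errors
// internally, the cache fallback below is never reached.
private async initializePeers(): Promise<void> {
  try {
    await this.updatePeerList();
  } catch {
    // The cache may be missing or unreadable (e.g. first startup), so a
    // failed load must not surface as an unhandled rejection.
    await this.loadCachedPeerList().catch((error) =>
      log.warn('No usable peer list cache', { error }),
    );
  }
  // Start periodic refresh regardless
  this.intervalId = setInterval(
    this.updatePeerList.bind(this),
    this.updatePeersRefreshIntervalMs,
  );
}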
  2. After successful updatePeerList() — Write cache atomically:
// At end of updatePeerList(), after this.peers = peers:
if (Object.keys(peers).length > 0) {
  await this.writePeerCache(peers);
}
  3. Atomic write — Write to a temp file, then rename:
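// Assumes `fs` is node:fs/promises and `path` is node:path.
private async writePeerCache(peers: Record<string, string>): Promise<void> {
  const data = JSON.stringify({ updatedAt: Date.now(), peers }, null, 2);
  const tmpPath = `${this.peerCachePath}.tmp`;
  // Ensure the cache directory exists before the first write.
  await fs.mkdir(path.dirname(this.peerCachePath), { recursive: true });
  await fs.writeFile(tmpPath, data);
  // The temp file sits next to the target, so the rename stays on one filesystem.
  await fs.rename(tmpPath, this.peerCachePath);
}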
  4. Load with staleness warning:
private async loadCachedPeerList(): Promise<void> {
  const data = JSON.parse(await fs.readFile(this.peerCachePath, 'utf-8'));
  const age = Date.now() - data.updatedAt;
  if (age > this.peerCacheMaxAgeMs) {
    log.warn('Using stale peer list cache', { ageMs: age });
  } else {
    log.info('Loaded peer list from local cache', { ageMs: age, count: Object.keys(data.peers).length });
  }
  this.peers = data.peers;
  // Initialize category weights for cached peers
}

Implementation Approach: Consistent Hash Routing

Use a consistent hash ring to map data IDs to a stable "home set" of peers. This creates cache locality without protocol changes or network coordination.

Data structure:

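class PeerHashRing {
  // Virtual nodes for even distribution (e.g., 50 vnodes per peer)
  private ring = new Map<number, string>(); // hash position → peer URL
  private sortedPositions: number[] = [];
  private peerUrls = new Set<string>(); // distinct peers on the ring

  constructor(private vnodeCount = 50) {}

  addPeer(peerUrl: string): void {
    this.peerUrls.add(peerUrl);
    for (let i = 0; i < this.vnodeCount; i++) {
      const hash = hashToPosition(`${peerUrl}:${i}`);
      this.ring.set(hash, peerUrl);
    }
    this.sortedPositions = [...this.ring.keys()].sort((a, b) => a - b);
  }

  getHomeSet(dataId: string, count: number): string[] {
    if (this.sortedPositions.length === 0) return [];
    const position = hashToPosition(dataId);
    // Walk clockwise from position, collect unique peers.
    // binarySearchClosest is expected to return the index of the first ring
    // position >= `position`; the modulo below handles wrap-around.
    const peers: string[] = [];
    let idx = binarySearchClosest(this.sortedPositions, position);
    const target = Math.min(count, this.peerUrls.size);
    // Bound the walk to one lap so vnode hash collisions cannot loop forever.
    for (let steps = 0; peers.length < target && steps < this.sortedPositions.length; steps++) {
      const peer = this.ring.get(this.sortedPositions[idx % this.sortedPositions.length])!;
      if (!peers.includes(peer)) peers.push(peer);
      idx++;
    }
    return peers;
  }
}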
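
The class above relies on two helpers that are not defined in this issue. A possible sketch of both follows. The choice of 32-bit FNV-1a is an assumption made to keep the example dependency-free (any well-distributed hash would do), and binarySearchClosest is implemented here as a lower-bound search that wraps to index 0, which is what a clockwise walk needs.

```typescript
// Hypothetical helper: maps a key to a ring position in [0, 2^32).
// Uses 32-bit FNV-1a over UTF-16 code units, which is adequate for ASCII
// inputs such as URLs and base64url data IDs.
function hashToPosition(key: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // FNV prime, 32-bit multiply
  }
  return hash >>> 0; // force unsigned
}

// Hypothetical helper: index of the first position >= target in a sorted
// array, wrapping to 0 when target is larger than every position.
function binarySearchClosest(sorted: number[], target: number): number {
  let lo = 0;
  let hi = sorted.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    if (sorted[mid] < target) lo = mid + 1;
    else hi = mid;
  }
  return lo === sorted.length ? 0 : lo;
}
```

Because both functions are deterministic, the same data ID always lands on the same ring position, which is what makes the home set stable across requests and across gateway restarts.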

Integration with ArIODataSource.getData():

Peer selection for data ID X:
  1. homeSet = hashRing.getHomeSet(X, homeSetSize)      // stable candidates
  2. ranked = rankByWeight(homeSet)                       // quality ordering
  3. filtered = filterByConcurrencySlots(ranked)          // skip saturated
  4. fallback = generalWeightedSelection(excludeHomeSet)  // if home set exhausted
  5. candidates = [...filtered, ...fallback]

Ring maintenance:

  • Rebuild ring on updatePeerList() (peers join/leave)
  • Ring is read-heavy, write-infrequent — simple sorted array is sufficient
  • Virtual nodes (50 per peer) ensure even distribution despite varying peer count

Implementation Approach: ArNS Peer Affinity

A lightweight learned routing layer specifically for ArNS name resolution.

interface ArNSPeerAffinity {
  peers: string[];        // recently successful peers, most recent first
  lastUpdated: number;
}

class ArNSAffinityCache {
  private cache: Map<string, ArNSPeerAffinity> = new Map();
  private maxPeersPerName = 3;
  private ttlMs = 1_800_000; // 30 minutes

  recordSuccess(arnsName: string, peer: string): void {
    const entry = this.cache.get(arnsName) ?? { peers: [], lastUpdated: 0 };
    // Move peer to front, dedup, cap size
    entry.peers = [peer, ...entry.peers.filter(p => p !== peer)]
      .slice(0, this.maxPeersPerName);
    entry.lastUpdated = Date.now();
    this.cache.set(arnsName, entry);
  }

  getAffinityPeers(arnsName: string): string[] {
    const entry = this.cache.get(arnsName);
    if (!entry || Date.now() - entry.lastUpdated > this.ttlMs) return [];
    return entry.peers;
  }
}

Integration with ArIODataSource.getData():

Peer selection for ArNS request "ardrive":
  1. affinityPeers = arnsAffinity.getAffinityPeers("ardrive")  // learned fast-path
  2. homeSet = hashRing.getHomeSet(resolvedId, homeSetSize)     // consistent hash
  3. general = generalWeightedSelection(...)                     // fallback
  4. candidates = dedupe([...affinityPeers, ...homeSet, ...general])

The ArNS layer is a targeted optimization on top of consistent hashing. It handles the common case (popular names requested repeatedly) while consistent hashing handles the general case (all data IDs).
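
The layered ordering could be assembled with a small order-preserving merge, sketched below. The function name and the maxCandidates cap are assumptions for illustration; the tiers correspond to the affinity peers, home set, and general selection described above.

```typescript
// Hypothetical helper: merges candidate tiers in priority order, dropping
// duplicates so a peer appears only at its highest-priority position.
function buildCandidates(
  affinityPeers: string[],
  homeSet: string[],
  general: string[],
  maxCandidates: number,
): string[] {
  const seen = new Set<string>();
  const candidates: string[] = [];
  for (const peer of [...affinityPeers, ...homeSet, ...general]) {
    if (candidates.length >= maxCandidates) break;
    if (seen.has(peer)) continue;
    seen.add(peer);
    candidates.push(peer);
  }
  return candidates;
}
```

For a non-ArNS request the affinity tier is simply empty, so the same helper covers both integration paths.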

Implementation Approach: Seed Weights from Registry

Retain gateway metadata during updatePeerList() and use it for initial weight seeding:

// In updatePeerList(), instead of just storing URL:
for (const gateway of items) {
  peers[gateway.gatewayAddress] = {
    url: `${gateway.settings.protocol}://${gateway.settings.fqdn}`,
    compositeWeight: gateway.weights?.normalizedCompositeWeight,
    performanceRatio: gateway.weights?.gatewayPerformanceRatio,
    passRate: gateway.stats.prescribedEpochCount > 0
      ? gateway.stats.passedEpochCount / gateway.stats.prescribedEpochCount
      : undefined,
  };
}

// When initializing weights for a new peer:
const registryWeight = peerMetadata.compositeWeight ?? peerMetadata.performanceRatio;
const initialWeight = registryWeight !== undefined
  ? Math.max(MIN_WEIGHT, Math.round(registryWeight * MAX_WEIGHT))
  : DEFAULT_WEIGHT;

This requires changing the peer storage from Record<string, string> to Record<string, PeerMetadata>, which affects getPeers(), getPeerUrls(), and callers. The URL-only interface can be preserved with accessor methods.
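
A sketch of how accessor methods might preserve the URL-only interface is shown below. PeerStore is a stand-in for the relevant part of ArIOPeerManager, and getPeerMetadata() is a hypothetical addition; only the getPeers() and getPeerUrls() names come from the text above.

```typescript
interface PeerMetadata {
  url: string;
  compositeWeight?: number;
  performanceRatio?: number;
  passRate?: number;
}

// Stand-in for the peer storage inside ArIOPeerManager (assumption).
class PeerStore {
  private peers: Record<string, PeerMetadata> = {};

  setPeers(peers: Record<string, PeerMetadata>): void {
    this.peers = peers;
  }

  // URL-only view with the same shape callers received before the change.
  getPeers(): Record<string, string> {
    const urls: Record<string, string> = {};
    for (const address of Object.keys(this.peers)) {
      urls[address] = this.peers[address].url;
    }
    return urls;
  }

  getPeerUrls(): string[] {
    return Object.keys(this.peers).map((address) => this.peers[address].url);
  }

  // New accessor for code that needs the registry signals.
  getPeerMetadata(address: string): PeerMetadata | undefined {
    return this.peers[address];
  }
}
```

Existing callers keep working against getPeers() and getPeerUrls(), while weight seeding reads the richer record through the new accessor.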

Existing Patterns to Follow

  • p-limit (v6.2.0) already in dependencies, used in CompositeChunkDataSource and ArweaveCompositeClient
  • ArIOChunkSource uses parallel peer requests with concurrency control
  • RebroadcastingChunkSource uses p-limit for outbound concurrency + limiter TokenBucket for rate limiting
  • Chunk rebroadcasting config pattern: CHUNK_REBROADCAST_MAX_CONCURRENT
  • AbortSignal.any() and AbortSignal.timeout() used throughout for combined signal handling

Strategic Fit

These improvements create a multi-signal peer selection system:

  • Weights = quality signal ("prefer this peer")
  • Concurrency limits = load signal ("not too much at once")
  • Hedging = latency signal ("don't wait, try another")
  • Decay = freshness signal ("re-evaluate stale judgments")
  • Failure types = diagnostic signal ("understand why it failed")
  • Hash ring = locality signal ("this peer likely has it cached")
  • ArNS affinity = learned signal ("this peer served this name before")
  • Registry seeding = network signal ("the network says this peer is good")

Implementation Order

| # | Improvement | Impact | Complexity | Dependencies |
|---|---|---|---|---|
| 1 | Local peer list cache | High | Low | None |
| 2 | Seed weights from registry | Medium | Low | None |
| 3 | Weight decay | High | Low | None |
| 4 | Failure differentiation | Medium | Low | None |
| 5 | Decouple candidate/retry | Low | Low | None (enables #7) |
| 6 | Per-peer concurrency limiter | High | Moderate | None |
| 7 | Hedged requests | High | Moderate | #5 (benefits from larger pool) |
| 8 | Consistent hash routing | High | Moderate | None |
| 9 | ArNS peer affinity | Medium | Low | #8 (falls back to hash ring) |
| 10 | Latency variance tracking | Medium | Medium | None |

Key Files

| File | Change |
|---|---|
| src/data/outbound-request-limiter.ts | NEW — Per-peer concurrency limiter |
| src/data/ar-io-data-source.ts | Integrate limiter, hedged requests, failure classification |
| src/data/gateways-data-source.ts | Integrate limiter in gateway tier loop |
| src/peers/ar-io-peer-manager.ts | Weight decay, failure-type-aware penalties, timestamp tracking, peer cache, registry seeding |
| src/data/peer-hash-ring.ts | NEW — Consistent hash ring for data ID → peer mapping |
| src/data/arns-affinity-cache.ts | NEW — Learned ArNS name → peer affinity cache |
| src/system.ts | Create and inject shared limiter, hash ring, ArNS affinity cache |
| src/config.ts | New env vars for all features |
| src/data/ar-io-data-source.test.ts | Tests for hedging, skip-on-saturated, failure classification, hash ring selection |
| src/data/gateways-data-source.test.ts | Tests for skip-on-saturated behavior |
| src/peers/ar-io-peer-manager.test.ts | Tests for decay, failure differentiation, registry seeding |
| src/data/peer-hash-ring.test.ts | Tests for ring distribution, home set stability, peer add/remove |
| src/data/arns-affinity-cache.test.ts | Tests for TTL expiry, peer recording, size bounds |
