Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,6 @@ test/*.png
test/*.dot
test/logs
__pycache__

# Python virtual env
venv/
1 change: 1 addition & 0 deletions Moblin/Media/AdaptiveBitrate/AdaptiveBitrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct StreamStats {
let latency: Int32?
let mbpsSendRate: Double?
let relaxed: Bool?
var sendBufferUtilization: Double?

// To not push too high bitrate after static scene. The encoder may output way
// lower bitrate than configured.
Expand Down
88 changes: 88 additions & 0 deletions Moblin/Media/AdaptiveBitrate/AdaptiveBitrateRtmp.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import Foundation

class AdaptiveBitrateRtmp: AdaptiveBitrate {
private var currentBitrate: Int64
private let minBitrate: Int64
private var maxBitrate: Int64

private var lastDropTime: ContinuousClock.Instant = .now
private var lastIncreaseTime: ContinuousClock.Instant = .now
private var lastKeyframeTime: ContinuousClock.Instant = .now

// Configuração conservadora-média (ajustável)
private let aggressiveDropThreshold = 0.82 // sendBufferUtilization
private let rttWarningThresholdMs = 350.0
private let recoveryCooldown: Duration = .seconds(2.5) // segundos após drop
private let keyframeProtectionWindow: Duration = .milliseconds(800) // janela de proteção

init(targetBitrate: UInt32, delegate: any AdaptiveBitrateDelegate) {
maxBitrate = Int64(targetBitrate)
currentBitrate = Int64(targetBitrate)
minBitrate = 800_000 // default min
super.init(delegate: delegate)
}

override func setTargetBitrate(bitrate: UInt32) {
maxBitrate = Int64(bitrate)
}

override func getCurrentBitrate() -> UInt32 {
UInt32(currentBitrate)
}

override func getCurrentMaximumBitrateInKbps() -> Int64 {
maxBitrate / 1000
}

func notifyKeyframeSent() {
lastKeyframeTime = .now
}

override func update(stats: StreamStats) {
super.update(stats: stats)

let now: ContinuousClock.Instant = .now
let utilization = stats.sendBufferUtilization ?? 0.0
let rtt = stats.rttMs
let timeSinceLastKeyframe = lastKeyframeTime.duration(to: now)
let isInKeyframeWindow = timeSinceLastKeyframe < keyframeProtectionWindow

// === DROP AGRESSIVO (Congestionamento) ===
if utilization > aggressiveDropThreshold || rtt > rttWarningThresholdMs, !isInKeyframeWindow {
let reductionFactor = utilization > 0.92 ? 0.55 : 0.68
let newBitrate = Int64(Double(currentBitrate) * reductionFactor)

if newBitrate < currentBitrate && lastDropTime.duration(to: now) > .seconds(1.0) {
currentBitrate = max(minBitrate, newBitrate)
lastDropTime = now
delegate?.adaptiveBitrateSetVideoStreamBitrate(bitrate: UInt32(currentBitrate))
let action = "Decrease (utilization: \(String(format: "%.2f", utilization))) [protected keyframe]"
logAdaptiveAcion(actionTaken: action)
return
}
}

// === RECUPERAÇÃO LENTA ===
let isLowPressure = utilization < 0.45 && rtt < 180.0
let isCooldownOver = lastDropTime.duration(to: now) > recoveryCooldown
let isIncreaseCooldownOver = lastIncreaseTime.duration(to: now) > .seconds(2.0)

if isLowPressure, isCooldownOver, isIncreaseCooldownOver {
let increase = Int64(Double(currentBitrate) * 0.09) // +9% por passo
let newBitrate = min(maxBitrate, currentBitrate + increase)

if newBitrate > currentBitrate {
currentBitrate = newBitrate
lastIncreaseTime = now
delegate?.adaptiveBitrateSetVideoStreamBitrate(bitrate: UInt32(currentBitrate))
let action = "Increase (utilization: \(String(format: "%.2f", utilization)))"
logAdaptiveAcion(actionTaken: action)
}
}
}

func reset() {
lastDropTime = .now
lastIncreaseTime = .now
}
}
66 changes: 66 additions & 0 deletions Moblin/Media/HaishinKit/Rtmp/RtmpChunkSerializer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import Foundation

class RtmpChunkSerializer {
func serialize(chunk: RtmpChunk, maximumChunkSize: Int) -> [Data] {
guard let message = chunk.message else {
return [chunk.encode()]
}

let payload = message.encoded
let headerType3 = RtmpChunkType.three.toBasicHeader(chunk.chunkStreamId)

let writer = ByteWriter()
writer.writeBytes(chunk.type.toBasicHeader(chunk.chunkStreamId))

if message.timestamp > 0xFFFFFF {
writer.writeUInt24(0xFFFFFF)
} else {
writer.writeUInt24(message.timestamp)
}
writer.writeUInt24(UInt32(payload.count))
writer.writeUInt8(message.type.rawValue)
if chunk.type == .zero {
writer.writeUInt32Le(message.streamId)
}
if message.timestamp > 0xFFFFFF {
writer.writeUInt32(message.timestamp)
}

let firstHeader = writer.data

// Calculate exact total size to prevent reallocation
let chunkCount = payload.isEmpty ? 1 : Int(ceil(Double(payload.count) / Double(maximumChunkSize)))
let totalSize = firstHeader.count + payload
.count + (chunkCount > 1 ? (chunkCount - 1) * headerType3.count : 0)

var buffer = Data(capacity: totalSize)
var results: [Data] = []
var offset = 0

if payload.isEmpty {
buffer.append(firstHeader)
results.append(buffer)
return results
}

while offset < payload.count {
let chunkSize = min(maximumChunkSize, payload.count - offset)
let isFirst = offset == 0

let header = isFirst ? firstHeader : headerType3

let startIdx = buffer.count
buffer.append(header)
// Use subdata bounds to prevent inline copying overhead
let payloadSlice = payload.subdata(in: offset ..< offset + chunkSize)
buffer.append(payloadSlice)
let endIdx = buffer.count

// subdata creates a shared view into the contiguous buffer, zero-copy at Swift level
results.append(buffer.subdata(in: startIdx ..< endIdx))

offset += chunkSize
}
return results
}
}
20 changes: 17 additions & 3 deletions Moblin/Media/HaishinKit/Rtmp/RtmpConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class RtmpConnection: @unchecked Sendable {
}
switch RtmpConnectionCode(rawValue: code) {
case .connectSuccess:
handleConnectSuccess()
handleConnectSuccess(data: data)
case .connectRejected:
handleConnectRejected(data: data)
case .connectClosed:
Expand All @@ -137,8 +137,13 @@ class RtmpConnection: @unchecked Sendable {
}
}

private func handleConnectSuccess() {
socket.maximumChunkSizeToServer = 1024 * 8
private func handleConnectSuccess(data: AsObject) {
stream?.enhancedCapabilities = RtmpEnhancedCapabilities.fromConnectResponse(data)

let targetBitrate = stream?.info.bitrateStats.value.latestSpeed ?? 0
// Reduce chunk size to 32KB / 64KB max to prevent HOL blocking on bad mobile networks
let chunkSize = targetBitrate > 4_000_000 ? 1024 * 64 : 1024 * 32
socket.maximumChunkSizeToServer = chunkSize
_ = socket.write(chunk: RtmpChunk(
type: .zero,
chunkStreamId: RtmpChunk.ChunkStreamId.control.rawValue,
Expand Down Expand Up @@ -200,6 +205,10 @@ class RtmpConnection: @unchecked Sendable {
"videoFunction": .number(Double(VideoFunction.clientSeek.rawValue)),
"pageUrl": .null,
"objectEncoding": .number(0),
"videoFourCcInfoMap": .object([
"hvc1": .object(["codecHeaderType": .string("sequence")]),
"av01": .object(["codecHeaderType": .string("sequence")]),
]),
],
arguments: []
)
Expand Down Expand Up @@ -298,6 +307,11 @@ extension RtmpConnection: RtmpSocketDelegate {
stream?.info.onWritten(sequence: totalBytesSent)
}

func socketGetCurrentBitrate() -> UInt32 {
let latestBytesPerSecond = stream?.info.bitrateStats.value.latestSpeed ?? 62500
return UInt32(latestBytesPerSecond * 8)
}

func socketDataReceived(data: Data) -> Data {
guard let chunk = currentChunk ?? RtmpChunk(data: data, size: socket.maximumChunkSizeFromServer)
else {
Expand Down
38 changes: 38 additions & 0 deletions Moblin/Media/HaishinKit/Rtmp/RtmpEnhancedVideo.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import AVFoundation
import CoreMedia
import Foundation

enum RtmpVideoFourCC: UInt32 {
case avc1 = 0x6176_6331 // H.264
case hvc1 = 0x6876_6331 // HEVC
case av01 = 0x6176_3031 // AV1

var flvCodecId: UInt8 {
switch self {
case .avc1: 7
case .hvc1, .av01: 12 // Extended codec for Enhanced RTMP
}
}

var isEnhanced: Bool {
self != .avc1
}
}

struct RtmpEnhancedCapabilities {
let supportedFourCCs: [RtmpVideoFourCC]

static func fromConnectResponse(_ response: AsObject) -> RtmpEnhancedCapabilities {
var supported: [RtmpVideoFourCC] = [.avc1]

if let videoFourCcInfoMap = response["videoFourCcInfoMap"] as? AsObject {
if videoFourCcInfoMap["hvc1"] != nil { supported.append(.hvc1) }
if videoFourCcInfoMap["av01"] != nil { supported.append(.av01) }
} else if let videoFourCcInfoMap = response["videoFourCcInfoMap"] as? [String: Any] {
if videoFourCcInfoMap["hvc1"] != nil { supported.append(.hvc1) }
if videoFourCcInfoMap["av01"] != nil { supported.append(.av01) }
}

return RtmpEnhancedCapabilities(supportedFourCCs: supported)
}
}
43 changes: 43 additions & 0 deletions Moblin/Media/HaishinKit/Rtmp/RtmpReconnectPolicy.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import Foundation

struct RtmpReconnectPolicy {
let baseDelay: Double
let maxDelay: Double
let jitterFactor: Double

init(baseDelay: Double = 1.0, maxDelay: Double = 32.0, jitterFactor: Double = 0.15) {
self.baseDelay = baseDelay
self.maxDelay = maxDelay
self.jitterFactor = jitterFactor
}

func delay(forAttempt attempt: Int) -> Double {
let exponentialDelay = baseDelay * pow(2.0, Double(attempt))
let cappedDelay = min(exponentialDelay, maxDelay)

let jitterAmount = cappedDelay * jitterFactor
let randomJitter = Double.random(in: -jitterAmount ... jitterAmount)

return cappedDelay + randomJitter
}

func shouldRetry(forError error: String?) -> Bool {
// Return false for fatal errors like auth failed, no such user, bad stream key
guard let error else { return true }

let fatalKeywords = [
"authmod=adobe",
"?reason=authfailed",
"nosuchuser",
"badstreamkey",
"code=403",
"forbidden",
]

let lowerError = error.lowercased()
for keyword in fatalKeywords where lowerError.contains(keyword) {
return false
}
return true
}
}
51 changes: 51 additions & 0 deletions Moblin/Media/HaishinKit/Rtmp/RtmpSendQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import DequeModule
import Foundation

enum RtmpChunkPriority: Int, Comparable {
case control = 0
case audio = 1
case videoKeyframe = 2
case videoInterframe = 3
case metadata = 4

static func < (lhs: Self, rhs: Self) -> Bool {
lhs.rawValue < rhs.rawValue
}
}

class RtmpSendQueue {
// Index matches RtmpChunkPriority.rawValue
// 0: control, 1: audio, 2: videoKeyframe, 3: videoInterframe, 4: metadata
private var queues: Atomic<[Deque<Data>]> = .init(Array(repeating: Deque<Data>(), count: 5))

func enqueue(_ data: Data, priority: RtmpChunkPriority) {
queues.mutate { q in
q[priority.rawValue].append(data)
}
}

func dequeue() -> Data? {
var data: Data?
queues.mutate { q in
for i in 0 ..< 5 where !q[i].isEmpty {
data = q[i].removeFirst()
break
}
}
return data
}

func dropInterframes() {
queues.mutate { q in
q[RtmpChunkPriority.videoInterframe.rawValue].removeAll()
}
}

func clear() {
queues.mutate { q in
for i in 0 ..< 5 {
q[i].removeAll()
}
}
}
}
Loading
Loading