diff --git a/.gitignore b/.gitignore index 358b118fd..e9b227c54 100644 --- a/.gitignore +++ b/.gitignore @@ -104,3 +104,6 @@ test/*.png test/*.dot test/logs __pycache__ + +# Python virtual env +venv/ diff --git a/Moblin/Media/AdaptiveBitrate/AdaptiveBitrate.swift b/Moblin/Media/AdaptiveBitrate/AdaptiveBitrate.swift index 7ec7f3454..baf333274 100644 --- a/Moblin/Media/AdaptiveBitrate/AdaptiveBitrate.swift +++ b/Moblin/Media/AdaptiveBitrate/AdaptiveBitrate.swift @@ -15,6 +15,7 @@ struct StreamStats { let latency: Int32? let mbpsSendRate: Double? let relaxed: Bool? + var sendBufferUtilization: Double? // To not push too high bitrate after static scene. The encoder may output way // lower bitrate than configured. diff --git a/Moblin/Media/AdaptiveBitrate/AdaptiveBitrateRtmp.swift b/Moblin/Media/AdaptiveBitrate/AdaptiveBitrateRtmp.swift new file mode 100644 index 000000000..d0f491a43 --- /dev/null +++ b/Moblin/Media/AdaptiveBitrate/AdaptiveBitrateRtmp.swift @@ -0,0 +1,88 @@ +import Foundation + +class AdaptiveBitrateRtmp: AdaptiveBitrate { + private var currentBitrate: Int64 + private let minBitrate: Int64 + private var maxBitrate: Int64 + + private var lastDropTime: ContinuousClock.Instant = .now + private var lastIncreaseTime: ContinuousClock.Instant = .now + private var lastKeyframeTime: ContinuousClock.Instant = .now + + // Configuração conservadora-média (ajustável) + private let aggressiveDropThreshold = 0.82 // sendBufferUtilization + private let rttWarningThresholdMs = 350.0 + private let recoveryCooldown: Duration = .seconds(2.5) // segundos após drop + private let keyframeProtectionWindow: Duration = .milliseconds(800) // janela de proteção + + init(targetBitrate: UInt32, delegate: any AdaptiveBitrateDelegate) { + maxBitrate = Int64(targetBitrate) + currentBitrate = Int64(targetBitrate) + minBitrate = 800_000 // default min + super.init(delegate: delegate) + } + + override func setTargetBitrate(bitrate: UInt32) { + maxBitrate = Int64(bitrate) + } + + override func getCurrentBitrate() -> UInt32 { + UInt32(currentBitrate) + } + + override func getCurrentMaximumBitrateInKbps() -> Int64 { + maxBitrate / 1000 + } + + func notifyKeyframeSent() { + lastKeyframeTime = .now + } + + override func update(stats: StreamStats) { + super.update(stats: stats) + + let now: ContinuousClock.Instant = .now + let utilization = stats.sendBufferUtilization ?? 0.0 + let rtt = stats.rttMs + let timeSinceLastKeyframe = lastKeyframeTime.duration(to: now) + let isInKeyframeWindow = timeSinceLastKeyframe < keyframeProtectionWindow + + // === DROP AGRESSIVO (Congestionamento) === + if utilization > aggressiveDropThreshold || rtt > rttWarningThresholdMs, !isInKeyframeWindow { + let reductionFactor = utilization > 0.92 ? 0.55 : 0.68 + let newBitrate = Int64(Double(currentBitrate) * reductionFactor) + + if newBitrate < currentBitrate && lastDropTime.duration(to: now) > .seconds(1.0) { + currentBitrate = max(minBitrate, newBitrate) + lastDropTime = now + delegate?.adaptiveBitrateSetVideoStreamBitrate(bitrate: UInt32(currentBitrate)) + let action = "Decrease (utilization: \(String(format: "%.2f", utilization))) [protected keyframe]" + logAdaptiveAcion(actionTaken: action) + return + } + } + + // === RECUPERAÇÃO LENTA === + let isLowPressure = utilization < 0.45 && rtt < 180.0 + let isCooldownOver = lastDropTime.duration(to: now) > recoveryCooldown + let isIncreaseCooldownOver = lastIncreaseTime.duration(to: now) > .seconds(2.0) + + if isLowPressure, isCooldownOver, isIncreaseCooldownOver { + let increase = Int64(Double(currentBitrate) * 0.09) // +9% por passo + let newBitrate = min(maxBitrate, currentBitrate + increase) + + if newBitrate > currentBitrate { + currentBitrate = newBitrate + lastIncreaseTime = now + delegate?.adaptiveBitrateSetVideoStreamBitrate(bitrate: UInt32(currentBitrate)) + let action = "Increase (utilization: \(String(format: "%.2f", utilization)))" + logAdaptiveAcion(actionTaken: action) + } + } + } + + func reset() { + lastDropTime = .now + lastIncreaseTime = .now + } +} diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpChunkSerializer.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpChunkSerializer.swift new file mode 100644 index 000000000..c399ef2d1 --- /dev/null +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpChunkSerializer.swift @@ -0,0 +1,66 @@ +import Foundation + +class RtmpChunkSerializer { + func serialize(chunk: RtmpChunk, maximumChunkSize: Int) -> [Data] { + guard let message = chunk.message else { + return [chunk.encode()] + } + + let payload = message.encoded + let headerType3 = RtmpChunkType.three.toBasicHeader(chunk.chunkStreamId) + + let writer = ByteWriter() + writer.writeBytes(chunk.type.toBasicHeader(chunk.chunkStreamId)) + + if message.timestamp > 0xFFFFFF { + writer.writeUInt24(0xFFFFFF) + } else { + writer.writeUInt24(message.timestamp) + } + writer.writeUInt24(UInt32(payload.count)) + writer.writeUInt8(message.type.rawValue) + if chunk.type == .zero { + writer.writeUInt32Le(message.streamId) + } + if message.timestamp > 0xFFFFFF { + writer.writeUInt32(message.timestamp) + } + + let firstHeader = writer.data + + // Calculate exact total size to prevent reallocation + let chunkCount = payload.isEmpty ? 1 : Int(ceil(Double(payload.count) / Double(maximumChunkSize))) + let totalSize = firstHeader.count + payload + .count + (chunkCount > 1 ? (chunkCount - 1) * headerType3.count : 0) + + var buffer = Data(capacity: totalSize) + var results: [Data] = [] + var offset = 0 + + if payload.isEmpty { + buffer.append(firstHeader) + results.append(buffer) + return results + } + + while offset < payload.count { + let chunkSize = min(maximumChunkSize, payload.count - offset) + let isFirst = offset == 0 + + let header = isFirst ? firstHeader : headerType3 + + let startIdx = buffer.count + buffer.append(header) + // Use subdata bounds to prevent inline copying overhead + let payloadSlice = payload.subdata(in: offset ..< offset + chunkSize) + buffer.append(payloadSlice) + let endIdx = buffer.count + + // subdata creates a shared view into the contiguous buffer, zero-copy at Swift level + results.append(buffer.subdata(in: startIdx ..< endIdx)) + + offset += chunkSize + } + return results + } +} diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpConnection.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpConnection.swift index cc5cf1435..50263903b 100644 --- a/Moblin/Media/HaishinKit/Rtmp/RtmpConnection.swift +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpConnection.swift @@ -127,7 +127,7 @@ class RtmpConnection: @unchecked Sendable { } switch RtmpConnectionCode(rawValue: code) { case .connectSuccess: - handleConnectSuccess() + handleConnectSuccess(data: data) case .connectRejected: handleConnectRejected(data: data) case .connectClosed: @@ -137,8 +137,13 @@ class RtmpConnection: @unchecked Sendable { } } - private func handleConnectSuccess() { - socket.maximumChunkSizeToServer = 1024 * 8 + private func handleConnectSuccess(data: AsObject) { + stream?.enhancedCapabilities = RtmpEnhancedCapabilities.fromConnectResponse(data) + + let targetBitrate = stream?.info.bitrateStats.value.latestSpeed ?? 0 + // Reduce chunk size to 32KB / 64KB max to prevent HOL blocking on bad mobile networks + let chunkSize = targetBitrate > 4_000_000 ? 1024 * 64 : 1024 * 32 + socket.maximumChunkSizeToServer = chunkSize _ = socket.write(chunk: RtmpChunk( type: .zero, chunkStreamId: RtmpChunk.ChunkStreamId.control.rawValue, @@ -200,6 +205,10 @@ class RtmpConnection: @unchecked Sendable { "videoFunction": .number(Double(VideoFunction.clientSeek.rawValue)), "pageUrl": .null, "objectEncoding": .number(0), + "videoFourCcInfoMap": .object([ + "hvc1": .object(["codecHeaderType": .string("sequence")]), + "av01": .object(["codecHeaderType": .string("sequence")]), + ]), ], arguments: [] ) @@ -298,6 +307,11 @@ extension RtmpConnection: RtmpSocketDelegate { stream?.info.onWritten(sequence: totalBytesSent) } + func socketGetCurrentBitrate() -> UInt32 { + let latestBytesPerSecond = stream?.info.bitrateStats.value.latestSpeed ?? 62500 + return UInt32(latestBytesPerSecond * 8) + } + func socketDataReceived(data: Data) -> Data { guard let chunk = currentChunk ?? RtmpChunk(data: data, size: socket.maximumChunkSizeFromServer) else { diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpEnhancedVideo.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpEnhancedVideo.swift new file mode 100644 index 000000000..ac385e94f --- /dev/null +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpEnhancedVideo.swift @@ -0,0 +1,38 @@ +import AVFoundation +import CoreMedia +import Foundation + +enum RtmpVideoFourCC: UInt32 { + case avc1 = 0x6176_6331 // H.264 + case hvc1 = 0x6876_6331 // HEVC + case av01 = 0x6176_3031 // AV1 + + var flvCodecId: UInt8 { + switch self { + case .avc1: 7 + case .hvc1, .av01: 12 // Extended codec for Enhanced RTMP + } + } + + var isEnhanced: Bool { + self != .avc1 + } +} + +struct RtmpEnhancedCapabilities { + let supportedFourCCs: [RtmpVideoFourCC] + + static func fromConnectResponse(_ response: AsObject) -> RtmpEnhancedCapabilities { + var supported: [RtmpVideoFourCC] = [.avc1] + + if let videoFourCcInfoMap = response["videoFourCcInfoMap"] as? AsObject { + if videoFourCcInfoMap["hvc1"] != nil { supported.append(.hvc1) } + if videoFourCcInfoMap["av01"] != nil { supported.append(.av01) } + } else if let videoFourCcInfoMap = response["videoFourCcInfoMap"] as? [String: Any] { + if videoFourCcInfoMap["hvc1"] != nil { supported.append(.hvc1) } + if videoFourCcInfoMap["av01"] != nil { supported.append(.av01) } + } + + return RtmpEnhancedCapabilities(supportedFourCCs: supported) + } +} diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpReconnectPolicy.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpReconnectPolicy.swift new file mode 100644 index 000000000..dce5ab0bf --- /dev/null +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpReconnectPolicy.swift @@ -0,0 +1,43 @@ +import Foundation + +struct RtmpReconnectPolicy { + let baseDelay: Double + let maxDelay: Double + let jitterFactor: Double + + init(baseDelay: Double = 1.0, maxDelay: Double = 32.0, jitterFactor: Double = 0.15) { + self.baseDelay = baseDelay + self.maxDelay = maxDelay + self.jitterFactor = jitterFactor + } + + func delay(forAttempt attempt: Int) -> Double { + let exponentialDelay = baseDelay * pow(2.0, Double(attempt)) + let cappedDelay = min(exponentialDelay, maxDelay) + + let jitterAmount = cappedDelay * jitterFactor + let randomJitter = Double.random(in: -jitterAmount ... jitterAmount) + + return cappedDelay + randomJitter + } + + func shouldRetry(forError error: String?) -> Bool { + // Return false for fatal errors like auth failed, no such user, bad stream key + guard let error else { return true } + + let fatalKeywords = [ + "authmod=adobe", + "?reason=authfailed", + "nosuchuser", + "badstreamkey", + "code=403", + "forbidden", + ] + + let lowerError = error.lowercased() + for keyword in fatalKeywords where lowerError.contains(keyword) { + return false + } + return true + } +} diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpSendQueue.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpSendQueue.swift new file mode 100644 index 000000000..1b82fd180 --- /dev/null +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpSendQueue.swift @@ -0,0 +1,51 @@ +import DequeModule +import Foundation + +enum RtmpChunkPriority: Int, Comparable { + case control = 0 + case audio = 1 + case videoKeyframe = 2 + case videoInterframe = 3 + case metadata = 4 + + static func < (lhs: Self, rhs: Self) -> Bool { + lhs.rawValue < rhs.rawValue + } +} + +class RtmpSendQueue { + // Index matches RtmpChunkPriority.rawValue + // 0: control, 1: audio, 2: videoKeyframe, 3: videoInterframe, 4: metadata + private var queues: Atomic<[Deque]> = .init(Array(repeating: Deque(), count: 5)) + + func enqueue(_ data: Data, priority: RtmpChunkPriority) { + queues.mutate { q in + q[priority.rawValue].append(data) + } + } + + func dequeue() -> Data? { + var data: Data? + queues.mutate { q in + for i in 0 ..< 5 where !q[i].isEmpty { + data = q[i].removeFirst() + break + } + } + return data + } + + func dropInterframes() { + queues.mutate { q in + q[RtmpChunkPriority.videoInterframe.rawValue].removeAll() + } + } + + func clear() { + queues.mutate { q in + for i in 0 ..< 5 { + q[i].removeAll() + } + } + } +} diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpSocket.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpSocket.swift index 2852bca9f..6e5c296ee 100644 --- a/Moblin/Media/HaishinKit/Rtmp/RtmpSocket.swift +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpSocket.swift @@ -14,6 +14,7 @@ protocol RtmpSocketDelegate: AnyObject { func socketReadyStateChanged(readyState: RtmpSocketReadyState) func socketUpdateStats(totalBytesSent: Int64) func socketPost(data: AsObject) + func socketGetCurrentBitrate() -> UInt32 } final class RtmpSocket: @unchecked Sendable { @@ -22,11 +23,13 @@ final class RtmpSocket: @unchecked Sendable { private var readyState: RtmpSocketReadyState = .uninitialized private var inputBuffer = Data() weak var delegate: (any RtmpSocketDelegate)? - private var totalBytesSending: Int64 = 0 - private var totalBytesSent: Int64 = 0 + private(set) var totalBytesSending: Int64 = 0 + private(set) var totalBytesSent: Int64 = 0 private let name: String private var connection: NWConnection? private let queue: DispatchQueue + private let sendQueue = RtmpSendQueue() + private var isFlushing = Atomic(false) init(name: String, queue: DispatchQueue) { self.name = name @@ -39,10 +42,15 @@ final class RtmpSocket: @unchecked Sendable { maximumChunkSizeFromServer = RtmpChunk.defaultSize totalBytesSending = 0 totalBytesSent = 0 + sendQueue.clear() + isFlushing.mutate { $0 = false } inputBuffer.removeAll(keepingCapacity: false) + let tcpOptions = NWProtocolTCP.Options() + tcpOptions.noDelay = true + let parameters = NWParameters(tls: tlsOptions, tcp: tcpOptions) connection = NWConnection( to: .hostPort(host: .init(host), port: .init(integer: port)), - using: .init(tls: tlsOptions) + using: parameters ) connection!.viabilityUpdateHandler = viabilityDidChange connection!.stateUpdateHandler = stateDidChange @@ -71,13 +79,85 @@ final class RtmpSocket: @unchecked Sendable { } } + private let serializer = RtmpChunkSerializer() + + private var isDropModeActive = false + + /// Heuristic estimate of outbound send pressure. + /// + /// Computed as: + /// bytesSubmittedToNWConnection - bytesCompletedByContentProcessed + /// + /// This is NOT the actual TCP congestion window, + /// kernel socket send buffer occupancy, + /// or TCP retransmission queue. + /// + /// It is an application-level estimate useful for + /// congestion detection and adaptive bitrate decisions. + func estimatedSendPressure() -> Double { + let expectedBufferedSeconds = 0.6 + let minimumBacklogBytes = 300_000.0 // Evita sensibilidade extrema em bitrates baixos + let recentSendRateBytesPerSecond = Double(delegate?.socketGetCurrentBitrate() ?? 500_000) / 8.0 + let maxBacklog = max(minimumBacklogBytes, recentSendRateBytesPerSecond * expectedBufferedSeconds) + let bufferedBytes = Double(max(0, totalBytesSending - totalBytesSent)) + return min(1.0, bufferedBytes / maxBacklog) + } + func write(chunk: RtmpChunk) -> Int { - for data in chunk.split(maximumSize: maximumChunkSizeToServer) { - write(data: data) + let pressure = estimatedSendPressure() + + if isDropModeActive { + if pressure < 0.65 { + isDropModeActive = false + } + } else { + if pressure > 0.85 { + isDropModeActive = true + sendQueue.dropInterframes() // Limpa o atrasado na transição + // requestKeyframe() can be called here if needed + } + } + + let priority = priorityFor(chunk: chunk) + + // Bloqueia a entrada de novos P-Frames enquanto a congestão persistir + if isDropModeActive, priority == .videoInterframe { + return chunk.message!.length + } + + let serializedChunks = serializer.serialize( + chunk: chunk, + maximumChunkSize: maximumChunkSizeToServer + ) + for data in serializedChunks { + sendQueue.enqueue(data, priority: priority) } + flush() return chunk.message!.length } + private func priorityFor(chunk: RtmpChunk) -> RtmpChunkPriority { + if chunk.type == .zero || chunk.chunkStreamId == RtmpChunk.ChunkStreamId.control.rawValue { + return .control + } + if chunk.chunkStreamId == FlvTagType.audio.streamId { + return .audio + } + if chunk.chunkStreamId == FlvTagType.video.streamId { + if let payload = chunk.message?.encoded, !payload.isEmpty { + let firstByte = payload[0] + let isExtended = (firstByte & 0b1000_0000) != 0 + let frameType = isExtended ? ((firstByte & 0b0111_0000) >> 4) : (firstByte >> 4) + if frameType == FlvFrameType.key.rawValue { + return .videoKeyframe + } + return .videoInterframe + } + return .videoKeyframe + } + return .metadata + } + private func setReadyState(state: RtmpSocketReadyState) { guard readyState != state else { return @@ -87,26 +167,57 @@ final class RtmpSocket: @unchecked Sendable { delegate?.socketReadyStateChanged(readyState: readyState) } - private func write(data: Data) { + private func flush() { + var shouldFlush = false + isFlushing.mutate { + if !$0 { + $0 = true + shouldFlush = true + } + } + guard shouldFlush else { return } + + queue.async { [weak self] in + self?.performFlush() + } + } + + private func performFlush() { + guard let data = sendQueue.dequeue() else { + isFlushing.mutate { $0 = false } + return + } + let size = Int64(data.count) totalBytesSending += size + connection?.send(content: data, completion: .contentProcessed { [weak self] error in - guard let self else { - return - } + guard let self else { return } if error != nil { close(isDisconnected: true) return } + totalBytesSent += size - }) - delegate?.socketUpdateStats(totalBytesSent: totalBytesSending) - if hasTooMuchDataBuffered() { - logger.info("rtmp: \(name): Too much data buffered. Disconnecting.") + delegate?.socketUpdateStats(totalBytesSent: totalBytesSending) + + if hasTooMuchDataBuffered() { + logger.info("rtmp: \(name): Too much data buffered. Disconnecting.") + queue.async { + self.close(isDisconnected: true) + } + return + } + queue.async { - self.close(isDisconnected: true) + self.performFlush() } - } + }) + } + + private func write(data: Data) { + sendQueue.enqueue(data, priority: .control) + flush() } private func hasTooMuchDataBuffered() -> Bool { diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpStream.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpStream.swift index 94a85585c..acc060c5a 100644 --- a/Moblin/Media/HaishinKit/Rtmp/RtmpStream.swift +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpStream.swift @@ -58,11 +58,18 @@ private enum State { case publishing } +import Combine + class RtmpStream: @unchecked Sendable { + @Published var currentMetrics: RtmpStreamMetrics? + private var metricsTimer: SimpleTimer? let info = RtmpStreamInfo() + var onKeyframeSent: (() -> Void)? var streamId: UInt32 = 0 private var state: State = .initialized private var startedAt = Date() + private var reconnectAttempts = 0 + private let reconnectPolicy = RtmpReconnectPolicy() private var audioChunkType: RtmpChunkType = .zero private var videoChunkType: RtmpChunkType = .zero private var dataTimeStamps: [String: Date] = [:] @@ -79,6 +86,9 @@ class RtmpStream: @unchecked Sendable { private var videoTimeStampDelta = 0.0 private var prevRebasedAudioTimeStamp: Double? private var prevRebasedVideoTimeStamp: Double? + private var lastSessionTimestampOffset: UInt32 = 0 + private var lastSentTimestampInSession: UInt32 = 0 + var enhancedCapabilities = RtmpEnhancedCapabilities(supportedFourCCs: [.avc1]) private let processor: Processor weak var delegate: (any RtmpStreamDelegate)? @@ -88,6 +98,7 @@ class RtmpStream: @unchecked Sendable { self.delegate = delegate self.queue = queue connectTimer = SimpleTimer(queue: queue) + metricsTimer = SimpleTimer(queue: queue) connection = RtmpConnection(name: name, queue: queue) connection.stream = self } @@ -105,12 +116,25 @@ class RtmpStream: @unchecked Sendable { func disconnect() { queue.async { + self.reconnectAttempts = 0 + self.lastSessionTimestampOffset = 0 + self.lastSentTimestampInSession = 0 self.disconnectInternal() } } - func reconnectSoon() { - queue.asyncAfter(deadline: .now() + 5) { [weak self] in + func reconnectSoon(errorDescription: String? = nil) { + guard reconnectPolicy.shouldRetry(forError: errorDescription) else { + logger.info("rtmp: \(name): Fatal error '\(errorDescription ?? "")', stopping reconnect.") + closeInternal() + return + } + + let delay = reconnectPolicy.delay(forAttempt: reconnectAttempts) + reconnectAttempts += 1 + + logger.info("rtmp: \(name): Reconnecting in \(delay) seconds (attempt \(reconnectAttempts))") + queue.asyncAfter(deadline: .now() + delay) { [weak self] in self?.connectInternal() } } @@ -122,9 +146,33 @@ class RtmpStream: @unchecked Sendable { func closeInternal() { setState(state: .initialized) stopConnectTimer() + metricsTimer?.stop() processor.stopEncoding(self) } + func getMetrics() -> RtmpStreamMetrics { + let stats = info.stats.value + let bitrateStats = info.bitrateStats.value + + let totalSent = connection.socket.totalBytesSent + let totalSending = connection.socket.totalBytesSending + + let utilization = connection.socket.estimatedSendPressure() + + return RtmpStreamMetrics( + timestamp: .now, + instantBitrate: Int(bitrateStats.latestSpeed) * 8, // bps + bytesSentTotal: UInt64(totalSent), + estimatedRttMs: stats.rttMs > 0 ? stats.rttMs : nil, + sendBufferUtilization: utilization, + currentChunkSize: connection.socket.maximumChunkSizeToServer, + reconnectAttempts: reconnectAttempts, + videoTimestampDrift: 0, // Not easily exposed right now + audioTimestampDrift: 0, + lastReconnectReason: nil // Can populate later if we store it + ) + } + func onInternal(data: AsObject) { guard case let .string(code) = data["code"] else { return @@ -132,6 +180,7 @@ class RtmpStream: @unchecked Sendable { delegate?.rtmpStreamStatus(self, code: code) switch code { case RtmpConnectionCode.connectSuccess.rawValue: + reconnectAttempts = 0 setState(state: .initialized) sendReleaseStream() sendFCPublish() @@ -139,12 +188,20 @@ class RtmpStream: @unchecked Sendable { case RtmpStreamCode.publishStart.rawValue: if state != .initialized { setState(state: .publishing) + startMetricsTimer() } default: break } } + private func startMetricsTimer() { + metricsTimer?.startPeriodic(interval: 1.0) { [weak self] in + guard let self else { return } + currentMetrics = getMetrics() + } + } + private func setState(state: State) { guard self.state != state else { return @@ -157,6 +214,8 @@ class RtmpStream: @unchecked Sendable { sendDeleteStream() sendCloseStream() processor.stopEncoding(self) + lastSessionTimestampOffset += lastSentTimestampInSession + 100 + lastSentTimestampInSession = 0 } switch state { case .open: @@ -326,10 +385,12 @@ class RtmpStream: @unchecked Sendable { guard state == .publishing else { return } + lastSentTimestampInSession = max(lastSentTimestampInSession, timestamp) + let adjustedTimestamp = timestamp + lastSessionTimestampOffset let length = connection.socket.write(chunk: RtmpChunk( type: audioChunkType, chunkStreamId: FlvTagType.audio.streamId, - message: RtmpAudioMessage(streamId: streamId, timestamp: timestamp, payload: buffer) + message: RtmpAudioMessage(streamId: streamId, timestamp: adjustedTimestamp, payload: buffer) )) audioChunkType = .one info.bitrateStats.mutate { $0.add(bytesTransferred: length) } @@ -339,10 +400,12 @@ class RtmpStream: @unchecked Sendable { guard state == .publishing else { return } + lastSentTimestampInSession = max(lastSentTimestampInSession, timestamp) + let adjustedTimestamp = timestamp + lastSessionTimestampOffset let length = connection.socket.write(chunk: RtmpChunk( type: videoChunkType, chunkStreamId: FlvTagType.video.streamId, - message: RtmpVideoMessage(streamId: streamId, timestamp: timestamp, payload: buffer) + message: RtmpVideoMessage(streamId: streamId, timestamp: adjustedTimestamp, payload: buffer) )) videoChunkType = .one info.bitrateStats.mutate { $0.add(bytesTransferred: length) } @@ -448,7 +511,11 @@ class RtmpStream: @unchecked Sendable { return } var buffer: Data - let frameType = sampleBuffer.getIsSync() ? FlvFrameType.key : FlvFrameType.inter + let isKeyframe = sampleBuffer.getIsSync() + if isKeyframe { + onKeyframeSent?() + } + let frameType = isKeyframe ? FlvFrameType.key : FlvFrameType.inter switch format { case .h264: buffer = makeAvcVideoTagHeader(frameType, .nal) diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpStreamInfo.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpStreamInfo.swift index be358bf0b..d996b25b4 100644 --- a/Moblin/Media/HaishinKit/Rtmp/RtmpStreamInfo.swift +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpStreamInfo.swift @@ -33,10 +33,11 @@ class RtmpStreamInfo { func onWritten(sequence: Int64) { latestWrittenSequence = sequence - // Just for safety - if sendTimings.count < 500 { - sendTimings.append(SendTiming(timestamp: .now, sequence: sequence)) + // Keep a rolling window instead of dropping new timings completely + if sendTimings.count >= 5000 { + sendTimings.removeFirst() } + sendTimings.append(SendTiming(timestamp: .now, sequence: sequence)) let packetsInFlight = packetsInFlight() stats.mutate { $0.packetsInFlight = packetsInFlight diff --git a/Moblin/Media/HaishinKit/Rtmp/RtmpStreamMetrics.swift b/Moblin/Media/HaishinKit/Rtmp/RtmpStreamMetrics.swift new file mode 100644 index 000000000..f4223647f --- /dev/null +++ b/Moblin/Media/HaishinKit/Rtmp/RtmpStreamMetrics.swift @@ -0,0 +1,23 @@ +import Foundation + +struct RtmpStreamMetrics: Codable { + let timestamp: Date + let instantBitrate: Int // bps calculado nos últimos 1-2s + let bytesSentTotal: UInt64 + let estimatedRttMs: Double? // baseado em ACK timing + let sendBufferUtilization: Double // 0.0 a 1.0 + let currentChunkSize: Int + let reconnectAttempts: Int + let videoTimestampDrift: TimeInterval + let audioTimestampDrift: TimeInterval + let lastReconnectReason: String? + + // Helper para UI + var healthScore: Double { + // 0.0 = péssimo, 1.0 = excelente + let rttFactor = min(1.0, 200.0 / (estimatedRttMs ?? 150.0)) + let bufferFactor = max(0.0, 1.0 - sendBufferUtilization) + let reconnectFactor = max(0.0, 1.0 - Double(reconnectAttempts) * 0.05) + return min(1.0, max(0.0, rttFactor * 0.4 + bufferFactor * 0.4 + reconnectFactor * 0.2)) + } +} diff --git a/Moblin/Media/RtmpServer/RtmpServer.swift b/Moblin/Media/RtmpServer/RtmpServer.swift index 2030a43ee..8d7422592 100644 --- a/Moblin/Media/RtmpServer/RtmpServer.swift +++ b/Moblin/Media/RtmpServer/RtmpServer.swift @@ -75,7 +75,7 @@ class RtmpServer: @unchecked Sendable { private func setupListener() { let options = NWProtocolTCP.Options() - // options.noDelay = true + options.noDelay = true let parameters = NWParameters(tls: nil, tcp: options) parameters.requiredLocalEndpoint = .hostPort( host: .ipv4(.any), diff --git a/Moblin/Media/RtmpServer/RtmpServerClient.swift b/Moblin/Media/RtmpServer/RtmpServerClient.swift index 0ac85a433..204e32257 100644 --- a/Moblin/Media/RtmpServer/RtmpServerClient.swift +++ b/Moblin/Media/RtmpServer/RtmpServerClient.swift @@ -14,6 +14,7 @@ private enum ClientState { private enum ChunkState { case basicHeaderFirstByte + case basicHeaderRemainingBytes(format: UInt8, firstByte: UInt8) case messageHeaderType0 case messageHeaderType1 case messageHeaderType2 @@ -162,6 +163,8 @@ class RtmpServerClient: @unchecked Sendable { switch chunkState { case .basicHeaderFirstByte: handleDataHandshakeDoneBasicHeaderFirstByte(data: data) + case let .basicHeaderRemainingBytes(format, firstByte): + handleBasicHeaderRemainingBytes(format: format, firstByte: firstByte, data: data) case .messageHeaderType0: handleDataHandshakeDoneMessageHeaderType0(data: data) case .messageHeaderType1: @@ -191,21 +194,46 @@ class RtmpServerClient: @unchecked Sendable { } let firstByte = data[0] let format = firstByte >> 6 - let chunkStreamId = UInt16(firstByte & 0x3F) - switch chunkStreamId { - case 0: - stopInternal(reason: "Two bytes basic header is not implemented") + let chunkStreamId = firstByte & 0x3F + if chunkStreamId == 0 { + chunkState = .basicHeaderRemainingBytes(format: format, firstByte: firstByte) + receiveData(size: 1) return - case 1: - stopInternal(reason: "Three bytes basic header is not implemented") + } else if chunkStreamId == 1 { + chunkState = .basicHeaderRemainingBytes(format: format, firstByte: firstByte) + receiveData(size: 2) return - default: - break } - if chunkStreams[chunkStreamId] == nil { - chunkStreams[chunkStreamId] = RtmpServerChunkStream(client: self, streamId: chunkStreamId) + setupChunkStream(streamId: UInt16(chunkStreamId), format: format) + } + + private func handleBasicHeaderRemainingBytes(format: UInt8, firstByte: UInt8, data: Data) { + let chunkStreamId = firstByte & 0x3F + let streamId: UInt16 + if chunkStreamId == 0 { + guard data.count == 1 else { + stopInternal(reason: "Wrong length \(data.count) in 2-byte basic header") + return + } + streamId = UInt16(data[0]) + 64 + } else if chunkStreamId == 1 { + guard data.count == 2 else { + stopInternal(reason: "Wrong length \(data.count) in 3-byte basic header") + return + } + streamId = UInt16(data[0]) * 256 + UInt16(data[1]) + 64 // Big endian decoding + } else { + stopInternal(reason: "Invalid state in remaining basic header bytes") + return + } + setupChunkStream(streamId: streamId, format: format) + } + + private func setupChunkStream(streamId: UInt16, format: UInt8) { + if chunkStreams[streamId] == nil { + chunkStreams[streamId] = RtmpServerChunkStream(client: self, streamId: streamId) } - chunkStream = chunkStreams[chunkStreamId] + chunkStream = chunkStreams[streamId] // logger.info("rtmp-server: \(chunkStreamId): Chunk message header format: \(format)") switch format { case 0: diff --git a/Moblin/Various/Media.swift b/Moblin/Various/Media.swift index 5af7ef5b8..bea1e41af 100644 --- a/Moblin/Various/Media.swift +++ b/Moblin/Various/Media.swift @@ -439,7 +439,8 @@ final class Media: NSObject, @unchecked Sendable { transportBitrate: streamTransportBitrate(), latency: nil, mbpsSendRate: nil, - relaxed: nil + relaxed: nil, + sendBufferUtilization: rtmpStream.currentMetrics?.sendBufferUtilization )) guard overlay else { return nil @@ -587,16 +588,19 @@ final class Media: NSObject, @unchecked Sendable { targetBitrate: UInt32, adaptiveBitrate adaptiveBitrateEnabled: Bool) { + let abrRtmp: AdaptiveBitrateRtmp? if adaptiveBitrateEnabled { - adaptiveBitrate = AdaptiveBitrateSrtFight(targetBitrate: targetBitrate, - delegate: self, - rttMax: 500, - pifMax: 100) + abrRtmp = AdaptiveBitrateRtmp(targetBitrate: targetBitrate, delegate: self) + adaptiveBitrate = abrRtmp } else { + abrRtmp = nil adaptiveBitrate = nil } rtmpStream?.setUrl(url) for rtmpStream in rtmpStreams { + rtmpStream.onKeyframeSent = { [weak abrRtmp] in + abrRtmp?.notifyKeyframeSent() + } rtmpStream.connect() } } diff --git a/MoblinTests/AdaptiveBitrateRtmpSuite.swift b/MoblinTests/AdaptiveBitrateRtmpSuite.swift new file mode 100644 index 000000000..fe4db1299 --- /dev/null +++ b/MoblinTests/AdaptiveBitrateRtmpSuite.swift @@ -0,0 +1,62 @@ +import Collections +import Foundation +@testable import Moblin +import Testing + +private class MockAdaptiveBitrateDelegate: AdaptiveBitrateDelegate { + var bitrates: Deque = [] + + func adaptiveBitrateSetVideoStreamBitrate(bitrate: UInt32) { + bitrates.append(bitrate) + } +} + +struct AdaptiveBitrateRtmpSuite { + @Test + func dropsBitrateOnHighBufferUtilization() { + let delegate = MockAdaptiveBitrateDelegate() + let abr = AdaptiveBitrateRtmp(targetBitrate: 6_000_000, delegate: delegate) + + let stats = StreamStats( + rttMs: 120, + packetsInFlight: 0, + transportBitrate: nil, + latency: nil, + mbpsSendRate: nil, + relaxed: nil, + sendBufferUtilization: 0.91 // Acima do threshold 0.82 + ) + + abr.update(stats: stats) + + guard let last = delegate.bitrates.last else { + Issue.record("Deveria ter disparado o delegate") + return + } + #expect(last < 6_000_000, "Deveria ter reduzido o bitrate") + } + + @Test + func respectsKeyframeProtectionWindow() { + let delegate = MockAdaptiveBitrateDelegate() + let abr = AdaptiveBitrateRtmp(targetBitrate: 5_000_000, delegate: delegate) + + // Simula keyframe recente + abr.notifyKeyframeSent() + + let stats = StreamStats( + rttMs: 120, + packetsInFlight: 0, + transportBitrate: nil, + latency: nil, + mbpsSendRate: nil, + relaxed: nil, + sendBufferUtilization: 0.95 // Congestionamento forte + ) + + abr.update(stats: stats) + + // Não deve ter droppado por causa da proteção + #expect(delegate.bitrates.isEmpty, "Não deve dropar bitrate dentro da janela de keyframe") + } +} diff --git a/MoblinTests/RtmpReconnectPolicySuite.swift b/MoblinTests/RtmpReconnectPolicySuite.swift new file mode 100644 index 000000000..a0b70254d --- /dev/null +++ b/MoblinTests/RtmpReconnectPolicySuite.swift @@ -0,0 +1,30 @@ +import AVFoundation +@testable import Moblin +import Testing + +struct RtmpReconnectPolicySuite { + @Test + func appliesExponentialBackoffWithJitter() { + let policy = RtmpReconnectPolicy() + + let delay1 = policy.delay(forAttempt: 0) + let delay3 = policy.delay(forAttempt: 3) + let delay5 = policy.delay(forAttempt: 5) + + #expect(delay3 > delay1) + #expect(delay5 > delay3) + // O delay para a tentativa 5 (baseando-se no cap de 32.0 + jitter max) + // 1 * 2^5 = 32.0. Com jitter de 15% (32 * 0.15 = 4.8), o maximo é 36.8 + #expect(delay5 <= 36.8) + } + + @Test + func rejectsFatalAuthErrors() { + let policy = RtmpReconnectPolicy() + + #expect(policy.shouldRetry(forError: "badstreamkey") == false) + #expect(policy.shouldRetry(forError: "authmod=adobe") == false) + #expect(policy.shouldRetry(forError: "code=403") == false) + #expect(policy.shouldRetry(forError: "temporary network error") == true) + } +}