Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ message ControlReturn {
WorkerStateResponse workerStateResponse = 50;
WorkerMetricsResponse workerMetricsResponse = 51;
FinalizeCheckpointResponse finalizeCheckpointResponse = 52;
FlowControlUsageResponse flowControlUsageResponse = 53;

// common responses
ControlError controlError = 101;
Expand Down Expand Up @@ -138,4 +139,8 @@ message WorkerStateResponse {

// Response wrapping a worker's runtime metrics snapshot.
// no_box: generated Scala field is a plain value, never an Option.
message WorkerMetricsResponse {
worker.WorkerMetrics metrics = 1 [(scalapb.field).no_box = true];
}
}

// Response to WorkerService.QueryFlowControlUsage.
// Keys are Base64-encoded serialized ChannelIdentity values (the encoding
// produced by the transfer service when sampling flow control state);
// values are the bytes currently queued/used on that channel.
message FlowControlUsageResponse {
map<string, int64> channel_usage_bytes = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ service WorkerService {
rpc OpenExecutor(EmptyRequest) returns (EmptyReturn);
rpc PauseWorker(EmptyRequest) returns (WorkerStateResponse);
rpc PrepareCheckpoint(PrepareCheckpointRequest) returns (EmptyReturn);
rpc QueryFlowControlUsage(EmptyRequest) returns (FlowControlUsageResponse);
rpc QueryStatistics(EmptyRequest) returns (WorkerMetricsResponse);
rpc ResumeWorker(EmptyRequest) returns (WorkerStateResponse);
rpc RetrieveState(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message WorkerStatistics {
int64 data_processing_time = 3;
int64 control_processing_time = 4;
int64 idle_time = 5;
map<string, int64> channel_usage_bytes = 6;
}

message WorkerMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,20 @@ message OperatorMetrics{
OperatorStatistics operator_statistics = 2 [(scalapb.field).no_box = true];
}

// Queue-usage statistics for one port-level edge of the workflow
// (source operator/port -> destination operator/port).
message EdgeStatistics{
// Logical id of the source operator.
string from_op_id = 1;
// Output port index on the source operator.
int32 from_port_id = 2;
// Logical id of the destination operator.
string to_op_id = 3;
// Input port index on the destination operator.
int32 to_port_id = 4;
// Queued bytes attributed to this edge (averaged over its channels
// by the producer side — see controller edge-statistics aggregation).
int64 usage_bytes = 5;
}

// Persisted snapshot of one execution's runtime statistics.
message ExecutionStatsStore {
// Execution start time (epoch timestamp; unit assumed millis — confirm with writer).
int64 startTimeStamp = 1;
// Execution end time; presumably 0/unset while still running — confirm.
int64 endTimeStamp = 2;
// Per-operator metrics keyed by logical operator id.
map<string, OperatorMetrics> operator_info = 3;
// Mapping from operators to the workers executing them.
repeated OperatorWorkerMapping operator_worker_mapping = 4;
// Per-edge queue-usage statistics (see EdgeStatistics).
repeated EdgeStatistics edge_info = 5;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

from collections import defaultdict
from typing import DefaultDict
from typing import DefaultDict, Optional

from proto.org.apache.texera.amber.core import PortIdentity
from proto.org.apache.texera.amber.engine.architecture.worker import (
Expand All @@ -40,7 +40,11 @@ def __init__(self) -> None:
self._total_execution_time: int = 0
self._worker_start_time: int = 0

def get_statistics(self) -> WorkerStatistics:
def get_statistics(
self, channel_usage_bytes: Optional[dict[str, int]] = None
) -> WorkerStatistics:
if channel_usage_bytes is None:
channel_usage_bytes = {}
# Compile and return worker statistics
return WorkerStatistics(
[
Expand All @@ -56,6 +60,7 @@ def get_statistics(self) -> WorkerStatistics:
self._total_execution_time
- self._data_processing_time
- self._control_processing_time,
channel_usage_bytes,
)

def increase_input_statistics(self, port_id: PortIdentity, size: int) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,34 @@
T = TypeVar("T")


def _estimate_in_mem_size(item: T) -> int:
"""
Estimate in-memory bytes for queue accounting.
Prefer payload/frame byte size when available; otherwise fall back to object size.
"""
if item is None:
return 0

payload = getattr(item, "payload", None)
frame = getattr(payload, "frame", None) if payload is not None else None
if frame is not None:
if hasattr(frame, "nbytes"):
return int(frame.nbytes)
if hasattr(frame, "to_table"):
table = frame.to_table()
if hasattr(table, "nbytes"):
return int(table.nbytes)

return sys.getsizeof(item)


class LinkedBlockingMultiQueue(IKeyedQueue):
@inner
class Node(Generic[T]):
    """Singly linked node holding one queued item plus its size estimate."""

    def __init__(self, item: T):
        # The queued payload itself.
        self.item = item
        # Successor in FIFO order; None while this node is the tail.
        self.next: Optional[LinkedBlockingMultiQueue.Node[T]] = None
        # Cached byte estimate used for per-queue memory accounting
        # (frame/table nbytes when available, shallow getsizeof otherwise).
        self.in_mem_size = _estimate_in_mem_size(item)

@inner
class SubQueue(Generic[T]):
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class AkkaMessageTransferService(
private def checkCreditPolling(): Unit = {
  // Ask for credit on every channel that is either overloaded or still has
  // bytes waiting in its flow-control queue, so stalled channels drain.
  for ((channel, fc) <- channelToFC) {
    val needsCredit = fc.isOverloaded || fc.getQueuedBytes > 0
    if (needsCredit) {
      refService.askForCredit(channel)
    }
  }
}
Expand Down Expand Up @@ -164,6 +164,14 @@ class AkkaMessageTransferService(
handleBackpressure(backpressured)
}

/**
  * Snapshot of per-channel used bytes, keyed by the Base64 encoding of the
  * serialized ChannelIdentity (string keys survive proto map transport).
  */
def getChannelUsageBytes: Map[String, Long] = {
  val encoder = java.util.Base64.getEncoder
  channelToFC.iterator.map {
    case (channelId, flowControl) =>
      encoder.encodeToString(channelId.toByteArray) -> flowControl.getUsedBytes
  }.toMap
}

private def checkResend(): Unit = {
refService.clearQueriedActorRefs()
channelToCC.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload
import org.apache.texera.amber.engine.common.executionruntimestate.OperatorMetrics
import org.apache.texera.amber.engine.common.executionruntimestate.{EdgeStatistics, OperatorMetrics}

/** Marker trait for FIFO payloads pushed from the engine to the workflow client. */
trait ClientEvent extends WorkflowFIFOMessagePayload

/** Notifies the client that the workflow's aggregated execution state changed. */
case class ExecutionStateUpdate(state: WorkflowAggregatedState) extends ClientEvent

/**
  * Pushes runtime statistics to the client.
  *
  * @param operatorMetrics per-logical-operator metrics, keyed by operator id
  * @param edgeStatistics  per-edge queue-usage stats; defaults to empty so
  *                        existing call sites without edge data keep compiling
  */
case class ExecutionStatsUpdate(
    operatorMetrics: Map[String, OperatorMetrics],
    edgeStatistics: Seq[EdgeStatistics] = Seq.empty
) extends ClientEvent

/** Requests persistence of the given operator metrics as runtime statistics. */
case class RuntimeStatisticsPersist(operatorMetrics: Map[String, OperatorMetrics])
    extends ClientEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ class Controller(
outputMessages.foreach(transferService.send)
cp.asyncRPCClient.sendToClient(
ExecutionStatsUpdate(
cp.workflowExecution.getAllRegionExecutionsStats
cp.workflowExecution.getAllRegionExecutionsStats,
cp.workflowExecution.getAllRegionEdgeStatistics
)
)
globalReplayManager.markRecoveryStatus(CONTROLLER, isRecovering = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

package org.apache.texera.amber.engine.architecture.controller.execution

import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
import org.apache.texera.amber.core.virtualidentity.{ChannelIdentity, PhysicalOpIdentity}
import org.apache.texera.amber.engine.architecture.controller.execution.ExecutionUtils.aggregateMetrics
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState._
import org.apache.texera.amber.engine.architecture.scheduling.{Region, RegionIdentity}
import org.apache.texera.amber.engine.common.executionruntimestate.OperatorMetrics
import org.apache.texera.amber.engine.common.executionruntimestate.{EdgeStatistics, OperatorMetrics}
import org.apache.texera.amber.util.VirtualIdentityUtils

import scala.collection.mutable

Expand Down Expand Up @@ -105,6 +106,68 @@ case class WorkflowExecution() {
*/
def getAllRegionExecutions: Iterable[RegionExecution] = regionExecutions.values

/**
  * Aggregates sampled channel queue usage into per-edge statistics.
  *
  * Fix: the original built an `edgeChannelCounts` map that was never read
  * (dead code) and bound an unused `linkConfig`; both are removed.
  *
  * @return one EdgeStatistics per configured port-level link; links with no
  *         samples are emitted with usage 0.
  */
def getAllRegionEdgeStatistics: Seq[EdgeStatistics] = {
  // Step 1: latest sampled queued-bytes per channel across all regions and
  // workers. Worker stats key channels by the Base64 encoding of the
  // serialized ChannelIdentity; decode back to a ChannelIdentity here.
  val channelUsage: mutable.HashMap[ChannelIdentity, Long] = mutable.HashMap()
  getAllRegionExecutions.foreach { regionExecution =>
    regionExecution.getAllOperatorExecutions.foreach {
      case (_, operatorExecution) =>
        operatorExecution.getWorkerIds.foreach { workerId =>
          val stats = operatorExecution.getWorkerExecution(workerId).getStats
          stats.channelUsageBytes.foreach {
            case (encodedChannelId, usageBytes) =>
              val bytes = java.util.Base64.getDecoder.decode(encodedChannelId)
              val channelId = ChannelIdentity.parseFrom(bytes)
              // Clamp negative samples; keep the latest value per channel.
              channelUsage.update(channelId, Math.max(0L, usageBytes))
          }
        }
    }
  }

  // Step 2: group channel usage by logical (fromOp, toOp) pair. Channel
  // identities carry worker ids but no port ids, so when two operators are
  // connected by multiple port-level links the same usage is attributed to
  // each of them in step 3 (known approximation of the original logic).
  val logicalEdgeUsage =
    mutable.HashMap[(String, String), mutable.ArrayBuffer[Long]]()
  channelUsage.foreach {
    case (channelId, usageBytes) =>
      val fromLogical =
        VirtualIdentityUtils.getPhysicalOpId(channelId.fromWorkerId).logicalOpId.id
      val toLogical =
        VirtualIdentityUtils.getPhysicalOpId(channelId.toWorkerId).logicalOpId.id
      logicalEdgeUsage
        .getOrElseUpdate((fromLogical, toLogical), mutable.ArrayBuffer())
        .append(usageBytes)
  }

  // Step 3: attribute the grouped usage to every configured port-level link.
  // Registering the key up front ensures links with no samples still appear.
  val edgeUsage =
    mutable.HashMap[(String, Int, String, Int), mutable.ArrayBuffer[Long]]()
  getAllRegionExecutions.foreach { regionExecution =>
    regionExecution.region.resourceConfig.foreach { resourceConfig =>
      resourceConfig.linkConfigs.foreach {
        case (link, _) =>
          val fromOpId = link.fromOpId.logicalOpId.id
          val toOpId = link.toOpId.logicalOpId.id
          val key = (fromOpId, link.fromPortId.id, toOpId, link.toPortId.id)
          val usages = edgeUsage.getOrElseUpdate(key, mutable.ArrayBuffer())
          logicalEdgeUsage.get((fromOpId, toOpId)).foreach(usages.appendAll)
      }
    }
  }

  // Step 4: average per link (integer division; 0 when no samples).
  edgeUsage.map {
    case ((fromOpId, fromPortId, toOpId, toPortId), usages) =>
      val avgUsage = if (usages.nonEmpty) usages.sum / usages.size else 0L
      EdgeStatistics(fromOpId, fromPortId, toOpId, toPortId, avgUsage)
  }.toSeq
}

/**
* Retrieves the latest `OperatorExecution` associated with the specified physical operatorId.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ trait PauseHandler {
.map { _ =>
// update frontend workflow status and persist statistics
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics
sendToClient(ExecutionStatsUpdate(stats, edgeStats))
sendToClient(RuntimeStatisticsPersist(stats))
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
logger.info(s"workflow paused")
logger.info("workflow paused")
}
EmptyReturn()
}
Expand Down
Loading
Loading