Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
40302fe
Implement enhanced worker crash handling in JobDispatcher
Nov 28, 2025
7a5bb52
agentknob changes for worker crash
Nov 28, 2025
c05c057
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Nov 28, 2025
f2f4138
enhanced worker crash reporting for listner and added telemetry
Dec 3, 2025
95aedec
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 3, 2025
f0a3af4
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 3, 2025
e18c663
removed feature flag telemetry
Dec 4, 2025
425e29b
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 4, 2025
af10a40
cleanup the code
Dec 5, 2025
1d3c339
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 5, 2025
1944d29
refactored the chnages
Dec 10, 2025
eedd2e4
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 10, 2025
ea914c3
refactored the changes
Dec 10, 2025
50fe7d7
refactored the changes
Dec 10, 2025
8e617cf
refactored the changes
Dec 10, 2025
f2699ce
refactored the changes
Dec 10, 2025
e67f638
refactored the changes
Dec 10, 2025
0448035
added retry mechanism
Dec 11, 2025
b56bfc8
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 12, 2025
3a093df
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 15, 2025
7342cee
added try-catch
Dec 16, 2025
be0f03a
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 16, 2025
3c5cd62
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 16, 2025
641d8a0
added tracepoint in telemetry
Dec 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 113 additions & 2 deletions src/Agent.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ await processChannel.SendAsync(
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
await LogWorkerProcessUnhandledException(message, detailInfo, agentCertManager.SkipServerCertificateValidation);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we have sendEvent to server method here as well, as here only we are logging if worker is terminated due to unhandled exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I did not do that because the completion reporting where after the below code and I kept it as it is.

// stop renew lock
lockRenewalTokenSource.Cancel();
// renew job request should never blows up.
await renewJobRequest;

I asked from copilot regarding can we keep reporting after we are sending telemetry and it is saying that current order is correct and given the below reason, not sure whether it will be correct to keep before or not.

Reason:

  • Background renewal task: The renewJobRequest task is running continuously in the background to keep the job lock alive
  • Race condition prevention: Without cancellation, renewal might continue even after job completion

// Publish worker crash telemetry for Kusto analysis
var telemetryPublisher = HostContext.GetService<IWorkerCrashTelemetryPublisher>();
await telemetryPublisher.PublishWorkerCrashTelemetryAsync(HostContext, message.JobId, message.JobId, returnCode);
}

TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
Expand All @@ -641,8 +645,40 @@ await processChannel.SendAsync(
await renewJobRequest;

Trace.Info($"Job request completion initiated - Completing job request for job: {message.JobId}");
// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);

// Check if enhanced crash handling is enabled via agent knob
bool enhancedworkercrashhandlingenabled = AgentKnobs.EnhancedWorkerCrashHandling.GetValue(UtilKnobValueContext.Instance()).AsBoolean();
Trace.Info($"Enhanced worker crash handling enabled: {enhancedworkercrashhandlingenabled}");

if (enhancedworkercrashhandlingenabled)
{
bool isPlanV8Plus = PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent);
bool isWorkerCrash = !TaskResultUtil.IsValidReturnCode(returnCode);

Trace.Info($"Enhanced crash handling enabled - Normal completion crash analysis [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, IsPlanV8Plus:{isPlanV8Plus}, IsWorkerCrash:{isWorkerCrash}, ExitCode:{returnCode}, NeedsForcedCompletion:{isPlanV8Plus && isWorkerCrash}]");

if (isPlanV8Plus && isWorkerCrash)
{
// Direct plan event reporting for Plan v8+ worker crashes
Trace.Warning($"Plan event reporting for Plan v8+ worker crash [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, ExitCode:{returnCode}, Result:{result}]");
await ReportJobCompletionEventAsync(message, result);
Trace.Info("Plan event reporting executed successfully for worker crash");
}
else
{
// Standard completion for Plan v7 or normal Plan v8+ scenarios
Trace.Info($"Standard completion for normal scenario [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, ExitCode:{returnCode}, Result:{result}]");
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
Trace.Info("Standard completion executed successfully");
}
}
else
{
// Original simple completion logic
Trace.Info($"Using previous completion logic [JobId:{message.JobId}, EnhancedHandling:Disabled]");
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
}

Trace.Info("Job request completion completed");

// print out unhandled exception happened in worker after we complete job request.
Expand Down Expand Up @@ -971,6 +1007,81 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest
throw new AggregateException(exceptions);
}

// Reports job completion to server via plan event (similar to how worker reports)
// Used for Plan v8+ scenarios where listener needs to notify server of job completion
private async Task ReportJobCompletionEventAsync(Pipelines.AgentJobRequestMessage message, TaskResult result)
{
Trace.Info($"Plan event reporting initiated - Sending job completion event to server [JobId:{message.JobId}, Result:{result}]");

try
{
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection));

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;

// Make sure SystemConnection Url match Config Url base for OnPremises server
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
{
try
{
Uri urlResult = null;
Uri configUri = new Uri(_agentSetting.ServerUrl);
if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out urlResult))
{
//replace the schema and host portion of messageUri with the host from the
//server URI (which was set at config time)
Trace.Info($"URL replacement for OnPremises server - Original: {jobServerUrl}, New: {urlResult}");
jobServerUrl = urlResult;
}
}
catch (InvalidOperationException ex)
{
Trace.Error(ex);
}
catch (UriFormatException ex)
{
Trace.Error(ex);
}
}

using (var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation: false))
{
await jobServer.ConnectAsync(jobConnection);
// Create job completed event (similar to worker)
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result, false);
try
{
await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
Trace.Info($"Plan event reporting completed successfully [JobId:{message.JobId}, Result:{result}]");
}
catch (TaskOrchestrationPlanNotFoundException ex)
{
Trace.Error($"TaskOrchestrationPlanNotFoundException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
}
catch (TaskOrchestrationPlanSecurityException ex)
{
Trace.Error($"TaskOrchestrationPlanSecurityException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
}
catch (Exception ex)
{
Trace.Error($"Exception during plan event reporting for job {message.JobId}: {ex.Message}");
Trace.Error(ex);
}
}
}
catch (Exception ex)
{
Trace.Error($"Critical error during plan event reporting setup for job {message.JobId}: {ex.Message}");
Trace.Error(ex);
}
}

// log an error issue to job level timeline record
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")]
private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false)
Expand Down
50 changes: 50 additions & 0 deletions src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Services.Agent.Util;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Newtonsoft.Json;

namespace Microsoft.VisualStudio.Services.Agent.Listener.Telemetry
{
[ServiceLocator(Default = typeof(WorkerCrashTelemetryPublisher))]
public interface IWorkerCrashTelemetryPublisher : IAgentService
{
Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, Guid? taskInstanceId, int exitCode);
}

public sealed class WorkerCrashTelemetryPublisher : AgentService, IWorkerCrashTelemetryPublisher
{
public async Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, Guid? taskInstanceId, int exitCode)
{
try
{
var telemetryPublisher = hostContext.GetService<IAgenetListenerTelemetryPublisher>();

var telemetryData = new Dictionary<string, object>
{
["JobId"] = jobId.ToString(),
["TaskInstanceId"] = taskInstanceId?.ToString() ?? "N/A",
["ExitCode"] = exitCode.ToString()
};

var command = new Command("telemetry", "publish")
{
Data = JsonConvert.SerializeObject(telemetryData)
};
command.Properties.Add("area", "AzurePipelinesAgent");
command.Properties.Add("feature", "WorkerCrash");

await telemetryPublisher.PublishEvent(hostContext, command);
Trace.Info($"Published worker crash telemetry for job {jobId} with exit code {exitCode}");
}
catch (Exception ex)
{
Trace.Warning($"Failed to publish worker crash telemetry: {ex.Message}");
}
}
}
}
6 changes: 6 additions & 0 deletions src/Agent.Sdk/Knob/AgentKnobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,12 @@ public class AgentKnobs
new EnvironmentKnobSource("FAIL_JOB_WHEN_AGENT_DIES"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob EnhancedWorkerCrashHandling = new Knob(
nameof(EnhancedWorkerCrashHandling),
"If true, enables enhanced worker crash handling with forced completion for Plan v8+ scenarios where worker crashes cannot send completion events",
new EnvironmentKnobSource("ENHANCED_WORKER_CRASH_HANDLING"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob AllowWorkDirectoryRepositories = new Knob(
nameof(AllowWorkDirectoryRepositories),
"Allows repositories to be checked out below work directory level on self hosted agents.",
Expand Down
Loading