Skip to content

Commit dd84948

Browse files
sophiatevSophia Tevosyan
andauthored
Extended Sessions for Isolated (Orchestrations) (#449)
* updated to use c# built in memory cache * fixed bug where new events were not cleared * added a new field to the OrchestratorResponse, restored the old public facing method without the extended sessions parameter * removed unused ExtendedSessions object * fixing line endings * added comments to the ExtendedSessionState class and the memory cache import to props * fixing props line endings * added a wrapper ExtendedSessionsCache object * added comments * updated the expiration scan frequency * addressing some comments * fixing indentation * adding proto files * added a max frequency to cache scan expiration * reverting to old implementation without a max * addressing comments * adding updated protos * added null check * missed an earlier PR comment * added the durabletask packages from the test source * updated reference to new preview package * pushing the updated yml * forgot to update the release package name * adding unit tests * had the wrong default timeout * fixed failing test * adding new dependencies and new package number * pushing bug fix for not honoring a host restarting an extended session * updating version numbers * added proto changes from main branch * downgrading the cache package to a version that is test with net6.0 * reverting ci changes * adding caching package * addressing PR comments * missed one file * had a mistake in the LoadAndRun path * slight update to extended session parameter name * slight change to avoid unnecessary initialization of the cache if properties are not specified, also added another test to make sure that extended sessions aren't stored if isExtendedSessions is false * slight change to make the idle timeout need to be >0 rather than >=0 * updating version numbers and protos --------- Co-authored-by: Sophia Tevosyan <[email protected]>
1 parent 7396675 commit dd84948

File tree

10 files changed

+745
-29
lines changed

10 files changed

+745
-29
lines changed

Directory.Packages.props

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
<!-- Microsoft.Extensions.* Packages -->
1111
<ItemGroup>
12+
<PackageVersion Include="Microsoft.Extensions.Caching.Memory" Version="6.0.3" />
1213
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
1314
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="6.0.2" />
1415
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
@@ -28,7 +29,7 @@
2829

2930
<!-- DurableTask Packages -->
3031
<ItemGroup>
31-
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.3.0" />
32+
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.4.0" />
3233
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.2.2" />
3334
</ItemGroup>
3435

eng/targets/Release.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
</PropertyGroup>
1818

1919
<PropertyGroup>
20-
<VersionPrefix>1.14.0</VersionPrefix>
20+
<VersionPrefix>1.15.0</VersionPrefix>
2121
<VersionSuffix></VersionSuffix>
2222
</PropertyGroup>
2323

src/Grpc/orchestrator_service.proto

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,10 @@ message OrchestratorResponse {
342342
// The number of work item events that were processed by the orchestrator.
343343
// This field is optional. If not set, the service should assume that the orchestrator processed all events.
344344
google.protobuf.Int32Value numEventsProcessed = 5;
345-
346345
OrchestrationTraceContext orchestrationTraceContext = 6;
346+
347+
// Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
348+
bool requiresHistory = 7;
347349
}
348350

349351
message CreateInstanceRequest {
@@ -678,6 +680,18 @@ message AbandonEntityTaskResponse {
678680
// Empty.
679681
}
680682

683+
message SkipGracefulOrchestrationTerminationsRequest {
684+
// A maximum of 500 instance IDs can be provided in this list.
685+
repeated string instanceIds = 1;
686+
google.protobuf.StringValue reason = 2;
687+
}
688+
689+
message SkipGracefulOrchestrationTerminationsResponse {
690+
// Those instances which could not be terminated because they had locked entities at the time of this termination call,
691+
// are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
692+
repeated string unterminatedInstanceIds = 1;
693+
}
694+
681695
service TaskHubSidecarService {
682696
// Sends a hello request to the sidecar service.
683697
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -751,6 +765,10 @@ service TaskHubSidecarService {
751765

752766
// Abandon an entity work item
753767
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
768+
769+
// "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
770+
// Note that a maximum of 500 orchestrations can be terminated at a time using this method.
771+
rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse);
754772
}
755773

756774
message GetWorkItemsRequest {

src/Grpc/versions.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# The following files were downloaded from branch main at 2025-09-10 22:50:45 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/985035a0890575ae18be0eb2a3ac93c10824498a/protos/orchestrator_service.proto
1+
# The following files were downloaded from branch main at 2025-09-17 01:45:58 UTC
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/f5745e0d83f608d77871c1894d9260ceaae08967/protos/orchestrator_service.proto

src/Shared/Grpc/ProtoUtils.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,18 +279,19 @@ internal static Timestamp ToTimestamp(this DateTime dateTime)
279279
/// </param>
280280
/// <param name="entityConversionState">The entity conversion state, or null if no conversion is required.</param>
281281
/// <param name="orchestrationActivity">The <see cref="Activity" /> that represents orchestration execution.</param>
282+
/// <param name="requiresHistory">Whether or not a history is required to complete the orchestration request and none was provided.</param>
282283
/// <returns>The orchestrator response.</returns>
283284
/// <exception cref="NotSupportedException">When an orchestrator action is unknown.</exception>
284285
internal static P.OrchestratorResponse ConstructOrchestratorResponse(
285286
string instanceId,
286287
string executionId,
287288
string? customStatus,
288-
IEnumerable<OrchestratorAction> actions,
289+
IEnumerable<OrchestratorAction>? actions,
289290
string completionToken,
290291
EntityConversionState? entityConversionState,
291-
Activity? orchestrationActivity)
292+
Activity? orchestrationActivity,
293+
bool requiresHistory = false)
292294
{
293-
Check.NotNull(actions);
294295
var response = new P.OrchestratorResponse
295296
{
296297
InstanceId = instanceId,
@@ -302,8 +303,16 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
302303
SpanID = orchestrationActivity?.SpanId.ToString(),
303304
SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(),
304305
},
306+
RequiresHistory = requiresHistory,
305307
};
306308

309+
// If a history is required and the orchestration request was not completed, then there is no list of actions.
310+
if (requiresHistory)
311+
{
312+
return response;
313+
}
314+
315+
Check.NotNull(actions);
307316
foreach (OrchestratorAction action in actions)
308317
{
309318
var protoAction = new P.OrchestratorAction { Id = action.Id };
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
6+
namespace Microsoft.DurableTask.Worker;
7+
8+
/// <summary>
9+
/// Represents the state of an extended session for an orchestration.
10+
/// </summary>
11+
public class ExtendedSessionState
12+
{
13+
/// <summary>
14+
/// Initializes a new instance of the <see cref="ExtendedSessionState"/> class.
15+
/// </summary>
16+
/// <param name="state">The orchestration's runtime state.</param>
17+
/// <param name="taskOrchestration">The TaskOrchestration implementation of the orchestration.</param>
18+
/// <param name="orchestrationExecutor">The TaskOrchestrationExecutor for the orchestration.</param>
19+
public ExtendedSessionState(OrchestrationRuntimeState state, TaskOrchestration taskOrchestration, TaskOrchestrationExecutor orchestrationExecutor)
20+
{
21+
this.RuntimeState = state;
22+
this.TaskOrchestration = taskOrchestration;
23+
this.OrchestrationExecutor = orchestrationExecutor;
24+
}
25+
26+
/// <summary>
27+
/// Gets or sets the saved runtime state of the orchestration.
28+
/// </summary>
29+
public OrchestrationRuntimeState RuntimeState { get; set; }
30+
31+
/// <summary>
32+
/// Gets or sets the saved TaskOrchestration implementation of the orchestration.
33+
/// </summary>
34+
public TaskOrchestration TaskOrchestration { get; set; }
35+
36+
/// <summary>
37+
/// Gets or sets the saved TaskOrchestrationExecutor.
38+
/// </summary>
39+
public TaskOrchestrationExecutor OrchestrationExecutor { get; set; }
40+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Microsoft.Extensions.Caching.Memory;
5+
6+
namespace Microsoft.DurableTask.Worker;
7+
8+
/// <summary>
9+
/// A cache for extended sessions that wraps a <see cref="MemoryCache"/> instance.
10+
/// Responsible for holding <see cref="ExtendedSessionState"/> for orchestrations that are running within extended sessions.
11+
/// </summary>
12+
public class ExtendedSessionsCache : IDisposable
13+
{
14+
MemoryCache? extendedSessions;
15+
16+
/// <summary>
17+
/// Gets a value indicating whether returns whether or not the cache has been initialized.
18+
/// </summary>
19+
/// <returns>True if the cache has been initialized, false otherwise.</returns>
20+
public bool IsInitialized => this.extendedSessions is not null;
21+
22+
/// <summary>
23+
/// Dispose the cache and release all resources.
24+
/// </summary>
25+
public void Dispose()
26+
{
27+
this.extendedSessions?.Dispose();
28+
GC.SuppressFinalize(this);
29+
}
30+
31+
/// <summary>
32+
/// Gets the cache for extended sessions if it has already been initialized, or otherwise initializes it with the given expiration scan frequency.
33+
/// </summary>
34+
/// <param name="expirationScanFrequencyInSeconds">
35+
/// The expiration scan frequency of the cache, in seconds.
36+
/// This specifies how often the cache checks for stale items, and evicts them.
37+
/// </param>
38+
/// <returns>The IMemoryCache that holds the cached <see cref="ExtendedSessionState"/>.</returns>
39+
public MemoryCache GetOrInitializeCache(double expirationScanFrequencyInSeconds)
40+
{
41+
this.extendedSessions ??= new MemoryCache(new MemoryCacheOptions
42+
{
43+
ExpirationScanFrequency = TimeSpan.FromSeconds(expirationScanFrequencyInSeconds / 5),
44+
});
45+
46+
return this.extendedSessions;
47+
}
48+
}

src/Worker/Core/Worker.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The worker is responsible for processing durable task work items.</PackageDescri
99
</PropertyGroup>
1010

1111
<ItemGroup>
12+
<PackageReference Include="Microsoft.Extensions.Caching.Memory" />
1213
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
1314
<PackageReference Include="Microsoft.Extensions.Options" />
1415
<PackageReference Include="System.Text.Json" />

src/Worker/Grpc/GrpcOrchestrationRunner.cs

Lines changed: 136 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using DurableTask.Core.History;
66
using Google.Protobuf;
77
using Microsoft.DurableTask.Worker.Shims;
8+
using Microsoft.Extensions.Caching.Memory;
89
using Microsoft.Extensions.DependencyInjection;
910
using P = Microsoft.DurableTask.Protobuf;
1011

@@ -82,6 +83,40 @@ public static string LoadAndRun(
8283
string encodedOrchestratorRequest,
8384
ITaskOrchestrator implementation,
8485
IServiceProvider? services = null)
86+
{
87+
return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services);
88+
}
89+
90+
/// <summary>
91+
/// Deserializes orchestration history from <paramref name="encodedOrchestratorRequest"/> and uses it to resume the
92+
/// orchestrator implemented by <paramref name="implementation"/>.
93+
/// </summary>
94+
/// <param name="encodedOrchestratorRequest">
95+
/// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string.
96+
/// </param>
97+
/// <param name="implementation">
98+
/// An <see cref="ITaskOrchestrator"/> implementation that defines the orchestrator logic.
99+
/// </param>
100+
/// <param name="extendedSessionsCache">
101+
/// The cache of extended sessions which can be used to retrieve the <see cref="ExtendedSessionState"/> if this orchestration request is from within an extended session.
102+
/// </param>
103+
/// <param name="services">
104+
/// Optional <see cref="IServiceProvider"/> from which injected dependencies can be retrieved.
105+
/// </param>
106+
/// <returns>
107+
/// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger.
108+
/// </returns>
109+
/// <exception cref="ArgumentNullException">
110+
/// Thrown if <paramref name="encodedOrchestratorRequest"/> or <paramref name="implementation"/> is <c>null</c>.
111+
/// </exception>
112+
/// <exception cref="ArgumentException">
113+
/// Thrown if <paramref name="encodedOrchestratorRequest"/> contains invalid data.
114+
/// </exception>
115+
public static string LoadAndRun(
116+
string encodedOrchestratorRequest,
117+
ITaskOrchestrator implementation,
118+
ExtendedSessionsCache? extendedSessionsCache,
119+
IServiceProvider? services = null)
85120
{
86121
Check.NotNullOrEmpty(encodedOrchestratorRequest);
87122
Check.NotNull(implementation);
@@ -95,34 +130,114 @@ public static string LoadAndRun(
95130
pair => pair.Key,
96131
pair => ProtoUtils.ConvertValueToObject(pair.Value));
97132

98-
// Re-construct the orchestration state from the history.
99-
// New events must be added using the AddEvent method.
100-
OrchestrationRuntimeState runtimeState = new(pastEvents);
101-
foreach (HistoryEvent newEvent in newEvents)
102-
{
103-
runtimeState.AddEvent(newEvent);
133+
OrchestratorExecutionResult? result = null;
134+
MemoryCache? extendedSessions = null;
135+
136+
// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
137+
bool addToExtendedSessions = false;
138+
bool requiresHistory = false;
139+
bool pastEventsIncluded = true;
140+
bool isExtendedSession = false;
141+
double extendedSessionIdleTimeoutInSeconds = 0;
142+
143+
// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
144+
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
145+
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
146+
&& extendedSessionIdleTimeout > 0
147+
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
148+
&& extendedSessionObj is bool extendedSession)
149+
{
150+
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
151+
isExtendedSession = extendedSession;
152+
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
153+
}
154+
155+
if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj)
156+
&& includePastEventsObj is bool includePastEvents)
157+
{
158+
pastEventsIncluded = includePastEvents;
159+
}
160+
161+
if (isExtendedSession && extendedSessions != null)
162+
{
163+
// If a history was provided, even if we already have an extended session stored, we always want to evict whatever state is in the cache and replace it with a new extended
164+
// session based on the provided history
165+
if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null)
166+
{
167+
OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState;
168+
runtimeState.NewEvents.Clear();
169+
foreach (HistoryEvent newEvent in newEvents)
170+
{
171+
runtimeState.AddEvent(newEvent);
172+
}
173+
174+
result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents();
175+
if (extendedSessionState.OrchestrationExecutor.IsCompleted)
176+
{
177+
extendedSessions.Remove(request.InstanceId);
178+
}
179+
}
180+
else
181+
{
182+
extendedSessions.Remove(request.InstanceId);
183+
addToExtendedSessions = true;
184+
}
104185
}
105186

106-
TaskName orchestratorName = new(runtimeState.Name);
107-
ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
108-
? new(new(p.Name), p.OrchestrationInstance.InstanceId)
109-
: null;
187+
if (result == null)
188+
{
189+
// DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the
190+
// session and lost the orchestration history so we cannot replay the orchestration.
191+
if (!pastEventsIncluded)
192+
{
193+
requiresHistory = true;
194+
}
195+
else
196+
{
197+
// Re-construct the orchestration state from the history.
198+
// New events must be added using the AddEvent method.
199+
OrchestrationRuntimeState runtimeState = new(pastEvents);
200+
201+
foreach (HistoryEvent newEvent in newEvents)
202+
{
203+
runtimeState.AddEvent(newEvent);
204+
}
205+
206+
TaskName orchestratorName = new(runtimeState.Name);
207+
ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
208+
? new(new(p.Name), p.OrchestrationInstance.InstanceId)
209+
: null;
110210

111-
DurableTaskShimFactory factory = services is null
112-
? DurableTaskShimFactory.Default
113-
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
114-
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
115-
TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
116-
OrchestratorExecutionResult result = executor.Execute();
211+
DurableTaskShimFactory factory = services is null
212+
? DurableTaskShimFactory.Default
213+
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
214+
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
215+
TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
216+
result = executor.Execute();
217+
218+
if (addToExtendedSessions && !executor.IsCompleted)
219+
{
220+
extendedSessions.Set<ExtendedSessionState>(
221+
request.InstanceId,
222+
new(runtimeState, shim, executor),
223+
new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });
224+
}
225+
else
226+
{
227+
extendedSessions?.Remove(request.InstanceId);
228+
}
229+
}
230+
}
117231

118232
P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
119-
request.InstanceId,
233+
request.InstanceId,
120234
request.ExecutionId,
121-
result.CustomStatus,
122-
result.Actions,
235+
result?.CustomStatus,
236+
result?.Actions,
123237
completionToken: string.Empty, /* doesn't apply */
124-
entityConversionState: null,
125-
orchestrationActivity: null);
238+
entityConversionState: null,
239+
orchestrationActivity: null,
240+
requiresHistory: requiresHistory);
126241
byte[] responseBytes = response.ToByteArray();
127242
return Convert.ToBase64String(responseBytes);
128243
}

0 commit comments

Comments
 (0)