-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.go
More file actions
337 lines (271 loc) · 10.1 KB
/
executor.go
File metadata and controls
337 lines (271 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package executor
import (
"context"
"errors"
"log/slog"
"time"
"github.com/zalgonoise/cfg"
"github.com/zalgonoise/micron/v3/schedule"
"github.com/zalgonoise/micron/v3/schedule/cronlex"
"github.com/zalgonoise/x/errs"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"github.com/zalgonoise/micron/v3/log"
"github.com/zalgonoise/micron/v3/metrics"
)
const (
defaultID = "micron.executor"
bufferPeriod = 100 * time.Millisecond
errDomain = errs.Domain("micron/executor")
ErrEmpty = errs.Kind("empty")
ErrAlreadyExecuted = errs.Kind("already executed")
ErrRunnerList = errs.Entity("runners list")
ErrScheduler = errs.Entity("scheduler")
ErrReboot = errs.Entity("reboot")
)
const (
defaultTimeout = 10 * time.Minute
)
var (
ErrEmptyRunnerList = errs.WithDomain(errDomain, ErrEmpty, ErrRunnerList)
ErrEmptyScheduler = errs.WithDomain(errDomain, ErrEmpty, ErrScheduler)
ErrAlreadyExecutedReboot = errs.WithDomain(errDomain, ErrAlreadyExecuted, ErrReboot)
)
// Scheduler describes the capabilities of a cron job scheduler. Its sole responsibility is to provide
// the timestamp for the next job's execution, after calculating its frequency from its configuration.
//
// Scheduler exposes one method, Next, that takes in a context.Context and a time.Time. It is implied that the
// input time is the time.Now value, however it is open to any input that the caller desires to pass to it. The returned
// time.Time value must always be the following occurrence according to the schedule, in the context of the input time.
//
// Implementations of Next should take into consideration the cron specification; however the interface allows a custom
// approach to the scheduler, especially if added functionality is necessary (e.g. frequency overriding schedulers,
// dynamic frequencies, and pipeline-approaches where the frequency is evaluated after a certain check).
type Scheduler interface {
// Next calculates and returns the following scheduled time, from the input time.Time.
Next(ctx context.Context, now time.Time) time.Time
}
// Runner describes a type that executes a job or task. It contains only one method, Run, that is called with a
// context as input and returns an error.
//
// Implementations of Runner only need to comply with this method, where the logic within Run is completely up to the
// actual implementation. These implementations need to be aware of the state of the input context.Context, which may
// denote cancellation or closure (e.g. with a timeout).
//
// The returned error denotes the success state of the execution. A nil error means that the execution was successful,
// where a non-nil error must signal a failed execution.
type Runner interface {
// Run executes the job or task.
//
// This call takes in a context.Context which may be used to denote cancellation or closure (e.g. with a timeout)
//
// The returned error denotes the success state of the execution. A nil error means that the execution was successful,
// where a non-nil error must signal a failed execution.
Run(ctx context.Context) error
}
// Runnable is a custom type for any function that takes in a context.Context and returns an error. This type of
// function can be perceived as a Runner type. For that, this custom type will implement Runner by exposing a Run method
// that invokes the actual Runnable function.
type Runnable func(ctx context.Context) error
// Run executes the job or task.
//
// This call takes in a context.Context which may be used to denote cancellation or closure (e.g. with a timeout)
//
// The returned error denotes the success state of the execution. A nil error means that the execution was successful,
// where a non-nil error must signal a failed execution.
func (r Runnable) Run(ctx context.Context) error {
if r == nil {
return nil
}
return r(ctx)
}
// Metrics describes the actions that register Executor-related metrics.
type Metrics interface {
// IncExecutorExecCalls increases the count of Exec calls, by the Executor.
IncExecutorExecCalls(ctx context.Context, id string)
// IncExecutorExecErrors increases the count of Exec call errors, by the Executor.
IncExecutorExecErrors(ctx context.Context, id string)
// ObserveExecLatency registers the duration of an Exec call, by the Executor.
ObserveExecLatency(ctx context.Context, id string, dur time.Duration)
// IncExecutorNextCalls increases the count of Next calls, by the Executor.
IncExecutorNextCalls(ctx context.Context, id string)
}
// Executable is an implementation of the Executor interface. It uses a schedule.Scheduler to mark the next job's
// execution time, and supports multiple Runner.
type Executable struct {
id string
cron Scheduler
runners []Runner
timeout time.Duration
logger *slog.Logger
metrics Metrics
tracer trace.Tracer
}
func JoinSchedules(execs []*Executable) map[*cronlex.Schedule][]*Executable {
m := make(map[*cronlex.Schedule][]*Executable, len(execs))
for i := range execs {
if cronSchedule, ok := execs[i].cron.(*schedule.CronSchedule); ok && cronSchedule != nil {
if _, ok := m[cronSchedule.Schedule]; !ok {
m[cronSchedule.Schedule] = make([]*Executable, 0, len(execs))
}
m[cronSchedule.Schedule] = append(m[cronSchedule.Schedule], execs[i])
}
}
return m
}
func IsReboot(exec interface {
Exec(ctx context.Context, now time.Time) error
Next(ctx context.Context, now time.Time) time.Duration
ID() string
}) bool {
if exec == nil {
return false
}
e, ok := exec.(*Executable)
if !ok {
return false
}
if e.cron == nil {
return false
}
_, ok = e.cron.(*schedule.RebootSchedule)
return ok
}
// Next calls the Executor's underlying schedule.Scheduler Next method.
func (e *Executable) Next(ctx context.Context, now time.Time) time.Duration {
ctx, span := e.tracer.Start(ctx, "Executor.Next")
defer span.End()
e.metrics.IncExecutorNextCalls(ctx, e.id)
next := e.cron.Next(ctx, now)
e.logger.DebugContext(ctx, "next job",
slog.String("id", e.id),
slog.Time("at", next),
)
span.SetAttributes(
attribute.String("id", e.id),
attribute.String("at", next.Format(time.RFC3339)),
)
return next.Sub(now)
}
// Exec runs the task when on its scheduled time.
//
// For this, Exec leverages the Executor's underlying schedule.Scheduler to retrieve the job's next execution time,
// waits for it, and calls Runner.Run on each configured Runner. All raised errors are joined and returned at the end
// of this call.
func (e *Executable) Exec(ctx context.Context, now time.Time) error {
ctx, span := e.tracer.Start(ctx, "Executor.Exec")
defer span.End()
span.SetAttributes(attribute.String("id", e.id))
e.metrics.IncExecutorExecCalls(ctx, e.id)
e.logger.InfoContext(ctx, "executing task", slog.String("id", e.id))
defer func() {
e.metrics.ObserveExecLatency(ctx, e.id, time.Since(now))
}()
// check for reboot executions
if rs, ok := e.cron.(*schedule.RebootSchedule); ok && rs.IsDone() {
return ErrAlreadyExecutedReboot
}
next := e.cron.Next(ctx, now)
timer := time.NewTimer(next.Sub(now))
select {
case <-ctx.Done():
err := ctx.Err()
e.metrics.IncExecutorExecErrors(ctx, e.id)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
e.logger.WarnContext(ctx, "task cancelled",
slog.String("id", e.id),
slog.String("error", err.Error()),
)
return err
case <-timer.C:
// avoid executing before it's time, as it may trigger repeated runs
if preTriggerDuration := time.Since(next); preTriggerDuration > 0 {
time.Sleep(preTriggerDuration + bufferPeriod)
}
runnerErrs := make([]error, 0, len(e.runners))
runCtx, runCancel := context.WithTimeout(ctx, e.timeout)
defer runCancel()
for i := range e.runners {
if err := e.runners[i].Run(runCtx); err != nil {
runnerErrs = append(runnerErrs, err)
}
}
if len(runnerErrs) > 0 {
err := errors.Join(runnerErrs...)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
e.metrics.IncExecutorExecErrors(ctx, e.id)
e.logger.ErrorContext(ctx, "task execution error(s)",
slog.String("id", e.id),
slog.Int("num_errors", len(runnerErrs)),
slog.String("errors", err.Error()),
)
return err
}
return nil
}
}
// ID returns this Executor's ID.
func (e *Executable) ID() string {
return e.id
}
// New creates an Executor with the input cfg.Option(s), also returning an error if raised.
//
// The minimum requirements to create an Executor is to supply at least one Runner, be it an implementation of
// this interface or as a Runnable using the WithRunners option, as well as a schedule.Scheduler using the
// WithScheduler option -- alternatively, callers can simply pass a cron string directly using the WithSchedule option.
//
// If an ID is not supplied, then the default ID of `micron.executor` is set.
func New(id string, runners []Runner, options ...cfg.Option[*Executable]) (*Executable, error) {
e := cfg.Set(defaultExecutable(), options...)
e.runners = runners
return validate(id, e)
}
func validate(id string, e *Executable) (*Executable, error) {
if len(e.runners) == 0 {
return nil, ErrEmptyRunnerList
}
if e.cron == nil {
return nil, ErrEmptyScheduler
}
if id == "" {
id = defaultID
}
e.id = id
if e.logger == nil {
e.logger = slog.New(log.NoOp())
}
if e.metrics == nil {
e.metrics = metrics.NoOp()
}
if e.tracer == nil {
e.tracer = noop.NewTracerProvider().Tracer("micron.executor")
}
return e, nil
}
// NoOp returns a no-op Executor.
func NoOp() noOpExecutor {
return noOpExecutor{}
}
type noOpExecutor struct{}
// Exec runs the task when on its scheduled time.
//
// This is a no-op call, it has no effect and the returned error is always nil.
func (e noOpExecutor) Exec(_ context.Context, _ time.Time) error {
return nil
}
// Next calls the Executor's underlying schedule.Scheduler Next method.
//
// This is a no-op call, it has no effect and the returned time is always zero.
func (e noOpExecutor) Next(_ context.Context, _ time.Time) (t time.Duration) {
return t
}
// ID returns this Executor's ID.
//
// This is a no-op call, it has no effect and the returned string is always empty.
func (e noOpExecutor) ID() string {
return ""
}