Essential knowledge
Triggers use bookmarks and scheduling, which are analyzed in my other two articles.
what is a trigger
You can directly call the process engine's IWorkflowRuntime to obtain the IWorkflowClient, and then call its CreateAndRunInstanceAsync to start a new process.
You can also let the process engine listen to an event. When the event is triggered, it will automatically create and execute (or restore with the help of bookmarks) a process instance. This is a trigger.
For example, define a trigger to automatically start a specified process when the specified file changes. Another example is defining a trigger to automatically trigger the execution of a certain process every 5 minutes.
Configure triggers in process definition
Elsa provides code or visual designer to define the process. Since the trigger is just a special Activity, you can also drag the trigger into the process definition through code or in the designer.
Different types of triggers require different parameters to be configured.
For example: elsa's built-in StartAt means that it will automatically trigger execution at a specified time point, so you need to set its DateTime to mean that it will automatically trigger at this time point.
Another example: HttpEndpoint is another built-in trigger of elsa. It is automatically triggered when it listens to a specified request, so you need to configure its listening address, Http method, whether to make authorization judgment, and other attributes.
Trigger storage (indexed)
Triggers are defined in the process definition, and there may be multiple triggers of the same or different types in a process definition. Triggers are extracted from all process definitions and stored in a list separately. When the system At startup, or when you need to access the trigger configuration in the entire system, you can quickly get the trigger directly from this list. This is faster than traversing all process definitions each time and extracting triggers from them. This is trigger indexing. If you use ef configured for elsa persistence, then it will be stored in the Triggers table.
The trigger indexer is represented by the ITriggerIndexer interface. The default implementation is TriggerIndexer, which provides the functions of saving, deleting, and obtaining triggers. When saving, it will obtain the list of triggers defined in it based on the process or process definition, and then call its GetTriggerPayloadsAsync method to obtain the parameters of the trigger configuration. This parameter is usually generated based on the trigger attributes. What is more special is that some In triggers, GetTriggerPayloadsAsync will return multiple payloads, which will cause multiple records to be stored in the trigger index list. For example, the built-in trigger HttpEndpoint will use multiple Http methods configured by the user. Return multiple data. If you configure GET POST, the trigger index list will store the corresponding two records. When the same URL address is externally requested in the future, whether it is get or post, the HttpEndpoint node will be executed.
When the process is published after the process definition is changed, or when the process definition is directly refreshed, or in other circumstances, after the process definition is changed, ITriggerIndexer will be called to regenerate and save the trigger. After saving, the WorkflowTriggersIndexed event will be triggered.
Therefore, the indexer also plays a strange role, that is, it allows us to configure trigger-related parameters in the process definition, and the external listening function to cooperate with the trigger can be obtained from persistence or obtain the configuration data of the trigger from event parameters. , thereby controlling the monitoring logic.
The listening part outside the trigger
Monitoring is not defined within the trigger node, but is coordinated externally, such as the HttpEndpont trigger. Monitoring is implemented by a separate core middleware, but this middleware should rely on the HttpEndpont trigger we provide when configuring the process. defined parameters.
There is an UpdateRouteTable in the external part triggered by HttpPoint, which listens to WorkflowTriggersIndexed, obtains the monitored address according to the event parameters, and then configures the route. In addition, the core middleware can also obtain triggers directly from persistence, then access the trigger configuration parameters in the payload, and control the execution process of this middleware based on these parameters.
The external part that cooperates with timer-related triggers such as Timer StartAt Cron is ScheduleWorkflows, which also listens to the WorkflowTriggersIndexed event. During event processing, the elsa scheduler is called to schedule background jobs to allow the trigger to be executed after the specified time.
The trigger node is executed
Triggers are special activities. If there is a process: A → B → C, where B is a trigger, the current process may not be triggered by B's external monitoring. It may be that after A is executed, it flows to B. Causes B's ExecuteAsync to be executed.
It is used to judge this situation. If the current process is triggered by this node, it is true, otherwise it is false.
Therefore, the ExecuteAsync method usually needs to judge these two situations when the trigger is executed.
Built-in HttpEndpoint trigger analysis
Here we analyze the built-in HttpEndpoint trigger, but only focus on the principle part of the trigger to help us understand more deeply how the trigger works. It is defined in a module and it inherits fromTrigger<HttpRequest>
Trigger related input parameters
Parameter name | describe |
---|---|
SupportedMethods | Which http methods to monitor, optional values: "GET", "POST", "PUT", "HEAD", "DELETE" |
Authorize | When the monitored address is requested, whether to make permission judgment |
Policy | Policy name used for permission determination |
Path | Monitoring url address |
RequestTimeout | Request timeout settings |
RequestSizeLimit | Request body size limit |
Core source code:
When the process is published, the GetTriggerPayloads method will be called, and it will return the above input parameters. These input parameters will eventually be saved in the database, and the WorkflowTriggersIndexed event will also be triggered. These monitoring-related parameters will also be saved to the parameters of this event. .
This method will return one or more objects based on the configured SupportedMethods, eventually causing multiple corresponding records to appear in the trigger index list.
UpdateRouteTable
It listens to the WorkflowTriggersIndexed event, gets the Path from the event parameters, and then updates the elsa routing table
public class UpdateRouteTable(IRouteTableUpdater routeTableUpdater, IOptions<HttpActivityOptions> options) :
INotificationHandler<WorkflowTriggersIndexed>,
INotificationHandler<WorkflowBookmarksIndexed>
{
/// <inheritdoc />
public async Task HandleAsync(WorkflowTriggersIndexed notification, CancellationToken cancellationToken)
{
();
await (, cancellationToken);
await (, cancellationToken);
}
HttpWorkflowsMiddleware
elsa http endpoint listening middleware, just look at the comments
{
[] = true,
[] = ()
};
var cancellationToken = ;
var request = ;
var method = ();
var httpWorkflowLookupService = ();
var workflowInstanceId = await GetWorkflowInstanceIdAsync(serviceProvider, httpContext, cancellationToken);
var correlationId = await GetCorrelationIdAsync(serviceProvider, httpContext, cancellationToken);
//Calculate hash value based on request path http method and HttpEndpoint
var bookmarkHash = ComputeBookmarkHash(serviceProvider, matchingPath, method);
//According to the above hash value, obtain the matching workflow and its trigger list from the storage
var lookupResult = await (bookmarkHash, cancellationToken);
if (lookupResult != null)
{
//If the process is found and it contains only one trigger that matches the current request, it means that it matches and the process is executed, otherwise an error is reported.
var triggers = ;
if(>1)
{
//Report error
await HandleMultipleWorkflowsFoundAsync(httpContext, () => (x => new
{
}), cancellationToken);
return;
}
var trigger = ();
if (trigger != null)
{
var workflowGraph = !;
//The node where the trigger is located in the execution process
await StartWorkflowAsync(httpContext, trigger, workflowGraph, input, workflowInstanceId, correlationId);
return;
}
}
//If the trigger node has already been executed, that is, it was transferred from other nodes before, a bookmark will be created when the trigger node is executed, and the execution will be resumed directly based on the bookmark.
var bookmarks = await FindBookmarksAsync(serviceProvider, bookmarkHash, workflowInstanceId, correlationId, cancellationToken).ToList();
//If multiple matches are found, an error will be reported
if(>1)
{
await HandleMultipleWorkflowsFoundAsync(httpContext, () => (x => new
{
}), cancellationToken);
return;
}
var bookmark = ();
if (bookmark != null)
{
//Resume bookmark execution
await ResumeWorkflowAsync(httpContext, bookmark, input, correlationId);
return;
}
// If the basic addresses are matched but the corresponding process is not found, a 404 error will be thrown.
if (basePath != null)
{
await (cancellation: cancellationToken);
return;
}
// If no base path was configured, the request should be handled by subsequent middlewares.
await next(httpContext);
}
If the previous middleware matches the current trigger node
();
var httpContext = ;
//The trigger is also an activity. It may be executed directly instead of being executed by an http request. You need to wait for a traditional bookmark request to resume OnResumeAsync.
if (httpContext == null)
{
// We're executing in a non-HTTP context (. in a virtual actor).
// Create a bookmark to allow the invoker to export the state and resume execution from there.
(OnResumeAsync, );
return;
}
//Otherwise, it means that the trigger of the current process is itself and was triggered by the http request.
await HandleRequestAsync(context, httpContext);
}
Regardless of whether HttpWorkflowsMiddleware is matched and executed through the bookmark recovery process; or traditional bookmark recovery is performed, this will be executed.
();
var httpContext = ;
//When resuming execution, it may not be restored by http request, but may be restored by directly calling bookmarks.
if (httpContext == null)
{
// We're executing in a non-HTTP context (. in a virtual actor).
// Create a bookmark to allow the invoker to export the state and resume execution from there.
(OnResumeAsync, );
return;
}
//Handle http request
await HandleRequestAsync(context, httpContext);
}
trigger scheduling
There are several built-in triggers related to time: Cron, StartAt, and Timer. They use elsa's workflow scheduling framework to trigger the execution process in background jobs according to the set time rules.
Trigger schedulerITriggerScheduler
ITriggerScheduler defines two methods, scheduling triggers and canceling trigger scheduling. DefaultTriggerScheduler is implemented by default, which uses elsa's process scheduler to implement process scheduling in background jobs. It is worth noting that DefaultTriggerScheduler always creates new process instances when they are triggered. The core source code is as follows:
triggers, CancellationToken cancellationToken = default)
{
var triggerList = ();
var timerTriggers = <>();
var startAtTriggers = ();
var cronTriggers = ();
var now = ;
// Schedule each Timer trigger.
foreach (var trigger in timerTriggers)
{
var (startAt, interval) = ();
var input = new { StartAt = startAt, Interval = interval }.ToDictionary();
//When scheduling a process job, it is required to create a new process instance
var request = new ScheduleNewWorkflowInstanceRequest
{
WorkflowDefinitionHandle = (),
TriggerActivityId = ,
Input = input
};
await (, request, startAt, interval, cancellationToken);
}
// Schedule each StartAt trigger.
foreach (var trigger in startAtTriggers)
{
var executeAt = ().ExecuteAt;
// If the trigger is in the past, log info and skip scheduling.
if (executeAt < now)
{
("StartAt trigger is in the past. TriggerId: {TriggerId}. ExecuteAt: {ExecuteAt}. Skipping scheduling", , executeAt);
continue;
}
var input = new { ExecuteAt = executeAt }.ToDictionary();
//When scheduling a process job, it is required to create a new process instance
var request = new ScheduleNewWorkflowInstanceRequest
{
WorkflowDefinitionHandle = (),
TriggerActivityId = ,
Input = input
};
await (, request, executeAt, cancellationToken);
}
// Schedule each Cron trigger.
foreach (var trigger in cronTriggers)
{
var payload = ();
var cronExpression = ;
if ((cronExpression))
{
("Cron expression is empty. TriggerId: {TriggerId}. Skipping scheduling of this trigger", );
continue;
}
var input = new { CronExpression = cronExpression }.ToDictionary();
//When scheduling a process job, it is required to create a new process instance
var request = new ScheduleNewWorkflowInstanceRequest
{
WorkflowDefinitionHandle = (),
TriggerActivityId = ,
Input = input
};
try
{
await (, request, cronExpression, cancellationToken);
}
catch (FormatException ex)
{
(ex, "Cron expression format error. CronExpression: {CronExpression}", cronExpression);
}
}
}
Take the StartAt trigger as an example
This is relatively simple. The trigger is triggered when the process is published, so the change event is triggered, and ScheduleWorkflows listens to this event.
, INotificationHandler
{
//...other code
public async Task HandleAsync(WorkflowTriggersIndexed notification, CancellationToken cancellationToken)
{
//Log out the previous background job
await _triggerScheduler.UnscheduleAsync(, cancellationToken);
//Use background job to restore the node where the trigger is located at the specified time point
await _triggerScheduler.ScheduleAsync(, cancellationToken);
}
Take a look at the internal execution logic of this trigger
protected override object GetTriggerPayload(TriggerIndexingContext context)
{
//Get payload from input parameters
var executeAt = (DateTime);
//Return for access by trigger scheduler
return new StartAtPayload(executeAt);
}
///
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
//If the execution of the current process is caused by the current trigger, it will be completed directly, because the time is up and the current method is executed for the second time.
if (())
{
await();
return;
}
//Otherwise, it means that another trigger caused the process to execute and flow here. At this time, the task should be scheduled.
//Get the specified trigger time from the input parameters
var executeAt = (DateTime);
var clock = ();
var now = ;
var logger = <>>();
("Executed At", now);
if (executeAt <= now)
{
("Scheduled trigger time lies in the past ('{Delta}'). Completing immediately", now - executeAt);
await();
return;
}
//The bookmark persistence middleware will save the bookmark and trigger the event. The event processor will schedule the bookmark and use this parameter.
var payload = new StartAtPayload(executeAt);
//The bookmark persistence middleware will save the bookmark and trigger the bookmark change event, and then the event processor schedules the bookmark to schedule tasks
//The bookmark scheduler will schedule according to the type StartAtPayload
//You need to read a separate chapter about scheduling.
(payload);
}