Elsa V3 Learning: Flowchart in Detail (Part 1)

2024-08-19 22:08:44

Previously, we learned some of the basic usage of Elsa through its UI. If you followed along hands-on, you may have noticed that the root of a workflow definition, that is, the workflow canvas itself, is actually an activity: Flowchart. This article explains the execution logic of Flowchart.

Flowchart Source Code

For convenience, here is the Flowchart source code in full.

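using System.ComponentModel;
using System.Runtime.CompilerServices;
using Elsa.Extensions;
using Elsa.Workflows.Activities.Flowchart.Contracts;
using Elsa.Workflows.Activities.Flowchart.Extensions;
using Elsa.Workflows.Activities.Flowchart.Models;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Options;
using Elsa.Workflows.Signals;
using Microsoft.Extensions.Logging;

namespace Elsa.Workflows.Activities.Flowchart.Activities;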

/// <summary>
/// A flowchart consists of a collection of activities and connections between them.
/// </summary>
[Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
[Browsable(false)]
public class Flowchart : Container
{
    internal const string ScopeProperty = "Scope";

    /// <inheritdoc />
    public Flowchart([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
    {
        OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
        OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
        OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
    }

    /// <summary>
    /// The activity to execute when the flowchart starts.
    /// </summary>
    [Port]
    [Browsable(false)]
    public IActivity? Start { get; set; }

    /// <summary>
    /// A list of connections between activities.
    /// </summary>
    public ICollection<Connection> Connections { get; set; } = new List<Connection>();

    /// <inheritdoc />
    protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
    {
        var startActivity = GetStartActivity(context);

        if (startActivity == null)
        {
            // Nothing else to execute.
            await context.CompleteActivityAsync();
            return;
        }

        // Schedule the start activity.
        await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
    }

    private IActivity? GetStartActivity(ActivityExecutionContext context)
    {
        // If there's a trigger that triggered this workflow, use that.
        var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
        var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;

        if (triggerActivity != null)
            return triggerActivity;

        // If an explicit Start activity was provided, use that.
        if (Start != null)
            return Start;

        // If there is a Start activity on the flowchart, use that.
        var startActivity = Activities.FirstOrDefault(x => x is Start);

        if (startActivity != null)
            return startActivity;

        // If there's an activity marked as "Can Start Workflow", use that.
        var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());

        if (canStartWorkflowActivity != null)
            return canStartWorkflowActivity;

        // If there is a single activity that has no inbound connections, use that.
        var root = GetRootActivity();

        if (root != null)
            return root;

        // If no start activity found, return the first activity.
        return Activities.FirstOrDefault();
    }

    /// <summary>
    /// Checks if there is any pending work for the flowchart.
    /// </summary>
    private bool HasPendingWork(ActivityExecutionContext context)
    {
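        var workflowExecutionContext = context.WorkflowExecutionContext;
        var activityIds = Activities.Select(x => x.Id).ToList();
        var descendantContexts = context.GetDescendants().Where(x => x.ParentActivityExecutionContext == context).ToList();
        var activityExecutionContexts = descendantContexts.Where(x => activityIds.Contains(x.Activity.Id)).ToList();

        var hasPendingWork = workflowExecutionContext.Scheduler.List().Any(workItem =>
        {
            var ownerInstanceId = workItem.Owner?.Id;

            if (ownerInstanceId == null)
                return false;

            if (ownerInstanceId == context.Id)
                return true;

            var ownerContext = context.WorkflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId);
            var ancestors = ownerContext.GetAncestors().ToList();

            return ancestors.Any(x => x == context);
        });

        var hasRunningActivityInstances = activityExecutionContexts.Any(x => x.Status == ActivityStatus.Running);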

        return hasRunningActivityInstances || hasPendingWork;
    }

    private IActivity? GetRootActivity()
    {
        // Get the first activity that has no inbound connections.
        var query =
            from activity in Activities
            let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
            where !inboundConnections
            select activity;

        var rootActivity = query.FirstOrDefault();
        return rootActivity;
    }

    private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
    {
        var logger = context.GetRequiredService<ILogger<Flowchart>>();
        var flowchartContext = context.TargetContext;
        var completedActivityContext = context.ChildContext;
        var completedActivity = completedActivityContext.Activity;
        var result = context.Result;

        // If the complete activity's status is anything but "Completed", do not schedule its outbound activities.
        var scheduleChildren = completedActivityContext.Status == ActivityStatus.Completed;
        var outcomeNames = result is Outcomes outcomes
            ? outcomes.Names
            : [null!, "Done"];

        // Only query the outbound connections if the completed activity wasn't already completed.
        var outboundConnections = Connections.Where(connection => connection.Source.Activity == completedActivity && outcomeNames.Contains(connection.Source.Port)).ToList();
        var children = outboundConnections.Select(x => x.Target.Activity).ToList();
        var scope = flowchartContext.GetProperty(ScopeProperty, () => new FlowScope());

        scope.RegisterActivityExecution(completedActivity);

        // If the complete activity is a terminal node, complete the flowchart immediately.
        if (completedActivity is ITerminalNode)
        {
            await flowchartContext.CompleteActivityAsync();
        }
        else if (scheduleChildren)
        {
            if (children.Any())
            {
                // Schedule each child, but only if all of its left inbound activities have already executed.
                foreach (var activity in children)
                {
                    var existingActivity = scope.ContainsActivity(activity);
                    scope.AddActivity(activity);

                    var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

                    // If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
                    if (!inboundActivities.Contains(completedActivity))
                    {
                        await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                        continue;
                    }

                    // If the activity is anything but a join activity, only schedule it if all of its left-inbound activities have executed, effectively implementing a "wait all" join.
                    if (activity is not IJoinNode)
                    {
                        var executionCount = scope.GetExecutionCount(activity);
                        var haveInboundActivitiesExecuted = inboundActivities.All(x => scope.GetExecutionCount(x) > executionCount);

                        if (haveInboundActivitiesExecuted)
                            await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                    }
                    else
                    {
                        // Select an existing activity execution context for this activity, if any.
                        var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =>
                            x.ParentActivityExecutionContext == flowchartContext && x.Activity == activity);
                        var scheduleWorkOptions = new ScheduleWorkOptions
                        {
                            CompletionCallback = OnChildCompletedAsync,
                            ExistingActivityExecutionContext = joinContext,
                            PreventDuplicateScheduling = true
                        };

                        if (joinContext != null)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Attaching to existing join context {JoinContext}", activity.Id, joinContext.Id);
                        else if (!existingActivity)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Creating new join context", activity.Id);
                        else
                        {
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Join context was not found, but activity is already being created", activity.Id);
                            continue;
                        }

                        await flowchartContext.ScheduleActivityAsync(activity, scheduleWorkOptions);
                    }
                }
            }

            if (!children.Any())
            {
                await CompleteIfNoPendingWorkAsync(flowchartContext);
            }
        }

        flowchartContext.SetProperty(ScopeProperty, scope);
    }

    private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
    {
        var hasPendingWork = HasPendingWork(context);

        if (!hasPendingWork)
        {
            var hasFaultedActivities = context.GetActiveChildren().Any(x => x.Status == ActivityStatus.Faulted);

            if (!hasFaultedActivities)
            {
                await context.CompleteActivityAsync();
            }
        }
    }

    private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context)
    {
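        var flowchartContext = context.ReceiverActivityExecutionContext;
        var schedulingActivityContext = context.SenderActivityExecutionContext;
        var schedulingActivity = schedulingActivityContext.Activity;
        var outcomes = signal.Outcomes;
        var outboundConnections = Connections.Where(connection => connection.Source.Activity == schedulingActivity && outcomes.Contains(connection.Source.Port!)).ToList();
        var outboundActivities = outboundConnections.Select(x => x.Target.Activity).ToList();

        if (outboundActivities.Any())
        {
            // Schedule each child.
            foreach (var activity in outboundActivities) await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);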
        }
    }

    private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context)
    {
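        var flowchartContext = context.ReceiverActivityExecutionContext;
        var activity = signal.Activity;
        var activityExecutionContext = signal.ActivityExecutionContext;

        if (activityExecutionContext != null)
        {
            await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new ScheduleWorkOptions
            {
                ExistingActivityExecutionContext = activityExecutionContext,
                CompletionCallback = OnChildCompletedAsync,
                Input = signal.Input
            });
        }
        else
        {
            await flowchartContext.ScheduleActivityAsync(activity, new ScheduleWorkOptions
            {
                CompletionCallback = OnChildCompletedAsync,
                Input = signal.Input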
            });
        }
    }

    private async ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
    {
        await CompleteIfNoPendingWorkAsync(context.ReceiverActivityExecutionContext);
    }
}

The first thing to notice is the description argument of the Activity attribute, which states the role of a flowchart: "A flowchart is a collection of activities and connections between them." In other words, a flowchart stores a set of activities together with the connections between them. With this data, it can execute the activities in order by following the entries in Connections.

Container

Looking further down, we see that Flowchart does not inherit directly from the Activity base class, but from Container.
Container has two collection properties, Activities and Variables, which store the nodes and the variables respectively.
In Container's execution entry method, the variables are first given names and then registered.

protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
    // Ensure variables have names.
    EnsureNames(Variables);

    // Register variables.
    context.ExpressionExecutionContext.Memory.Declare(Variables);

    // Schedule children.
    await ScheduleChildrenAsync(context);
}

Finally, ScheduleChildrenAsync is called. As you can see below, it is a virtual method that subclasses can override.

protected virtual ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
{
    ScheduleChildren(context);
    return ValueTask.CompletedTask;
}

In Flowchart, the execution entry point is exactly this overridden ScheduleChildrenAsync method.
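
The relationship between Container and Flowchart described above is the classic template method pattern. The following is a minimal sketch of that idea only: ContainerSketch, SequenceSketch, FlowchartSketch and ExecutionLog are hypothetical stand-ins invented for illustration and are not Elsa types.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper that records what happened; not part of Elsa.
public class ExecutionLog
{
    public List<string> Entries { get; } = new();
}

// Hypothetical stand-in for a container-style base class.
public abstract class ContainerSketch
{
    // Fixed entry point: shared setup first, then the overridable hook.
    public async ValueTask ExecuteAsync(ExecutionLog log)
    {
        log.Entries.Add("register variables");
        await ScheduleChildrenAsync(log);
    }

    // Default behavior; subclasses may replace it.
    protected virtual ValueTask ScheduleChildrenAsync(ExecutionLog log)
    {
        log.Entries.Add("schedule all children");
        return ValueTask.CompletedTask;
    }
}

// Keeps the default scheduling behavior.
public class SequenceSketch : ContainerSketch { }

// Overrides the hook, the way a flowchart-like container would.
public class FlowchartSketch : ContainerSketch
{
    protected override ValueTask ScheduleChildrenAsync(ExecutionLog log)
    {
        log.Entries.Add("schedule start activity only");
        return ValueTask.CompletedTask;
    }
}
```

Calling ExecuteAsync on either subclass always runs the shared setup step first; only the scheduling step differs, which is why the overridden method is the effective entry point of the subclass.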

Flowchart Execution Logic

Back on topic, let's move on to Flowchart's entry point, the ScheduleChildrenAsync method.

protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
{
    var startActivity = GetStartActivity(context);

    if (startActivity == null)
    {
        // Nothing else to execute.
        await context.CompleteActivityAsync();
        return;
    }

    // Schedule the start activity.
    await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
}

Let's walk through these lines briefly. First it gets the start activity, that is, the first node the workflow executes. If none can be found, the flowchart completes immediately and the workflow ends.
If one is found, it is scheduled together with a callback function, and this callback is the key to the workflow executing its nodes in order.
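
To make the role of the callback more concrete, here is a minimal sketch of "schedule an item, then let the completion callback decide what runs next". It assumes a simple in-memory queue; SchedulerSketch and CallbackDemo are hypothetical names and do not reflect Elsa's real scheduler API.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a work scheduler; NOT Elsa's scheduler.
public class SchedulerSketch
{
    private readonly Queue<(string Activity, Action<string>? OnCompleted)> _queue = new();

    // Records the order in which activities were "executed".
    public List<string> Executed { get; } = new();

    public void Schedule(string activity, Action<string>? onCompleted = null)
        => _queue.Enqueue((activity, onCompleted));

    public void Run()
    {
        while (_queue.Count > 0)
        {
            var (activity, onCompleted) = _queue.Dequeue();
            Executed.Add(activity);        // "execute" the activity
            onCompleted?.Invoke(activity); // the owner decides what runs next
        }
    }
}

public static class CallbackDemo
{
    public static List<string> RunChain()
    {
        // Assumed connections for the demo: A -> B -> C.
        var next = new Dictionary<string, string> { ["A"] = "B", ["B"] = "C" };
        var scheduler = new SchedulerSketch();

        // Completion callback: schedule the follower, passing itself along again.
        void OnChildCompleted(string completed)
        {
            if (next.TryGetValue(completed, out var child))
                scheduler.Schedule(child, OnChildCompleted);
        }

        scheduler.Schedule("A", OnChildCompleted);
        scheduler.Run();
        return scheduler.Executed;
    }
}
```

Only the first node is scheduled up front. Every later node enters the queue from inside the callback, which is the same shape as passing OnChildCompletedAsync along with the start activity.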

GetStartActivity

Next, let's see how it gets the start node.

private IActivity? GetStartActivity(ActivityExecutionContext context)
{
    // If there's a trigger that triggered this workflow, use that.
    var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
    var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;

    if (triggerActivity != null)
        return triggerActivity;

    // If an explicit Start activity was provided, use that.
    if (Start != null)
        return Start;

    // If there is a Start activity on the flowchart, use that.
    var startActivity = Activities.FirstOrDefault(x => x is Start);

    if (startActivity != null)
        return startActivity;

    // If there's an activity marked as "Can Start Workflow", use that.
    var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());

    if (canStartWorkflowActivity != null)
        return canStartWorkflowActivity;

    // If there is a single activity that has no inbound connections, use that.
    var root = GetRootActivity();

    if (root != null)
        return root;

    // If no start activity found, return the first activity.
    return Activities.FirstOrDefault();
}

Right from the start you can see that the highest-priority candidate is not Start but the trigger activity. What is a trigger activity? Examples are HTTP Endpoint, Event and Cron. When we drag one of these onto the canvas, the Trigger workflow option is checked by default, as shown at the bottom of the figure below. How triggering works will be explored in depth later, so we only touch on it here.

If there is no trigger activity, Flowchart checks whether the Start property is set. If it is, a start node has been specified explicitly, and that node is used as the start node of the workflow.
If the Start property is not set either, the first node of type Start is looked up among all the Activities and, if found, is used as the start node.
If there is no Start node among the Activities, it checks for nodes marked as "Can Start Workflow" and, if any exist, takes the first one as the start node.
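
The first four rules form a simple fallback chain in which the first match wins. The sketch below shows that chain with a simplified model; ActivitySketch and StartResolver are hypothetical names made up for illustration, and the two boolean flags stand in for "is a Start node" and "is marked as Can Start Workflow".

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified activity model; not an Elsa type.
public record ActivitySketch(string Id, bool IsStartNode = false, bool CanStartWorkflow = false);

public static class StartResolver
{
    // Returns the first candidate that matches, in priority order, or null.
    public static ActivitySketch? Resolve(
        IReadOnlyList<ActivitySketch> activities,
        string? triggerActivityId,
        ActivitySketch? explicitStart)
    {
        // 1. The activity that triggered the workflow, if it is in this flowchart.
        var trigger = triggerActivityId != null
            ? activities.FirstOrDefault(a => a.Id == triggerActivityId)
            : null;

        return trigger
            ?? explicitStart                                       // 2. explicitly assigned start
            ?? activities.FirstOrDefault(a => a.IsStartNode)       // 3. first Start-type node
            ?? activities.FirstOrDefault(a => a.CanStartWorkflow); // 4. first "can start workflow" node
    }
}
```

A trigger id that does not match any activity simply falls through to the next rule, which mirrors the null checks in GetStartActivity.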

If none of these rules yields a node, it tries to get the root node.

private IActivity? GetRootActivity()
{
    // Get the first activity that has no inbound connections.
    var query =
        from activity in Activities
        let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
        where !inboundConnections
        select activity;

    var rootActivity = query.FirstOrDefault();
    return rootActivity;
}

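We can see from the code that the root node is the first activity that is not the target of any connection, in other words the first node with no inbound connections.
If the nodes are not connected at all, every node qualifies, so the first of all the activities is taken as the entry point. The final fallback in GetStartActivity, which returns the first activity, is only reached when every activity has an inbound connection (for example, when the connections form a cycle) or when there are no activities at all.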
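
The root lookup and the final fallback can be sketched with plain strings as activity ids. This is an illustration under simplified assumptions, not Elsa's implementation: ConnectionSketch and RootFinder are hypothetical names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical connection between two activity ids; not an Elsa type.
public record ConnectionSketch(string Source, string Target);

public static class RootFinder
{
    public static string? FindEntry(
        IReadOnlyList<string> activities,
        IReadOnlyList<ConnectionSketch> connections)
    {
        // Root: the first activity that is not the target of any connection.
        var root = activities.FirstOrDefault(a => !connections.Any(c => c.Target == a));

        // Fallback: the first activity, or null when the list is empty.
        return root ?? activities.FirstOrDefault();
    }
}
```

Note that the order of the activities list matters: when several activities have no inbound connections, the one that appears first in the list is chosen.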

As you can see, the logic for getting the start activity is quite thorough.

After getting the start activity, the next step is to actually schedule it: the start activity is put into the scheduling queue, and the scheduler then executes the node. The logic of that execution will be analyzed in a later article. The key to this method is the callback that is passed along, OnChildCompletedAsync.

Since the logic of OnChildCompletedAsync is more complex, we'll cover it in the next article.