ArgoWorkflow Tutorial (V) - Multiple Trigger Modes for Workflows: Manual, Timed Tasks, and Event Triggering


In the previous article, we analyzed archiving in argo-workflow, including pipeline GC, pipeline archiving, and log archiving. This article analyzes the ways a Workflow can be triggered: manual triggering, timed triggering, Event triggering, and so on.

1. General

A pipeline in Argo Workflows can be triggered in several ways:

  • Manual trigger: manually submitting a Workflow triggers a run, so the pipeline we create is, in theory, a WorkflowTemplate object (see the example after this list).
  • Timed trigger: a CronWorkflow creates Workflows on a schedule, similar to Job and CronJob in k8s, to achieve timed triggering.
  • Event trigger: e.g. triggered by a git commit; this can be implemented with the help of argo-events.
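
For the manual case, a minimal sketch (assuming a WorkflowTemplate named my-wf-tmple already exists, such as the one created later in this article; the parameter is illustrative):

# Submit a one-off Workflow from an existing WorkflowTemplate
argo submit --from workflowtemplate/my-wf-tmple -p message="hello manual"

# Or create a Workflow object directly from a manifest
kubectl create -f my-workflow.yaml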

2. Timed triggers

A CronWorkflow is essentially a Workflow plus a Cron spec.

The design references the CronJob in k8s.

Demo

A simple CronWorkflow looks like this:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: test-cron-wf
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 90"]

Apply it, and you can see that the created Workflow is named $cronWorkflowName-xxx:

[root@lixd-argo workdir]# k get cwf
NAME           AGE
test-cron-wf   116s
[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE   MESSAGE
test-cron-wf-1711852560   Running   47s

Since the task in the template is sleep 90, each run takes more than 60s. With concurrencyPolicy set to Replace, a second Workflow is created after 60s and the first one is stopped:

[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE    MESSAGE
test-cron-wf-1711852560   Failed    103s   Stopped with strategy 'Terminate'
test-cron-wf-1711852620   Running   43s

Specific parameters

The specific parameters supported are listed below:

type CronWorkflowSpec struct {
	// WorkflowSpec is the spec of the workflow to be run
	WorkflowSpec WorkflowSpec `json:"workflowSpec" protobuf:"bytes,1,opt,name=workflowSpec,casttype=WorkflowSpec"`
	// Schedule is a schedule to run the Workflow in Cron format
	Schedule string `json:"schedule" protobuf:"bytes,2,opt,name=schedule"`
	// ConcurrencyPolicy is the K8s-style concurrency policy that will be used
	ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`
	// Suspend is a flag that will stop new CronWorkflows from running if set to true
	Suspend bool `json:"suspend,omitempty" protobuf:"varint,4,opt,name=suspend"`
	// StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its
	// original scheduled time if it is missed.
	StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,5,opt,name=startingDeadlineSeconds"`
	// SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time
	SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
	// FailedJobsHistoryLimit is the number of failed jobs to be kept at a time
	FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
	// Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.
	Timezone string `json:"timezone,omitempty" protobuf:"bytes,8,opt,name=timezone"`
	// WorkflowMetadata contains some metadata of the workflow to be run
	WorkflowMetadata *metav1.ObjectMeta `json:"workflowMetadata,omitempty" protobuf:"bytes,9,opt,name=workflowMeta"`
}

The spec can be divided into 3 parts:

  • WorkflowSpec: exactly the same as a plain Workflow spec.
  • Cron spec: adds the Cron-related fields.
  • WorkflowMetadata: metadata that every Workflow created by this CronWorkflow will carry.

WorkflowSpec and WorkflowMetadata need no extra explanation, so let's analyze the Cron-related fields:

  • schedule: a cron expression; "* * * * *" creates a Workflow every minute.
  • concurrencyPolicy: concurrency mode; supports Allow, Forbid, Replace.
    • Allow: allow multiple Workflows to run at the same time.
    • Forbid: forbid concurrency; no new Workflow is created while one is still running.
    • Replace: create a new Workflow to replace the old one, so multiple Workflows never run at the same time.
  • startingDeadlineSeconds: the K8s-style deadline; a missed run may still be started within this many seconds of its original scheduled time, otherwise it counts as failed.
  • suspend: whether to stop scheduling; set it to true when the timed task should not run (see the sketch after this list).
  • timezone: the time zone for evaluating the schedule; defaults to the machine's local time.
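
For example, to pause scheduling without deleting the CronWorkflow, you can flip the suspend flag. A hedged sketch using kubectl (the argo CLI also offers argo cron suspend/resume):

# Pause scheduling: no new Workflows will be created until resumed
kubectl patch cwf test-cron-wf --type=merge -p '{"spec":{"suspend":true}}'

# Resume scheduling
kubectl patch cwf test-cron-wf --type=merge -p '{"spec":{"suspend":false}}'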

Most of the fields are consistent with the K8s CronJob. The following demo adds workflowMetadata:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: my-cron
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 10"]
  workflowMetadata:
    labels:
      from: cron

Apply it and check whether the created Workflow carries the label:

[root@lixd-argo workdir]# k get wf my-cron-1711853400 -oyaml|grep labels -A 1
  labels:
    from: cron

As you can see, the created Workflow does carry the label specified in CronWorkflow.

3. Event

argo provides an Event API: /api/v1/events/{namespace}/{discriminator}, which accepts arbitrary JSON data.

Workflows can be created through the event API, similar to a Webhook.

The specific request looks like this:

curl https://localhost:2746/api/v1/events/argo/ \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

Or with a discriminator:

curl https://localhost:2746/api/v1/events/argo/my-discriminator \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

The overall procedure is:

  • 1) Prepare a Token.
  • 2) Create a WorkflowEventBinding that specifies which events to accept and which Workflow to create when one is received.
  • 3) Send a request to test it.

Token

Create the RBAC objects: a Role, a RoleBinding, and a ServiceAccount, where the Role only needs minimal privileges.

Create them directly in the default namespace:

kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: test
rules:
  - apiGroups:
      - argoproj.io
    resources:
      - workfloweventbindings
    verbs:
      - list
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtemplates
    verbs:
      - get
  - apiGroups:
      - argoproj.io
    resources:
      - workflows
    verbs:
      - create
EOF

serviceaccount and rolebinding

kubectl create sa test

kubectl create rolebinding test --role=test --serviceaccount=default:test

Then create a Secret to hold the ServiceAccount token:

kubectl apply -f - <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: test.service-account-token
  annotations:
    kubernetes.io/service-account.name: test
type: kubernetes.io/service-account-token
EOF

Finally, query the Secret and decode the Token:

ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"

echo $ARGO_TOKEN
Bearer ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNkltS...

Test that it works:

# resolve the argo-server address (the jsonpath may differ depending on your install)
ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')

curl http://$ARGO_SERVER:2746/api/v1/workflow-event-bindings/default -H "Authorization: $ARGO_TOKEN"

WorkflowEventBinding

To receive an Event, you can create a WorkflowEventBinding object as follows:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowEventBinding
metadata:
  name: event-consumer
spec:
  event:
    # metadata header name must be lowercase to match in selector
    selector: payload.message != "" && metadata["x-argo-e2e"] == ["true"] && discriminator == "my-discriminator"
  submit:
    workflowTemplateRef:
      name: my-wf-tmple
    arguments:
      parameters:
      - name: message
        valueFrom:
          event: payload.message

The selector specifies how the Binding matches a received Event; the conditions here are:

  • 1) The payload contains a message field that is not empty.
  • 2) The headers contain x-argo-e2e with the value true.
    • Note: header names are lowercased before matching.
  • 3) The discriminator is my-discriminator.

If it matches, a Workflow is created from the content specified under submit:

  • 1) Use my-wf-tmple as the WorkflowTemplate to create the Workflow.
  • 2) Use payload.message from the event as the message parameter.

The created Workflow is defined by my-wf-tmple, so create this template first:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: my-wf-tmple
spec:
  templates:
    - name: main
      inputs:
        parameters:
          - name: message
            value: "{{workflow.parameters.message}}"
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]
  entrypoint: main

Finally, send a request to the event API to trigger creation of the Workflow:

curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
    -H "Authorization: $ARGO_TOKEN" \
    -H "X-Argo-E2E: true" \
    -d '{"message": "hello events"}'

Test it:

[root@lixd-argo workdir]# curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
>     -H "Authorization: $ARGO_TOKEN" \
>     -H "X-Argo-E2E: true" \
>     -d '{"message": "hello events"}'
{}
[root@lixd-argo workdir]# k get wf
NAME                STATUS    AGE   MESSAGE
my-wf-tmple-ea81n   Running   5s
[root@lixd-argo workdir]# k get wf my-wf-tmple-ea81n -oyaml|grep parameters -A 5
    parameters:
    - name: message
      value: hello events

As you can see, the Workflow has been created, and the parameter carries the hello events payload we sent in the request.
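
Conversely, a request that fails the selector is still accepted by the event API, but no binding matches and no Workflow is created. A hypothetical negative test (the x-argo-e2e header is omitted):

# Accepted by the event API, but matches no WorkflowEventBinding,
# so no Workflow gets created.
curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
    -H "Authorization: $ARGO_TOKEN" \
    -d '{"message": "no match"}'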

Scaling

By default, argo-server handles up to 64 events at the same time; beyond that it returns 503. You can tune this with the following flags (see the sketch after this list):

  • 1) --event-operation-queue-size: increase the queue size to receive more events
  • 2) --event-worker-count: Increase the number of workers to improve processing speed.
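
A hedged sketch of applying these flags, assuming the standard argo-server Deployment in the argo namespace (adjust names to your install):

# Append the tuning flags to the argo-server container args
kubectl -n argo patch deployment argo-server --type=json -p='[
  {"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--event-operation-queue-size=128"},
  {"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--event-worker-count=8"}
]'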

4. Webhook

The previous section on Events showed that a Workflow can be created by sending an HTTP request, but the client has to provide an Auth Token.

The problem is that some clients cannot supply a token, such as GitHub, GitLab, and other Git platforms, where you instead configure a Webhook that is called on each commit to trigger the pipeline.

These Webhook requests arrive without a token, so extra configuration is needed to authenticate them and to ensure that argo only handles Webhook requests that genuinely come from GitHub, GitLab, and so on.

  • 1) Create the RBAC objects (Role, RoleBinding, ServiceAccount) and prepare a Token.
  • 2) Configure webhook clients to tell argo which type of Webhook to expect and which Secret to use as its token.

The first step is identical to the Token step in the Event chapter, so I won't repeat it; the focus here is the second step.

webhook-clients config

In the previous step, after creating the RBAC objects and preparing the secret, a normal client would parse the Token from the secret and send requests carrying it, like this:

ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"

curl https://$ARGO_SERVER:2746/api/v1/events/default/ \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

However, a Webhook client has no way to attach a token like this, so the mapping must be provided through the argo-workflows-webhook-clients configuration, which tells argo which Webhook uses which token from which Secret.

Create a Secret named argo-workflows-webhook-clients, whose content looks like this:

kind: Secret
apiVersion: v1
metadata:
  name: argo-workflows-webhook-clients
# The data keys must be the name of a service account.
stringData:
  # https://support.atlassian.com/bitbucket-cloud/docs/manage-webhooks/
  bitbucket.org: |
    type: bitbucket
    secret: "my-uuid"
  # https://confluence.atlassian.com/bitbucketserver/
  bitbucketserver: |
    type: bitbucketserver
    secret: "shh!"
  # https://developer.github.com/webhooks/securing/
  github.com: |
    type: github
    secret: "shh!"
  # https://docs.gitlab.com/ee/user/project/integrations/
  gitlab.com: |
    type: gitlab
    secret: "shh!"

  • The key must be the name of a ServiceAccount in the current Namespace.
  • The value consists of type and secret:
    • type: the Webhook source, e.g. github, gitlab.
    • secret: a plain string (not a k8s Secret), usually configured when adding the Webhook on the corresponding platform.

Taking GitHub as a concrete example, the secret is configured as follows:

When you add a Webhook on GitHub, you can fill in a Secret field; it is just a signing string, so you can fill in whatever you want.

GitHub then sends Webhook requests signed with this secret. When Argo receives one, it compares the request against the secret field of the type: github entry in the argo-workflows-webhook-clients Secret, processes the request if it matches, and ignores it otherwise.

If there is a match, a Token is parsed from the corresponding ServiceAccount and used as the Authorization information.
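
To make the signing concrete: GitHub computes an HMAC of the request body keyed by the shared secret and sends the digest in a header (X-Hub-Signature-256 for SHA-256). You can recompute it locally with openssl to sanity-check a secret; this is a rough sketch of the scheme, not argo's actual verification code:

# Recompute the HMAC-SHA256 digest GitHub would send for this body,
# keyed by the shared secret from the webhook configuration.
SECRET="shh!"
BODY='{"ref": "refs/heads/main"}'
printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET"
# GitHub sends the same hex digest as: X-Hub-Signature-256: sha256=<digest>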

Source code analysis

The official documentation glosses over this Webhook piece, so I dug into the source code.

The logic takes the form of an Interceptor that all Event API requests pass through; it adds Authorization information to requests that do not carry any.

// Interceptor creates an annotator that verifies webhook signatures and adds the appropriate access token to the request.
func Interceptor(client kubernetes.Interface) func(w http.ResponseWriter, r *http.Request, next http.Handler) {
	return func(w http.ResponseWriter, r *http.Request, next http.Handler) {
		err := addWebhookAuthorization(r, client)
		if err != nil {
			log.WithError(err).Error("Failed to process webhook request")
			w.WriteHeader(403)
			// hide the message from the user, because it could help them attack us
			_, _ = w.Write([]byte(`{"message": "failed to process webhook request"}`))
		} else {
			next.ServeHTTP(w, r)
		}
	}
}

It calls addWebhookAuthorization to try to add the authentication information:

func addWebhookAuthorization(r *http.Request, kube kubernetes.Interface) error {
	// try and exit quickly before we do any API calls
	if r.Method != "POST" || len(r.Header["Authorization"]) > 0 || !strings.HasPrefix(r.URL.Path, pathPrefix) {
		return nil
	}
	parts := strings.SplitN(strings.TrimPrefix(r.URL.Path, pathPrefix), "/", 2)
	if len(parts) != 2 {
		return nil
	}
	namespace := parts[0]
	secretsInterface := kube.CoreV1().Secrets(namespace)
	ctx := r.Context()

	webhookClients, err := secretsInterface.Get(ctx, "argo-workflows-webhook-clients", metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get webhook clients: %w", err)
	}
	// we need to read the request body to check the signature, but we still need it for the GRPC request,
	// so read it all now, and then reinstate when we are done
	buf, _ := io.ReadAll(r.Body)
	defer func() { r.Body = io.NopCloser(bytes.NewBuffer(buf)) }()
	serviceAccountInterface := kube.CoreV1().ServiceAccounts(namespace)
	for serviceAccountName, data := range webhookClients.Data {
		r.Body = io.NopCloser(bytes.NewBuffer(buf))
		client := &webhookClient{}
		err := yaml.Unmarshal(data, client)
		if err != nil {
			return fmt.Errorf("failed to unmarshal webhook client \"%s\": %w", serviceAccountName, err)
		}
		log.WithFields(log.Fields{"serviceAccountName": serviceAccountName, "webhookType": client.Type}).Debug("Attempting to match webhook request")
		ok := webhookParsers[client.Type](client.Secret, r)
		if ok {
			log.WithField("serviceAccountName", serviceAccountName).Debug("Matched webhook request")
			serviceAccount, err := serviceAccountInterface.Get(ctx, serviceAccountName, metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("failed to get service account \"%s\": %w", serviceAccountName, err)
			}
			tokenSecret, err := secretsInterface.Get(ctx, secrets.TokenNameForServiceAccount(serviceAccount), metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("failed to get token secret \"%s\": %w", tokenSecret, err)
			}
			r.Header["Authorization"] = []string{"Bearer " + string(tokenSecret.Data["token"])}
			return nil
		}
	}
	return nil
}

The specific process is as follows:

  • First, it only adds Authorization automatically for POST requests whose Authorization header is empty.
  • Then it queries the Secret named argo-workflows-webhook-clients from the Namespace specified in the API path.
  • Finally, it loops over the Secret's entries and checks whether each entry's type and secret match the current request; on a match, it treats the entry's key as a ServiceAccount name, looks up that ServiceAccount's token Secret, and puts the token into the Authorization header.

The third step uses the data key directly as the ServiceAccount name, which is why the keys in argo-workflows-webhook-clients must be ServiceAccount names.


[ArgoWorkflow Series] is continuously updated; search for the WeChat public account [Explore Cloud Native] to subscribe and read more articles.


5. Summary

This article analyzed the several ways to trigger a Workflow in Argo:

  • 1) Manual trigger: manually create Workflow objects to run the pipeline.
  • 2) Timed trigger: use CronWorkflow to create Workflows automatically based on a Cron expression.
  • 3) Event: use the event API provided by argo-server together with WorkflowEventBinding to create Workflows.
  • 4) Webhook: an extension of the Event method. The Event API requires Token authentication; for Webhooks, the argo-workflows-webhook-clients Secret configures how requests from each Webhook source are authenticated, so the Event API can be registered as a Webhook endpoint on GitHub, GitLab, and similar platforms.