In the previous article, we analyzed the archive features in Argo Workflows, including pipeline GC, pipeline archiving, and log archiving. This article analyzes the ways a Workflow can be triggered, including manual triggering, timed (cron) triggering, and Event triggering.
1. General
The pipeline of Argo Workflows is triggered in several ways:
- Manual trigger: manually submitting a Workflow triggers a run, so the pipeline we create should, in principle, be a WorkflowTemplate object.
- Timed trigger: a CronWorkflow creates Workflows on a schedule, similar to the CronJob/Job relationship in k8s, to achieve timed triggering.
- Event trigger: e.g. on a git commit; this can be implemented with the help of argo-events.
2. Timed triggers
CronWorkflow
Essentially a Workflow + Cron Spec.
The design references the CronJob in k8s.
Demo
A simple CronWorkflow looks like this:
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: test-cron-wf
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
      - name: whalesay
        container:
          image: alpine:3.6
          command: [sh, -c]
          args: ["date; sleep 90"]
Apply it, and you can see that the created Workflow is named $cronWorkflowName-xxx:
[root@lixd-argo workdir]# k get cwf
NAME           AGE
test-cron-wf   116s
[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE   MESSAGE
test-cron-wf-1711852560   Running   47s
Since the task in the template runs `sleep 90`, each Workflow takes more than 60s to finish. With concurrencyPolicy set to Replace, when the second Workflow is created at the 60s mark, the first one is stopped.
[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE    MESSAGE
test-cron-wf-1711852560   Failed    103s   Stopped with strategy 'Terminate'
test-cron-wf-1711852620   Running   43s
Specific parameters
The specific parameters supported are listed below:
type CronWorkflowSpec struct {
    // WorkflowSpec is the spec of the workflow to be run
    WorkflowSpec WorkflowSpec `json:"workflowSpec" protobuf:"bytes,1,opt,name=workflowSpec,casttype=WorkflowSpec"`
    // Schedule is a schedule to run the Workflow in Cron format
    Schedule string `json:"schedule" protobuf:"bytes,2,opt,name=schedule"`
    // ConcurrencyPolicy is the K8s-style concurrency policy that will be used
    ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`
    // Suspend is a flag that will stop new CronWorkflows from running if set to true
    Suspend bool `json:"suspend,omitempty" protobuf:"varint,4,opt,name=suspend"`
    // StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its
    // original scheduled time if it is missed.
    StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,5,opt,name=startingDeadlineSeconds"`
    // SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time
    SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
    // FailedJobsHistoryLimit is the number of failed jobs to be kept at a time
    FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
    // Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.
    Timezone string `json:"timezone,omitempty" protobuf:"bytes,8,opt,name=timezone"`
    // WorkflowMetadata contains some metadata of the workflow to be run
    WorkflowMetadata *metav1.ObjectMeta `json:"workflowMetadata,omitempty" protobuf:"bytes,9,opt,name=workflowMeta"`
}
The content can be divided into 3 parts:
- WorkflowSpec: This is the Workflow Spec, exactly the same.
- Cron Spec: added some Cron related fields.
- WorkflowMetadata: some metadata, subsequent Workflows created by this CronWorkflow will carry the metadata specified here.
WorkflowSpec and WorkflowMetadata need no further explanation, so let's analyze the fields related to the Cron Spec:
- schedule: a cron expression, e.g. `* * * * *` creates a Workflow every minute.
- concurrencyPolicy: concurrency mode; supports Allow, Forbid, and Replace.
  - Allow: allow multiple Workflows to run at the same time.
  - Forbid: disable concurrency; no new Workflow is created while one is still running.
  - Replace: create a new Workflow to replace the old one, so multiple Workflows never run at the same time.
- startingDeadlineSeconds: how long after its original scheduled time a missed run may still be started; a run missed by more than this is not started.
- suspend: whether to stop the CronWorkflow; can be set to true when the scheduled task should not execute.
- timezone: the timezone against which the schedule is evaluated; defaults to the machine's local time.
Most of these fields are consistent with the K8s CronJob.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: my-cron
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
      - name: whalesay
        container:
          image: alpine:3.6
          command: [sh, -c]
          args: ["date; sleep 10"]
  workflowMetadata:
    labels:
      from: cron
This example adds workflowMetadata; test it out:
[root@lixd-argo workdir]# k get wf my-cron-1711853400 -oyaml|grep labels -A 1
  labels:
    from: cron
As you can see, the created Workflow does carry the label specified in CronWorkflow.
3. Event
argo provides an Event API: /api/v1/events/{namespace}/{discriminator}, which can accept any JSON data.
Workflows can be created through the event API, similar to a Webhook.
The specific request looks like this:
curl https://localhost:2746/api/v1/events/argo/ \
-H "Authorization: $ARGO_TOKEN" \
-d '{"message": "hello"}'
Or like this:
curl https://localhost:2746/api/v1/events/argo/my-discriminator \
-H "Authorization: $ARGO_TOKEN" \
-d '{"message": "hello"}'
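The same request can of course be sent from code. The sketch below just composes the POST that the event API expects; the server address, namespace, discriminator, and token values are placeholders:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newEventRequest builds the POST that argo-server's event API accepts:
// POST /api/v1/events/{namespace}/{discriminator} with an arbitrary JSON body.
func newEventRequest(server, namespace, discriminator, token string, payload []byte) (*http.Request, error) {
	url := fmt.Sprintf("https://%s:2746/api/v1/events/%s/%s", server, namespace, discriminator)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", token) // e.g. "Bearer eyJhbGci..."
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, _ := newEventRequest("localhost", "argo", "my-discriminator",
		"Bearer TOKEN", []byte(`{"message": "hello"}`))
	fmt.Println(req.Method, req.URL.String())
	// POST https://localhost:2746/api/v1/events/argo/my-discriminator
}
```

Sending it is then a plain `http.DefaultClient.Do(req)`; the discriminator path segment is optional, as the two curl examples show.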
The overall steps are:
- 1) Prepare a Token.
- 2) Create a WorkflowEventBinding that configures which events to accept and which Workflow to create when one is received.
- 3) Send a request to test it.
Token
Create the RBAC-related objects: role, rolebinding, and sa, where the role only needs minimal privileges.
Create them directly in the default namespace:
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: test
rules:
  - apiGroups:
      - argoproj.io
    resources:
      - workfloweventbindings
    verbs:
      - list
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtemplates
    verbs:
      - get
  - apiGroups:
      - argoproj.io
    resources:
      - workflows
    verbs:
      - create
EOF
Then the serviceaccount and rolebinding:
kubectl create sa test
kubectl create rolebinding test --role=test --serviceaccount=default:test
Then create a Secret to hold the serviceaccount token:
kubectl apply -f - <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: test.service-account-token
  annotations:
    kubernetes.io/service-account.name: test
type: kubernetes.io/service-account-token
EOF
Finally, you can read the Secret to extract the Token:
ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"
echo $ARGO_TOKEN
Bearer ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNkltS...
Test whether it works:
ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
curl http://$ARGO_SERVER:2746/api/v1/workflow-event-bindings/default -H "Authorization: $ARGO_TOKEN"
WorkflowEventBinding
To receive an Event, you can create a WorkflowEventBinding object as follows:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowEventBinding
metadata:
  name: event-consumer
spec:
  event:
    # metadata header name must be lowercase to match in selector
    selector: payload.message != "" && metadata["x-argo-e2e"] == ["true"] && discriminator == "my-discriminator"
  submit:
    workflowTemplateRef:
      name: my-wf-tmple
    arguments:
      parameters:
        - name: message
          valueFrom:
            event: payload.message
The selector specifies how the Binding matches received Events; the condition here requires:
- 1) The payload contains a message parameter that is not empty.
- 2) The headers contain x-argo-e2e with the value true.
  - Note: header names are converted to lowercase before matching.
- 3) The discriminator is my-discriminator.
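Argo evaluates the selector with the expr expression language; to make the three conditions concrete, here is a hand-rolled Go equivalent of this particular selector, purely for illustration (the function name and structure are made up, not Argo's implementation):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// matches re-implements, by hand, the example selector:
//   payload.message != "" && metadata["x-argo-e2e"] == ["true"] && discriminator == "my-discriminator"
func matches(body []byte, headers map[string][]string, discriminator string) bool {
	// 1) payload must contain a non-empty message
	var payload struct {
		Message string `json:"message"`
	}
	if err := json.Unmarshal(body, &payload); err != nil || payload.Message == "" {
		return false
	}
	// 2) header names are lowercased before matching
	vals := headers[strings.ToLower("X-Argo-E2E")]
	if len(vals) != 1 || vals[0] != "true" {
		return false
	}
	// 3) the discriminator from the URL path must match
	return discriminator == "my-discriminator"
}

func main() {
	headers := map[string][]string{"x-argo-e2e": {"true"}}
	fmt.Println(matches([]byte(`{"message": "hello events"}`), headers, "my-discriminator")) // true
}
```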
If it matches, a Workflow is created according to the content under submit:
- 1) Use my-wf-tmple as the WorkflowTemplate to create the Workflow.
- 2) Use payload.message from the event as the message parameter.
The created workflow is defined by my-wf-tmple, so create this Template first.
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: my-wf-tmple
spec:
  templates:
    - name: main
      inputs:
        parameters:
          - name: message
            value: "{{workflow.parameters.message}}"
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]
  entrypoint: main
Finally, we can call the event API to trigger creation of the Workflow:
curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
-H "Authorization: $ARGO_TOKEN" \
-H "X-Argo-E2E: true" \
-d '{"message": "hello events"}'
Test it:
[root@lixd-argo workdir]# curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
> -H "Authorization: $ARGO_TOKEN" \
> -H "X-Argo-E2E: true" \
> -d '{"message": "hello events"}'
{}
[root@lixd-argo workdir]# k get wf
NAME                STATUS    AGE   MESSAGE
my-wf-tmple-ea81n   Running   5s
[root@lixd-argo workdir]# k get wf my-wf-tmple-ea81n -oyaml|grep parameters -A 5
  parameters:
  - name: message
    value: hello events
As you can see, the Workflow has been created, and its parameter is the hello events message we sent in the request.
Scaling
By default, argo-server processes at most 64 events concurrently; beyond that it returns 503. This can be tuned with the following flags:
- 1) --event-operation-queue-size: increase the queue size to accept more events.
- 2) --event-worker-count: increase the number of workers to improve processing speed.
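The queue-full 503 behavior is easy to model with a buffered channel. This is a simplified sketch, not argo-server's code; the type and sizes are made up:

```go
package main

import (
	"fmt"
	"net/http"
)

// eventServer models the event operation queue: a buffered channel with a
// fixed capacity, which is the knob --event-operation-queue-size tunes.
type eventServer struct {
	queue chan []byte
}

// submit tries to enqueue an event without blocking; when every slot is
// taken it rejects the event with 503, mirroring the documented behavior.
func (s *eventServer) submit(event []byte) int {
	select {
	case s.queue <- event:
		return http.StatusOK // queued, a worker will pick it up
	default:
		return http.StatusServiceUnavailable // queue full -> 503
	}
}

func main() {
	s := &eventServer{queue: make(chan []byte, 2)} // tiny queue for the demo
	fmt.Println(s.submit([]byte("e1"))) // 200
	fmt.Println(s.submit([]byte("e2"))) // 200
	fmt.Println(s.submit([]byte("e3"))) // 503
}
```

Raising --event-worker-count corresponds to draining this channel with more goroutines, so slots free up faster under load.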
4. Webhook
The Events section above showed that a Workflow can be created by sending an HTTP request, but this requires the client to supply an auth token.
The problem is that some clients cannot attach a token: Git hosting services such as GitHub and GitLab can be configured to call a Webhook on each commit to trigger the pipeline, but those requests are sent without a token.
Additional configuration is therefore needed so argo can authenticate them and handle only Webhook requests that genuinely come from GitHub, GitLab, and so on.
- 1) Create the RBAC-related objects (role, rolebinding, sa) and prepare the token.
- 2) Configure argo-workflows-webhook-clients to tell argo which type of Webhook uses which secret as its token.
The first step is the same as in the Token part of the Event chapter, so we focus on the second step.
webhook-clients config
In the previous step, after creating the RBAC objects and the secret, a client would normally extract the Token from the secret and send requests carrying it, like this:
ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"
curl https://$ARGO_SERVER:2746/api/v1/events/default/ \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'
However, a Webhook client has no way to specify a token like this, so the argo-workflows-webhook-clients configuration is needed to tell argo which Webhook source uses which token from which Secret.
Create a Secret named argo-workflows-webhook-clients, which reads something like this:
kind: Secret
apiVersion: v1
metadata:
  name: argo-workflows-webhook-clients
# The data keys must be the name of a service account.
stringData:
  bitbucket.org: |
    type: bitbucket
    secret: "my-uuid"
  bitbucketserver: |
    type: bitbucketserver
    secret: "shh!"
  github.com: |
    type: github
    secret: "shh!"
  gitlab.com: |
    type: gitlab
    secret: "shh!"
- The key must be the name of a serviceaccount in the current namespace.
- The value consists of type and secret:
  - type: the Webhook source, e.g. github, gitlab.
  - secret: a plain string (not a k8s Secret), usually configured when adding the Webhook on the corresponding platform.
Taking GitHub as a concrete example: when you add a Webhook there, you can fill in a Secret field, which is just a string used for signing requests, so you can choose any value you like.
This way, GitHub sends each Webhook request signed with this secret. On receipt, argo compares the request against the type=github secret configured in the argo-workflows-webhook-clients Secret and processes it only if they match; otherwise the request is ignored.
If there is a match, the Token of the corresponding serviceaccount is resolved and used as the Authorization information.
source code analysis
The official documentation on Webhooks is rather brief, so let's look at the source code.
This logic is implemented as an Interceptor that all Event API requests pass through; it is used to add Authorization information to requests that do not carry any.
// Interceptor creates an annotator that verifies webhook signatures and adds the appropriate access token to the request.
func Interceptor(client kubernetes.Interface) func(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
    return func(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
        err := addWebhookAuthorization(r, client)
        if err != nil {
            log.WithError(err).Error("Failed to process webhook request")
            w.WriteHeader(403)
            // hide the message from the user, because it could help them attack us
            _, _ = w.Write([]byte(`{"message": "failed to process webhook request"}`))
        } else {
            next(w, r)
        }
    }
}
It calls addWebhookAuthorization to try to add the authentication information:
func addWebhookAuthorization(r *http.Request, kube kubernetes.Interface) error {
    // try and exit quickly before we do anything API calls
    if r.Method != "POST" || len(r.Header["Authorization"]) > 0 || !strings.HasPrefix(r.URL.Path, pathPrefix) {
        return nil
    }
    parts := strings.SplitN(strings.TrimPrefix(r.URL.Path, pathPrefix), "/", 2)
    if len(parts) != 2 {
        return nil
    }
    namespace := parts[0]
    secretsInterface := kube.CoreV1().Secrets(namespace)
    ctx := r.Context()
    webhookClients, err := secretsInterface.Get(ctx, "argo-workflows-webhook-clients", metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("failed to get webhook clients: %w", err)
    }
    // we need to read the request body to check the signature, but we still need it for the GRPC request,
    // so read it all now, and then reinstate when we are done
    buf, _ := io.ReadAll(r.Body)
    defer func() { r.Body = io.NopCloser(bytes.NewBuffer(buf)) }()
    serviceAccountInterface := kube.CoreV1().ServiceAccounts(namespace)
    for serviceAccountName, data := range webhookClients.Data {
        r.Body = io.NopCloser(bytes.NewBuffer(buf))
        client := &webhookClient{}
        err := yaml.Unmarshal(data, client)
        if err != nil {
            return fmt.Errorf("failed to unmarshal webhook client \"%s\": %w", serviceAccountName, err)
        }
        log.WithFields(log.Fields{"serviceAccountName": serviceAccountName, "webhookType": client.Type}).Debug("Attempting to match webhook request")
        ok := webhookParsers[client.Type](client.Secret, r)
        if ok {
            log.WithField("serviceAccountName", serviceAccountName).Debug("Matched webhook request")
            serviceAccount, err := serviceAccountInterface.Get(ctx, serviceAccountName, metav1.GetOptions{})
            if err != nil {
                return fmt.Errorf("failed to get service account \"%s\": %w", serviceAccountName, err)
            }
            tokenSecret, err := secretsInterface.Get(ctx, secrets.TokenNameForServiceAccount(serviceAccount), metav1.GetOptions{})
            if err != nil {
                return fmt.Errorf("failed to get token secret \"%s\": %w", tokenSecret, err)
            }
            r.Header["Authorization"] = []string{"Bearer " + string(tokenSecret.Data["token"])}
            return nil
        }
    }
    return nil
}
The specific process is as follows:
- First, authorization is only added for POST requests whose Authorization header is empty and whose path carries the events prefix.
- Then the Secret named argo-workflows-webhook-clients is fetched from the Namespace specified in the API path.
- Finally, a loop checks whether any configured type and secret matches the current request. On a match, the data key of that entry is used as the serviceaccount name to look up the serviceaccount's token Secret, and the token is parsed and set as the Authorization header.
The third step uses the data key directly as the serviceaccount name, which is why the argo-workflows-webhook-clients configuration must use serviceaccount names as its keys.
5. Summary
This article analyzed the ways a Workflow can be triggered in Argo:
- 1) Manual trigger: manually create a Workflow object to run the pipeline.
- 2) Timed trigger: use a CronWorkflow to create Workflows automatically according to a cron expression.
- 3) Event: use the event API provided by argo-server together with a WorkflowEventBinding to create Workflows.
- 4) Webhook: an extension of the Event method. The Event API requires Token authentication; for Webhooks, the argo-workflows-webhook-clients Secret configures authentication per Webhook source, so the Event API can be registered as a Webhook endpoint in GitHub, GitLab, and similar platforms.