Skip to content

Commit

Permalink
feat(cron): Added timezone support to cron workflows. Closes argoproj…
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Jan 16, 2020
1 parent 48b85e5 commit 1a777cc
Show file tree
Hide file tree
Showing 15 changed files with 458 additions and 330 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ ENTRYPOINT [ "argoexec" ]
# workflow-controller
####################################################################################################
FROM scratch as workflow-controller
# Add timezone data
COPY --from=argo-build /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=argo-build /go/src/github.com/argoproj/argo/dist/workflow-controller-linux-amd64 /bin/workflow-controller
ENTRYPOINT [ "workflow-controller" ]

Expand Down
2 changes: 2 additions & 0 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ ENTRYPOINT [ "argoexec" ]
# workflow-controller
####################################################################################################
FROM scratch as workflow-controller
# Add timezone data
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
COPY workflow-controller /bin/
ENTRYPOINT [ "workflow-controller" ]

Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@
"description": "Suspend is a flag that will stop new CronWorkflows from running if set to true",
"type": "boolean"
},
"timezone": {
"description": "Timezone is the timezone against which the cron schedule will be calculated, e.g. \"Asia/Tokyo\". Default is machine's local time.",
"type": "string"
},
"workflowSpec": {
"description": "WorkflowSpec is the spec of the workflow to be run",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.WorkflowSpec"
Expand Down
3 changes: 3 additions & 0 deletions cmd/argo/commands/cron/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func printCronWorkflowTemplate(wf *wfv1.CronWorkflow, outFmt string) {
fmt.Printf(fmtStr, "Created:", humanize.Timestamp(wf.ObjectMeta.CreationTimestamp.Time))
fmt.Printf(fmtStr, "Schedule:", wf.Spec.Schedule)
fmt.Printf(fmtStr, "Suspended:", wf.Spec.Suspend)
if wf.Spec.Timezone != "" {
fmt.Printf(fmtStr, "Timezone:", wf.Spec.Timezone)
}
if wf.Spec.StartingDeadlineSeconds != nil {
fmt.Printf(fmtStr, "StartingDeadlineSeconds:", *wf.Spec.StartingDeadlineSeconds)
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/server/cronworkflow/cron-workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3032,6 +3032,10 @@
"type": "integer",
"format": "int32",
"title": "FailedJobsHistoryLimit is the number of successful jobs to be kept at a time"
},
"timezone": {
"type": "string",
"description": "Timezone is the timezone against which the cron schedule will be calculated, e.g. \"Asia/Tokyo\". Default is machine's local time."
}
}
},
Expand Down
6 changes: 6 additions & 0 deletions examples/cron-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ metadata:
name: hello-world
spec:
schedule: "* * * * *"
timezone: "America/Los_Angeles" # Default to local machine timezone
startingDeadlineSeconds: 0
concurrencyPolicy: "Replace" # Default to "Allow"
successfulJobsHistoryLimit: 4 # Default 3
failedJobsHistoryLimit: 4 # Default 1
suspend: false # Set to "true" to suspend scheduling
workflowSpec:
entrypoint: whalesay
templates:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type CronWorkflowSpec struct {
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
// FailedJobsHistoryLimit is the number of successful jobs to be kept at a time
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
// Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.
Timezone string `json:"timezone,omitempty" protobuf:"bytes,8,opt,name=timezone"`
}

type CronWorkflowStatus struct {
Expand Down
691 changes: 366 additions & 325 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions test/e2e/cron_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"fmt"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/e2e/fixtures"
"github.com/argoproj/argo/workflow/common"
Expand All @@ -12,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"strconv"
"testing"
"time"
)
Expand All @@ -32,6 +34,52 @@ func (s *CronSuite) TestBasic() {
})
}

func (s *CronSuite) TestBasicTimezone() {
// This test works by scheduling a CronWorkflow for the next minute, but using the local time of another timezone
// then seeing if the Workflow was ran within the next minute. Since this test would be trivial if the selected
// timezone was the same as the local timezone, a little-used timezone is used.
testTimezone := "Pacific/Niue"
testLocation, err := time.LoadLocation(testTimezone)
if err != nil {
s.T().Fatal(err)
}
hour, min, _ := time.Now().In(testLocation).Clock()
min++
if min == 60 {
min = 0
hour = (hour + 1) % 24
}
scheduleInTestTimezone := strconv.Itoa(min) + " " + strconv.Itoa(hour) + " * * *"
s.Given().
CronWorkflow(fmt.Sprintf(`
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test-cron-wf-basic
labels:
argo-e2e: true
spec:
schedule: "%s"
timezone: "%s"
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: python:alpine3.6
imagePullPolicy: IfNotPresent
command: ["sh", -c]
args: ["echo hello"]
`, scheduleInTestTimezone, testTimezone)).
When().
CreateCronWorkflow().
Wait(1 * time.Minute).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.True(t, cronWf.Status.LastScheduledTime.Time.After(time.Now().Add(-1*time.Minute)))
})
}

func (s *CronSuite) TestSuspend() {
s.Given().
CronWorkflow("@testdata/basic.yaml").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const CronWorkflowSummaryPanel = (props: Props) => {
{title: 'Name', value: props.cronWorkflow.metadata.name},
{title: 'Namespace', value: props.cronWorkflow.metadata.namespace},
{title: 'Schedule', value: props.cronWorkflow.spec.schedule},
{title: 'Timezone', value: props.cronWorkflow.spec.timezone},
{
title: 'Concurrency Policy',
value: props.cronWorkflow.spec.concurrencyPolicy ? props.cronWorkflow.spec.concurrencyPolicy : 'Allow'
Expand Down
1 change: 1 addition & 0 deletions ui/src/models/cron-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export interface CronWorkflowSpec {
startingDeadlineSeconds?: number;
successfulJobsHistoryLimit?: number;
failedJobsHistoryLimit?: number;
timezone?: string;
}

export interface CronWorkflowStatus {
Expand Down
10 changes: 7 additions & 3 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ func (cc *Controller) processNextCronItem() bool {
return true
}

cronWfIf := cc.wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWf.Namespace)
cronWorkflowOperationCtx, err := newCronWfOperationCtx(cronWf, cc.wfClientset, cronWfIf)
cronWorkflowOperationCtx, err := newCronWfOperationCtx(cronWf, cc.wfClientset)
if err != nil {
log.Error(err)
return true
Expand All @@ -142,7 +141,12 @@ func (cc *Controller) processNextCronItem() bool {
delete(cc.nameEntryIDMap, key.(string))
}

entryId, err := cc.cron.AddJob(cronWf.Spec.Schedule, cronWorkflowOperationCtx)
cronSchedule := cronWf.Spec.Schedule
if cronWf.Spec.Timezone != "" {
cronSchedule = "CRON_TZ=" + cronWf.Spec.Timezone + " " + cronSchedule
}

entryId, err := cc.cron.AddJob(cronSchedule, cronWorkflowOperationCtx)
if err != nil {
log.Errorf("could not schedule CronWorkflow: %s", err)
return true
Expand Down
4 changes: 2 additions & 2 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ type cronWfOperationCtx struct {
cronWfIf typed.CronWorkflowInterface
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, cronWfIf typed.CronWorkflowInterface) (*cronWfOperationCtx, error) {
func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface) (*cronWfOperationCtx, error) {
return &cronWfOperationCtx{
name: cronWorkflow.ObjectMeta.Name,
cronWf: cronWorkflow,
wfClientset: wfClientset,
wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace),
cronWfIf: cronWfIf,
cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace),
}, nil
}

Expand Down

0 comments on commit 1a777cc

Please sign in to comment.