Workflows horizontal scaling #134

Closed
valdestron opened this issue Sep 14, 2021 · 4 comments

Comments

@valdestron

valdestron commented Sep 14, 2021

Let's say I have a workflows service.

In it I have one workflow with some handlers.

I deployed this service in k8s, scaled up to 2 pods, with RabbitMQ as the message broker and Postgres as the database.

If you start 2 or more workflows at the same time, the pods will try to upsert/retrieve workflow data concurrently.

You will see the following errors:

  • retrieve errors like "Could not find workflow data", as one pod is trying to retrieve data that is still being handled by the other pod
  • upsert errors like "Error persisting workflow data", because one pod tries to update a row using an older version

Basically, a race condition.

Although the service does seem to finish the workflow eventually despite the errors and retries, it takes a long time to complete, is very confusing, and does not look scalable.

Any ideas on how to scale horizontally?

@adenhertog
Contributor

It does seem something's not quite right. Could you provide a copy of what your workflow looks like? I'm curious to know how you're seeing stale update errors as there's only a couple of ways this might occur.

Also, is your Postgres persistence shared amongst all pods?

@valdestron
Author

Yes, both pods are identical; they connect to the same Postgres connection pool.

import { Workflow, StartedBy, Handles } from '@node-ts/bus-workflow'
import { injectable, inject } from 'inversify'
import { GitopsInstallWorkflowData } from './Data'
import { LOGGER_SYMBOLS, Logger } from '@node-ts/logger-core'
import { BUS_SYMBOLS, Bus } from '@node-ts/bus-core'
import {
  GitopsInstallWorkflowStart,
  GitopsInstallWorkflowHandleValidation,
  GitopsInstallWorkflowValidationHandled,
  GitopsInstallWorkflowHandlePreparation,
  GitopsInstallWorkflowPreparationHandled,
  GitopsInstallWorkflowMCLDUpdateHandled,
  GitopsInstallWorkflowHandleMCLDUpdate,
  GitopsInstallWorkflowHandleMCLDUpdateWatch,
  GitopsInstallWorkflowMCLDUpdateWatchHandled,
  GitopsInstallWorkflowStatus,
  GitopsWorkflowStages,
} from '@commons/events-commands'

// Resolves after `ms` milliseconds; used to simulate slow handlers
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

@injectable()
export class GitopsWorkflow extends Workflow<GitopsInstallWorkflowData> {
  stagesCount: number = Object.keys(GitopsWorkflowStages).length / 2
  workflowName = 'Atlantis Gitops Workflow'

  constructor(
    @inject(BUS_SYMBOLS.Bus) private readonly bus: Bus,
    @inject(LOGGER_SYMBOLS.Logger) private readonly logger: Logger,
  ) {
    super()
  }

  /**
   * Starts new Gitops Installation to AWS Account workflow
   */
  @StartedBy<GitopsInstallWorkflowStart, GitopsInstallWorkflowData, 'startGitopsWorkflow'>(GitopsInstallWorkflowStart)
  async startGitopsWorkflow(event: GitopsInstallWorkflowStart): Promise<Partial<GitopsInstallWorkflowData>> {
    const { username, awsAccount, awsAccountEmail } = event
    const description = `Atlantis Gitops Workflow started by ${username} and will be installed to AWS account ${awsAccount}`
    this.logger.info(description)

    await this.bus.send(new GitopsInstallWorkflowHandleValidation(awsAccount))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.started,
        this.stagesCount - 1,
        'Initial Start',
        description,
      ),
    )

    return {
      username,
      awsAccount,
      awsAccountEmail,
      workflowValid: false,
      accountVariables: {},
      mcldGitopsPRUpdateUrl: '',
      mcldGitopsPRMerged: false,
    }
  }

  @Handles<
    GitopsInstallWorkflowValidationHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowValidationResponse'
  >(GitopsInstallWorkflowValidationHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowValidationResponse(
    eventData: GitopsInstallWorkflowValidationHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received validation response: ${awsAccount} validity is: ${eventData.workflowValid}`
    this.logger.info(description)
    if (!eventData.workflowValid) {
      // TODO: send command to complete workflow if eventData.workflowValid = false
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }
    await sleep(10000)
    // If validation passed, send message to preparation handler and update workflow state data with validity
    await this.bus.send(new GitopsInstallWorkflowHandlePreparation(awsAccount))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.validated,
        this.stagesCount - 2,
        'Account Validation',
        description,
      ),
    )
    return { workflowValid: eventData.workflowValid }
  }

  @Handles<
    GitopsInstallWorkflowPreparationHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowPreparationResponse'
  >(GitopsInstallWorkflowPreparationHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowPreparationResponse(
    eventData: GitopsInstallWorkflowPreparationHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received preparation response for ${awsAccount}`
    this.logger.info(description)
    if (!eventData.accountVariables) {
      // TODO: send command to complete workflow if eventData.accountVariables = null
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }
    await sleep(10000)
    // If preparation passed, send message to update MCLD handler and update workflow state data with validity
    await this.bus.send(new GitopsInstallWorkflowHandleMCLDUpdate(awsAccount, eventData.accountVariables))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.prepared,
        this.stagesCount - 3,
        'Account Preconfiguration',
        description,
      ),
    )
    return {
      accountVariables: eventData.accountVariables,
    }
  }

  @Handles<
    GitopsInstallWorkflowMCLDUpdateHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowMCLDUpdateResponse'
  >(GitopsInstallWorkflowMCLDUpdateHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowMCLDUpdateResponse(
    eventData: GitopsInstallWorkflowMCLDUpdateHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received update MCLD response for ${awsAccount}`
    this.logger.info(description)

    if (!eventData.mcldGitopsPRUpdateUrl) {
      // TODO: send command to complete workflow if eventData.mcldGitopsPRUpdateUrl is empty
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }
    await sleep(10000)
    // If mcld update passed, send message to update MCLD watch handler and update workflow state data with validity
    await this.bus.send(new GitopsInstallWorkflowHandleMCLDUpdateWatch(awsAccount, eventData.mcldGitopsPRUpdateUrl))
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.iacUpdated,
        this.stagesCount - 4,
        'MCLD Infrastructure As Code',
        description,
      ),
    )

    return {
      mcldGitopsPRUpdateUrl: eventData.mcldGitopsPRUpdateUrl,
    }
  }

  @Handles<
    GitopsInstallWorkflowMCLDUpdateWatchHandled,
    GitopsInstallWorkflowData,
    'handleGitopsInstallWorkflowMCLDWatchResponse'
  >(GitopsInstallWorkflowMCLDUpdateWatchHandled, (event) => event.awsAccount, 'awsAccount')
  async handleGitopsInstallWorkflowMCLDWatchResponse(
    eventData: GitopsInstallWorkflowMCLDUpdateWatchHandled,
    { awsAccount, username }: GitopsInstallWorkflowData,
  ): Promise<Partial<GitopsInstallWorkflowData>> {
    const description = `Atlantis Gitops Workflow received MCLD watch update response for ${eventData.awsAccount}`
    this.logger.info(description)

    if (!eventData.mcldGitopsPRMerged) {
      // TODO: send command to complete workflow if eventData.mcldGitopsPRMerged = false
      this.logger.info(`Atlantis Gitops Workflow handling false start`)
      // this.bus.send(new SomeEventToStopWorkflow())
      return this.discard()
    }

    this.logger.info(`Atlantis Gitops Workflow was completed. Initiated by: ${username}, Aws Account: ${awsAccount} `)
    await sleep(10000)
    await this.bus.publish(
      new GitopsInstallWorkflowStatus(
        this.workflowName,
        awsAccount,
        username,
        GitopsWorkflowStages.completed,
        this.stagesCount - 5,
        'Infrastructure Changes landed',
        description,
      ),
    )

    return this.complete({ mcldGitopsPRMerged: eventData.mcldGitopsPRMerged })
  }
}

@adenhertog
Contributor

Cool, I had a look through it and have a couple of thoughts:

Would the sleep() calls inside the handlers be the cause of any of the stale data errors? Whilst a handler is sleeping, any other message that is handled by the workflow in the meantime could cause the underlying data to change and throw this error when the handler finally resumes and returns.
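To illustrate the interleaving described above, here is a minimal sketch of a version-checked save. It assumes the persistence layer uses some form of optimistic concurrency (a version number per row); VersionedStore, tryWrite, DemoWorkflowData and simulateInterleavedHandlers are hypothetical names for this example and are not part of @node-ts/bus. The sleeping handler is modelled by simply saving last.

```typescript
// Hypothetical in-memory stand-in for a versioned workflow-data table.
// None of these names come from @node-ts/bus; they only illustrate the idea.
interface VersionedRow<T> {
  version: number
  data: T
}

class VersionedStore<T> {
  private readonly rows = new Map<string, VersionedRow<T>>()

  read(id: string): VersionedRow<T> | undefined {
    return this.rows.get(id)
  }

  // Saves only if the caller saw the latest version; returns false on a stale write.
  tryWrite(id: string, expectedVersion: number, data: T): boolean {
    const current = this.rows.get(id)
    const currentVersion = current ? current.version : 0
    if (currentVersion !== expectedVersion) {
      return false
    }
    this.rows.set(id, { version: currentVersion + 1, data })
    return true
  }
}

interface DemoWorkflowData {
  awsAccount: string
  workflowValid: boolean
}

function simulateInterleavedHandlers(): { fastHandlerSaved: boolean; slowHandlerSaved: boolean } {
  const store = new VersionedStore<DemoWorkflowData>()
  store.tryWrite('wf-1', 0, { awsAccount: 'demo-account', workflowValid: false })

  // Both handlers load the same row (version 1) before either one saves.
  const slowRead = store.read('wf-1')!
  const fastRead = store.read('wf-1')!

  // The fast handler returns first and bumps the row to version 2.
  const fastHandlerSaved = store.tryWrite('wf-1', fastRead.version, { ...fastRead.data, workflowValid: true })

  // The slow handler "wakes up" and saves against version 1, which is now stale.
  const slowHandlerSaved = store.tryWrite('wf-1', slowRead.version, { ...slowRead.data })

  return { fastHandlerSaved, slowHandlerSaved }
}
```

In this sketch the handler that slept is the one whose save is rejected, which would match the "older version" upsert error if the real persistence behaves similarly.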

Another thought is how an event is mapped to a workflow:

(GitopsInstallWorkflowPreparationHandled, (event) => event.awsAccount, 'awsAccount')

For this to work without collisions, the assumption is that only one workflow instance is ever running for a single awsAccount. If this is not the case (i.e. you can have multiple concurrent gitops workflows per AWS account), I'd suggest mapping based on a workflow-specific correlation id in the message attributes.
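A small self-contained sketch of why the lookup key matters. It does not use the @node-ts/bus API; RunningWorkflow, StepHandledEvent, matchByAwsAccount and matchByCorrelationId are hypothetical names, and the correlation id is modelled as a plain field on the event rather than a message attribute.

```typescript
// Hypothetical shapes for illustration only; not part of @node-ts/bus.
interface RunningWorkflow {
  workflowId: string
  awsAccount: string
}

interface StepHandledEvent {
  awsAccount: string
  correlationId: string // assumed to carry the id of the workflow that sent the command
}

// Two concurrent workflow instances for the same AWS account.
const runningWorkflows: RunningWorkflow[] = [
  { workflowId: 'wf-a', awsAccount: 'demo-account' },
  { workflowId: 'wf-b', awsAccount: 'demo-account' },
]

// Lookup keyed on a business field: ambiguous when instances share the account.
function matchByAwsAccount(event: StepHandledEvent, workflows: RunningWorkflow[]): RunningWorkflow[] {
  return workflows.filter((workflow) => workflow.awsAccount === event.awsAccount)
}

// Lookup keyed on a workflow-specific id: selects at most one instance.
function matchByCorrelationId(event: StepHandledEvent, workflows: RunningWorkflow[]): RunningWorkflow[] {
  return workflows.filter((workflow) => workflow.workflowId === event.correlationId)
}

const replyForWorkflowB: StepHandledEvent = { awsAccount: 'demo-account', correlationId: 'wf-b' }

const byAccount = matchByAwsAccount(replyForWorkflowB, runningWorkflows)
const byCorrelation = matchByCorrelationId(replyForWorkflowB, runningWorkflows)
```

Here byAccount contains both instances, so a reply meant for one workflow would also be applied to the other, while byCorrelation contains only the instance that issued the command.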

Interested to know your thoughts

@valdestron
Author

Thanks a lot! Yes, you are right: the problem was with the unique id. I was testing with the same id every time, which will never be the case in real use.

Now everything works as expected.
