Skip to content

Commit

Permalink
Merge pull request concourse#8322 from evanchaoli/depop-avoid-duplica…
Browse files Browse the repository at this point in the history
…ting-parallel-volume-streams

Depop avoid duplicating parallel volume streams
  • Loading branch information
xtremerui committed May 12, 2022
2 parents 3b89b57 + 21093c2 commit 6f2bd34
Show file tree
Hide file tree
Showing 28 changed files with 790 additions and 216 deletions.
4 changes: 4 additions & 0 deletions atc/db/lock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func NewInMemoryCheckBuildTrackingLockID(checkableType string, checkableId int)
return LockID{LockTypeInMemoryCheckBuildTracking, lockIDFromString(fmt.Sprintf("%s-%d", checkableType, checkableId))}
}

func NewVolumeStreamingLockID(resourceCacheID int, worker string) LockID {
return LockID{LockTypeJobScheduling, lockIDFromString(fmt.Sprintf("%d-%s", resourceCacheID, worker))}
}

//counterfeiter:generate . LockFactory
type LockFactory interface {
Acquire(logger lager.Logger, ids LockID) (Lock, bool, error)
Expand Down
16 changes: 16 additions & 0 deletions atc/engine/build_step_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,22 @@ func (delegate *buildStepDelegate) StreamingVolume(logger lager.Logger, volume s
}
}

func (delegate *buildStepDelegate) WaitingForStreamedVolume(logger lager.Logger, volume string, destWorker string) {
err := delegate.build.SaveEvent(event.WaitingForStreamedVolume{
Time: time.Now().Unix(),
Origin: event.Origin{
ID: event.OriginID(delegate.planID),
},
Volume: volume,
DestWorker: destWorker,
})

if err != nil {
logger.Error("failed-to-save-waiting-for-streamed-volume-event", err)
return
}
}

func (delegate *buildStepDelegate) Errored(logger lager.Logger, message string) {
err := delegate.build.SaveEvent(event.Error{
Message: message,
Expand Down
14 changes: 14 additions & 0 deletions atc/engine/build_step_delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,4 +1079,18 @@ var _ = Describe("BuildStepDelegate", func() {
Expect(e.(event.StreamingVolume).DestWorker).To(Equal("dest-worker"))
})
})

Describe("WaitingForStreamedVolume", func() {
JustBeforeEach(func() {
delegate.WaitingForStreamedVolume(logger, "some-volume", "dest-worker")
})

It("saves an event", func() {
Expect(fakeBuild.SaveEventCallCount()).To(Equal(1))
e := fakeBuild.SaveEventArgsForCall(0)
Expect(e.EventType()).To(Equal(atc.EventType("waiting-for-streamed-volume")))
Expect(e.(event.WaitingForStreamedVolume).Volume).To(Equal("some-volume"))
Expect(e.(event.WaitingForStreamedVolume).DestWorker).To(Equal("dest-worker"))
})
})
})
10 changes: 10 additions & 0 deletions atc/event/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ type StreamingVolume struct {
func (StreamingVolume) EventType() atc.EventType { return EventTypeStreamingVolume }
func (StreamingVolume) Version() atc.EventVersion { return "1.0" }

type WaitingForStreamedVolume struct {
Time int64 `json:"time"`
Origin Origin `json:"origin"`
Volume string `json:"volume"`
DestWorker string `json:"dest_worker"`
}

func (WaitingForStreamedVolume) EventType() atc.EventType { return EventTypeWaitingForStreamedVolume }
func (WaitingForStreamedVolume) Version() atc.EventVersion { return "1.0" }

type Log struct {
Time int64 `json:"time"`
Origin Origin `json:"origin"`
Expand Down
1 change: 1 addition & 0 deletions atc/event/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func init() {
RegisterEvent(WaitingForWorker{})
RegisterEvent(SelectedWorker{})
RegisterEvent(StreamingVolume{})
RegisterEvent(WaitingForStreamedVolume{})
RegisterEvent(Log{})
RegisterEvent(Error{})
RegisterEvent(ImageCheck{})
Expand Down
3 changes: 3 additions & 0 deletions atc/event/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (
// a step (get/put/task) is streaming a volume from another worker
EventTypeStreamingVolume atc.EventType = "streaming-volume"

// a step (get/put/task) is waiting for another step to stream the volume to this worker
EventTypeWaitingForStreamedVolume atc.EventType = "waiting-for-streamed-volume"

// task execution started
EventTypeStartTask atc.EventType = "start-task"

Expand Down
1 change: 1 addition & 0 deletions atc/exec/build_step_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type BuildStepDelegate interface {
WaitingForWorker(lager.Logger)
SelectedWorker(lager.Logger, string)
StreamingVolume(lager.Logger, string, string, string)
WaitingForStreamedVolume(lager.Logger, string, string)

ConstructAcrossSubsteps([]byte, []atc.AcrossVar, [][]interface{}) ([]atc.VarScopedPlan, error)
ContainerOwner(planId atc.PlanID) db.ContainerOwner
Expand Down
43 changes: 43 additions & 0 deletions atc/exec/execfakes/fake_build_step_delegate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions atc/exec/execfakes/fake_check_delegate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions atc/exec/execfakes/fake_get_delegate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions atc/exec/execfakes/fake_put_delegate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6f2bd34

Please sign in to comment.