Skip to content

Commit

Permalink
defence programing, isolationg any panics from individual objects
Browse files Browse the repository at this point in the history
  • Loading branch information
qdongxu committed Sep 25, 2019
1 parent 329d82d commit 95ddef4
Showing 1 changed file with 75 additions and 55 deletions.
130 changes: 75 additions & 55 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,75 +145,95 @@ func (s *Scheduler) handleConfig(config map[string]string) {

runningObjects := make([]*runningObject, 0)
for name, v := range createOrupdate {
spec, err := SpecFromYAML(v)
if err != nil {
logger.Errorf("BUG: spec from yaml failed: %s: %v", v, err)
continue
}

if name != spec.GetName() {
logger.Errorf("BUG: inconsistent name in key and spec: %s, %s",
name, spec.GetName())
continue
}
or, exists := objectBook[spec.GetKind()]
if !exists {
logger.Errorf("BUG: unsupported kind: %s", spec.GetKind())
continue
}

runningObjects = append(runningObjects, &runningObject{
config: v,
spec: spec,
or: or,
})
func() {
defer func() {
if err := recover(); err != nil {
logger.Errorf("recover from handleKvs when creating or updating cfg of %s, err: %v, stack trace:\n%s\n",
name, err, debug.Stack())
}
}()

spec, err := SpecFromYAML(v)
if err != nil {
logger.Errorf("BUG: spec from yaml failed: %s: %v", v, err)
return
}

if name != spec.GetName() {
logger.Errorf("BUG: inconsistent name in key and spec: %s, %s",
name, spec.GetName())
return
}
or, exists := objectBook[spec.GetKind()]
if !exists {
logger.Errorf("BUG: unsupported kind: %s", spec.GetKind())
return
}

runningObjects = append(runningObjects, &runningObject{
config: v,
spec: spec,
or: or,
})
}()
}

sort.Sort(runningPluginsByDepend(runningObjects))

for _, runningObject := range runningObjects {
name := runningObject.spec.GetName()

var prevValue reflect.Value
prevObject := s.runningObjects[name]
if prevObject == nil {
prevValue = reflect.New(runningObject.or.objectType).Elem()
} else {
prevValue = reflect.ValueOf(prevObject.object)
}
in := []reflect.Value{
reflect.ValueOf(runningObject.spec),
prevValue,
reflect.ValueOf(s.handlers),
}
runningObject.object = reflect.ValueOf(runningObject.or.NewFunc).
Call(in)[0].Interface().(Object)
s.runningObjects[name] = runningObject
func() {
defer func() {
if err := recover(); err != nil {
logger.Errorf("recover from handleKvs when updating running obj %s, err: %v, stack trace:\n%s\n",
name, err, debug.Stack())
}
}()

var prevValue reflect.Value
prevObject := s.runningObjects[name]
if prevObject == nil {
prevValue = reflect.New(runningObject.or.objectType).Elem()
} else {
prevValue = reflect.ValueOf(prevObject.object)
}
in := []reflect.Value{
reflect.ValueOf(runningObject.spec),
prevValue,
reflect.ValueOf(s.handlers),
}
runningObject.object = reflect.ValueOf(runningObject.or.NewFunc).
Call(in)[0].Interface().(Object)
s.runningObjects[name] = runningObject
}()
}
}

func (s *Scheduler) syncStatus() {
defer func() {
if err := recover(); err != nil {
logger.Errorf("recover from handleSyncStatus, err: %v, stack trace:\n%s\n",
err, debug.Stack())
}
}()

timestamp := time.Now().Unix()
statuses := make(map[string]string)
for name, runningObject := range s.runningObjects {
status := reflect.ValueOf(runningObject.object).
MethodByName("Status").Call(nil)[0].Interface()
reflect.ValueOf(status).Elem().FieldByName("Timestamp").SetInt(timestamp)

buff, err := yaml.Marshal(status)
if err != nil {
logger.Errorf("BUG: marshal %#v to yaml failed: %v",
status, err)
continue
}
statuses[name] = string(buff)
func() {
defer func() {
if err := recover(); err != nil {
logger.Errorf("recover from handleSyncStatus, err: %v, stack trace:\n%s\n",
err, debug.Stack())
}
}()

status := reflect.ValueOf(runningObject.object).
MethodByName("Status").Call(nil)[0].Interface()
reflect.ValueOf(status).Elem().FieldByName("Timestamp").SetInt(timestamp)

buff, err := yaml.Marshal(status)
if err != nil {
logger.Errorf("BUG: marshal %#v to yaml failed: %v",
status, err)
return
}
statuses[name] = string(buff)
}()
}

s.storage.syncStatus(statuses)
Expand Down

0 comments on commit 95ddef4

Please sign in to comment.