Skip to content

Commit

Permalink
Add multi-container probing
Browse files Browse the repository at this point in the history
  • Loading branch information
ReToCode committed Jan 31, 2024
1 parent f4ef3c8 commit b7819bb
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 125 deletions.
10 changes: 8 additions & 2 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ metadata:
app.kubernetes.io/component: controller
app.kubernetes.io/version: devel
annotations:
knative.dev/example-checksum: "f2fc138e"
knative.dev/example-checksum: "632d47dd"
data:
_example: |-
################################
Expand Down Expand Up @@ -50,9 +50,15 @@ data:
# Indicates whether multi container support is enabled
#
# WARNING: Cannot safely be disabled once enabled.
# See: https://knative.dev/docs/serving/feature-flags/#multi-containers
# See: https://knative.dev/docs/serving/configuration/feature-flags/#multiple-containers
multi-container: "enabled"
# Indicates whether multi container probing is enabled
#
# WARNING: Cannot safely be disabled once enabled.
# See: https://knative.dev/docs/serving/configuration/feature-flags/#multiple-container-probing
multi-container-probing: "disabled"
# Indicates whether Kubernetes affinity support is enabled
#
# WARNING: Cannot safely be disabled once enabled.
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/config/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
func defaultFeaturesConfig() *Features {
return &Features{
MultiContainer: Enabled,
MultiContainerProbing: Disabled,
PodSpecAffinity: Disabled,
PodSpecTopologySpreadConstraints: Disabled,
PodSpecDryRun: Allowed,
Expand Down Expand Up @@ -87,6 +88,7 @@ func NewFeaturesConfigFromMap(data map[string]string) (*Features, error) {

if err := cm.Parse(data,
asFlag("multi-container", &nc.MultiContainer),
asFlag("multi-container-probing", &nc.MultiContainerProbing),
asFlag("kubernetes.podspec-affinity", &nc.PodSpecAffinity),
asFlag("kubernetes.podspec-topologyspreadconstraints", &nc.PodSpecTopologySpreadConstraints),
asFlag("kubernetes.podspec-dryrun", &nc.PodSpecDryRun),
Expand Down Expand Up @@ -124,6 +126,7 @@ func NewFeaturesConfigFromConfigMap(config *corev1.ConfigMap) (*Features, error)
// Features specifies which features are allowed by the webhook.
type Features struct {
MultiContainer Flag
MultiContainerProbing Flag
PodSpecAffinity Flag
PodSpecTopologySpreadConstraints Flag
PodSpecDryRun Flag
Expand Down
49 changes: 31 additions & 18 deletions pkg/apis/serving/k8s_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,23 @@ func validateContainersPorts(containers []corev1.Container) (corev1.ContainerPor

// validateSidecarContainer validate fields for non serving containers
func validateSidecarContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume) (errs *apis.FieldError) {
if container.LivenessProbe != nil {
errs = errs.Also(apis.CheckDisallowedFields(*container.LivenessProbe,
*ProbeMask(&corev1.Probe{})).ViaField("livenessProbe"))
}
if container.ReadinessProbe != nil {
errs = errs.Also(apis.CheckDisallowedFields(*container.ReadinessProbe,
*ProbeMask(&corev1.Probe{})).ViaField("readinessProbe"))
cfg := config.FromContextOrDefaults(ctx)
if cfg.Features.MultiContainerProbing != config.Enabled {
if container.LivenessProbe != nil {
errs = errs.Also(apis.CheckDisallowedFields(*container.LivenessProbe,
*ProbeMask(&corev1.Probe{})).ViaField("livenessProbe"))
}
if container.ReadinessProbe != nil {
errs = errs.Also(apis.CheckDisallowedFields(*container.ReadinessProbe,
*ProbeMask(&corev1.Probe{})).ViaField("readinessProbe"))
}
} else if cfg.Features.MultiContainerProbing == config.Enabled {
// Liveness Probes
errs = errs.Also(validateProbe(container.LivenessProbe, nil, true).ViaField("livenessProbe"))
// Readiness Probes
errs = errs.Also(validateReadinessProbe(container.ReadinessProbe, nil, true).ViaField("readinessProbe"))
}

return errs.Also(validate(ctx, container, volumes))
}

Expand Down Expand Up @@ -547,9 +556,9 @@ func validateInitContainer(ctx context.Context, container corev1.Container, volu
// ValidateContainer validate fields for serving containers
func ValidateContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume, port corev1.ContainerPort) (errs *apis.FieldError) {
// Liveness Probes
errs = errs.Also(validateProbe(container.LivenessProbe, port).ViaField("livenessProbe"))
errs = errs.Also(validateProbe(container.LivenessProbe, &port, false).ViaField("livenessProbe"))
// Readiness Probes
errs = errs.Also(validateReadinessProbe(container.ReadinessProbe, port).ViaField("readinessProbe"))
errs = errs.Also(validateReadinessProbe(container.ReadinessProbe, &port, false).ViaField("readinessProbe"))
return errs.Also(validate(ctx, container, volumes))
}

Expand Down Expand Up @@ -751,12 +760,12 @@ func validateContainerPortBasic(port corev1.ContainerPort) *apis.FieldError {
return errs
}

func validateReadinessProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.FieldError {
func validateReadinessProbe(p *corev1.Probe, port *corev1.ContainerPort, isSidecar bool) *apis.FieldError {
if p == nil {
return nil
}

errs := validateProbe(p, port)
errs := validateProbe(p, port, isSidecar)

if p.PeriodSeconds < 0 {
errs = errs.Also(apis.ErrOutOfBoundsValue(p.PeriodSeconds, 0, math.MaxInt32, "periodSeconds"))
Expand Down Expand Up @@ -798,7 +807,7 @@ func validateReadinessProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.Fi
return errs
}

func validateProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.FieldError {
func validateProbe(p *corev1.Probe, port *corev1.ContainerPort, isSidecar bool) *apis.FieldError {
if p == nil {
return nil
}
Expand All @@ -812,17 +821,21 @@ func validateProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.FieldError
if h.HTTPGet != nil {
handlers = append(handlers, "httpGet")
errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet")
getPort := h.HTTPGet.Port
if getPort.StrVal != "" && getPort.StrVal != port.Name {
errs = errs.Also(apis.ErrInvalidValue(getPort.String(), "httpGet.port", "Probe port must match container port"))
if !isSidecar {
getPort := h.HTTPGet.Port
if getPort.StrVal != "" && getPort.StrVal != port.Name {
errs = errs.Also(apis.ErrInvalidValue(getPort.String(), "httpGet.port", "Probe port must match container port"))
}
}
}
if h.TCPSocket != nil {
handlers = append(handlers, "tcpSocket")
errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket")
tcpPort := h.TCPSocket.Port
if tcpPort.StrVal != "" && tcpPort.StrVal != port.Name {
errs = errs.Also(apis.ErrInvalidValue(tcpPort.String(), "tcpSocket.port", "Probe port must match container port"))
if !isSidecar {
tcpPort := h.TCPSocket.Port
if tcpPort.StrVal != "" && tcpPort.StrVal != port.Name {
errs = errs.Also(apis.ErrInvalidValue(tcpPort.String(), "tcpSocket.port", "Probe port must match container port"))
}
}
}
if h.Exec != nil {
Expand Down
15 changes: 12 additions & 3 deletions pkg/apis/serving/v1/revision_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,10 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont
// If there are multiple containers then default probes will be applied to the container where user specified PORT
// default probes will not be applied for non serving containers
if len(rs.PodSpec.Containers) == 1 || len(container.Ports) != 0 {
rs.applyProbesWithDefaults(container)
rs.applyGRPCProbeDefaults(container)
rs.applyUserContainerDefaultReadinessProbe(container)
}
rs.applyReadinessProbeDefaults(container)
rs.applyGRPCProbeDefaults(container)

if rs.PodSpec.EnableServiceLinks == nil && apis.IsInCreate(ctx) {
rs.PodSpec.EnableServiceLinks = cfg.Defaults.EnableServiceLinks
Expand All @@ -154,7 +155,7 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont
}
}

func (*RevisionSpec) applyProbesWithDefaults(container *corev1.Container) {
func (*RevisionSpec) applyUserContainerDefaultReadinessProbe(container *corev1.Container) {
if container.ReadinessProbe == nil {
container.ReadinessProbe = &corev1.Probe{}
}
Expand All @@ -164,6 +165,14 @@ func (*RevisionSpec) applyProbesWithDefaults(container *corev1.Container) {
container.ReadinessProbe.GRPC == nil {
container.ReadinessProbe.TCPSocket = &corev1.TCPSocketAction{}
}
}

func (*RevisionSpec) applyReadinessProbeDefaults(container *corev1.Container) {
if container.ReadinessProbe == nil {
// Sidecars are allowed to not have a readiness-probe
// we do not want the defaults in that case.
return
}

if container.ReadinessProbe.SuccessThreshold == 0 {
container.ReadinessProbe.SuccessThreshold = 1
Expand Down
19 changes: 19 additions & 0 deletions pkg/apis/serving/v1/revision_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
// It is never nil and should be exactly the specified container if len(containers) == 1 or
// if there are multiple containers it returns the container which has Ports
// as guaranteed by validation.
// Note: If you change this function, also update GetSidecarContainers.
func (rs *RevisionSpec) GetContainer() *corev1.Container {
switch {
case len(rs.Containers) == 1:
Expand All @@ -87,6 +88,24 @@ func (rs *RevisionSpec) GetContainer() *corev1.Container {
return &corev1.Container{}
}

// GetSidecarContainers returns a slice of pointers to all sidecar containers.
// If len(containers) == 1 OR only one container with a user-port exists, it will return an empty slice.
// It is the "rest" of GetContainer.
func (rs *RevisionSpec) GetSidecarContainers() []*corev1.Container {
sidecars := []*corev1.Container{}
if len(rs.Containers) == 1 {
return sidecars
}

for _, c := range rs.Containers {
if len(c.Ports) == 0 {
sidecars = append(sidecars, &c)

Check failure on line 102 in pkg/apis/serving/v1/revision_helpers.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

G601: Implicit memory aliasing in for loop. (gosec)
}
}

return sidecars
}

// SetRoutingState sets the routingState label on this Revision and updates the
// routingStateModified annotation.
func (r *Revision) SetRoutingState(state RoutingState, tm time.Time) {
Expand Down
104 changes: 64 additions & 40 deletions pkg/queue/readiness/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/serving/pkg/queue/health"
Expand All @@ -37,13 +38,10 @@ const (
retryInterval = 50 * time.Millisecond
)

// Probe wraps a corev1.Probe along with a count of consecutive, successful probes
// Probe holds all wrapped *corev1.Probe along with a barrier to sync single probing execution
type Probe struct {
*corev1.Probe
count int32
pollTimeout time.Duration // To make tests not run for 10 seconds.
out io.Writer // To make tests not log errors in good cases.
autoDetectHTTP2 bool // Feature gate to enable HTTP2 auto-detection.
probes []*wrappedProbe
out io.Writer // To make tests not log errors in good cases.

// Barrier sync to ensure only one probe is happening at the same time.
// When a probe is active `gv` will be non-nil.
Expand All @@ -52,6 +50,14 @@ type Probe struct {
gv *gateValue
}

// wrappedProbe pairs a single *corev1.Probe with the per-probe settings that
// NewProbe and NewProbeWithHTTP2AutoDetection fill in for every probe they
// are given.
type wrappedProbe struct {
// Embedded Kubernetes probe definition; its fields (HTTPGet, TCPSocket, GRPC,
// Exec, PeriodSeconds, TimeoutSeconds, ...) are accessed directly on wrappedProbe.
*corev1.Probe
// NOTE(review): presumably the number of consecutive successful probes, as
// described for the previous Probe type — confirm against doProbe.
count int32
pollTimeout time.Duration // To make tests not run for 10 seconds.
out io.Writer // To make tests not log errors in good cases.
autoDetectHTTP2 bool // Feature gate to enable HTTP2 auto-detection.
}

// gateValue is a write-once boolean impl.
type gateValue struct {
broadcast chan struct{}
Expand Down Expand Up @@ -79,27 +85,41 @@ func (gv *gateValue) read() bool {
}

// NewProbe returns a pointer to a new Probe.
func NewProbe(v1p *corev1.Probe) *Probe {
func NewProbe(probes []*corev1.Probe) *Probe {
wrappedProbes := []*wrappedProbe{}
for _, p := range probes {
wrappedProbes = append(wrappedProbes, &wrappedProbe{
Probe: p,
out: os.Stderr,
pollTimeout: PollTimeout,
})
}
return &Probe{
Probe: v1p,
pollTimeout: PollTimeout,
out: os.Stderr,
probes: wrappedProbes,
out: os.Stderr,
}
}

// NewProbeWithHTTP2AutoDetection returns a pointer to a new Probe that has HTTP2
// auto-detection enabled.
func NewProbeWithHTTP2AutoDetection(v1p *corev1.Probe) *Probe {
func NewProbeWithHTTP2AutoDetection(probes []*corev1.Probe) *Probe {
wrappedProbes := []*wrappedProbe{}
for _, p := range probes {
wrappedProbes = append(wrappedProbes, &wrappedProbe{
Probe: p,
out: os.Stderr,
pollTimeout: PollTimeout,
autoDetectHTTP2: true,
})
}
return &Probe{
Probe: v1p,
pollTimeout: PollTimeout,
out: os.Stderr,
autoDetectHTTP2: true,
probes: wrappedProbes,
out: os.Stderr,
}
}

// shouldProbeAggressively indicates whether the Knative probe with aggressive retries should be used.
func (p *Probe) shouldProbeAggressively() bool {
func (p *wrappedProbe) shouldProbeAggressively() bool {
return p.PeriodSeconds == 0
}

Expand Down Expand Up @@ -128,36 +148,40 @@ func (p *Probe) ProbeContainer() bool {
}

func (p *Probe) probeContainerImpl() bool {
var err error

switch {
case p.HTTPGet != nil:
err = p.httpProbe()
case p.TCPSocket != nil:
err = p.tcpProbe()
case p.GRPC != nil:
err = p.grpcProbe()
case p.Exec != nil:
// Should never be reachable. Exec probes to be translated to
// TCP probes when container is built.
// Using Fprintf for a concise error message in the event log.
fmt.Fprintln(p.out, "exec probe not supported")
return false
default:
// Using Fprintf for a concise error message in the event log.
fmt.Fprintln(p.out, "no probe found")
return false
var probeGroup errgroup.Group

for _, probe := range p.probes {
innerProbe := probe
probeGroup.Go(func() error {
fmt.Println("Probing", innerProbe)
switch {
case innerProbe.HTTPGet != nil:
return innerProbe.httpProbe()
case innerProbe.TCPSocket != nil:

return innerProbe.tcpProbe()
case innerProbe.GRPC != nil:
return innerProbe.grpcProbe()
case innerProbe.Exec != nil:
// Should never be reachable. Exec probes to be translated to
// TCP probes when container is built.
return fmt.Errorf("exec probe not supported")
default:
return fmt.Errorf("no probe found")
}
})
}

err := probeGroup.Wait()
if err != nil {
// Using Fprintln for a concise error message in the event log.
fmt.Fprintln(p.out, err.Error())
return false
}

return true
}

func (p *Probe) doProbe(probe func(time.Duration) error) error {
func (p *wrappedProbe) doProbe(probe func(time.Duration) error) error {
if !p.shouldProbeAggressively() {
return probe(time.Duration(p.TimeoutSeconds) * time.Second)
}
Expand Down Expand Up @@ -193,7 +217,7 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error {
// tcpProbe executes the TCP probe once if it is a standard probe;
// otherwise it polls a condition function which returns true
// if the probe count is greater than the success threshold and false if the TCP probe fails.
func (p *Probe) tcpProbe() error {
func (p *wrappedProbe) tcpProbe() error {
config := health.TCPProbeConfigOptions{
Address: p.TCPSocket.Host + ":" + p.TCPSocket.Port.String(),
}
Expand All @@ -207,7 +231,7 @@ func (p *Probe) tcpProbe() error {
// httpProbe executes the HTTP probe once if it is a standard probe;
// otherwise it polls a condition function which returns true
// if the probe count is greater than the success threshold and false if the HTTP probe fails.
func (p *Probe) httpProbe() error {
func (p *wrappedProbe) httpProbe() error {
config := health.HTTPProbeConfigOptions{
HTTPGetAction: p.HTTPGet,
MaxProtoMajor: 1,
Expand All @@ -225,7 +249,7 @@ func (p *Probe) httpProbe() error {
}

// grpcProbe function executes gRPC probe
func (p *Probe) grpcProbe() error {
func (p *wrappedProbe) grpcProbe() error {
config := health.GRPCProbeConfigOptions{
GRPCAction: p.GRPC,
}
Expand Down
Loading

0 comments on commit b7819bb

Please sign in to comment.