Skip to content

Commit

Permalink
[mesh] enable mTLS (#281)
Browse files Browse the repository at this point in the history
* [mesh] add cert manager

* [mesh] fix conflict

* [mesh] add define

* [mesh] add ingress controller cert handling

* [mesh] fix autotest failure

* [mesh] refactor and raise coverage rate

* [mesh] fix provider bug

* [mesh] fix mTLS cert bug

* Apply suggestions from localvar's code review

Co-authored-by: Bomin Zhang <[email protected]>

* [mesh] refactor according to localvar's comments

* [mesh] raise coverage rate

* Update pkg/filter/proxy/proxy.go

Co-authored-by: Hao Chen <[email protected]>

* [mesh] update with haoel's comments

* [mesh] refactor informer server and ingresscontroller onCert func

* [mesh] fix typo

* [mesh] fix ingress httpserver bug and update spec test

* [mesh] solve conflict

* [mesh] fix warning and bug

* [mesh] fix typo

* [mesh] fix init bug and add more info log

* [mesh] fix mTLS bugs

* [mesh] fix auto test

* [mesh] ingress/egress use its own informer

* [mesh] add closing informer in ingress/egress

* [mesh] udpate according to yunlong's comments

Co-authored-by: Bomin Zhang <[email protected]>
Co-authored-by: Hao Chen <[email protected]>
  • Loading branch information
3 people authored Oct 19, 2021
1 parent 69ef753 commit 514ba09
Show file tree
Hide file tree
Showing 16 changed files with 1,671 additions and 112 deletions.
8 changes: 4 additions & 4 deletions pkg/filter/proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (p *pool) status() *PoolStatus {
return s
}

func (p *pool) handle(ctx context.HTTPContext, reqBody io.Reader) string {
func (p *pool) handle(ctx context.HTTPContext, reqBody io.Reader, client *http.Client) string {
addTag := func(subPrefix, msg string) {
tag := stringtool.Cat(p.tagPrefix, "#", subPrefix, ": ", msg)
ctx.Lock()
Expand Down Expand Up @@ -159,7 +159,7 @@ func (p *pool) handle(ctx context.HTTPContext, reqBody io.Reader) string {
return resultInternalError
}

resp, span, err := p.doRequest(ctx, req)
resp, span, err := p.doRequest(ctx, req, client)
if err != nil {
// NOTE: May add option to cancel the tracing if failed here.
// ctx.Span().Cancel()
Expand Down Expand Up @@ -208,7 +208,7 @@ func (p *pool) prepareRequest(ctx context.HTTPContext, server *Server, reqBody i
return p.newRequest(ctx, server, reqBody)
}

func (p *pool) doRequest(ctx context.HTTPContext, req *request) (*http.Response, tracing.Span, error) {
func (p *pool) doRequest(ctx context.HTTPContext, req *request, client *http.Client) (*http.Response, tracing.Span, error) {
req.start()

spanName := p.spec.SpanName
Expand All @@ -219,7 +219,7 @@ func (p *pool) doRequest(ctx context.HTTPContext, req *request) (*http.Response,
span := ctx.Span().NewChildWithStart(spanName, req.startTime())
span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.std.Header))

resp, err := fnSendRequest(req.std)
resp, err := fnSendRequest(req.std, client)
if err != nil {
return nil, nil, err
}
Expand Down
108 changes: 73 additions & 35 deletions pkg/filter/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package proxy

import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"net"
"net/http"
"sync"
"time"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/util/fallback"
)
Expand All @@ -51,39 +54,8 @@ func init() {
httppipeline.Register(&Proxy{})
}

// All Proxy instances use one globalClient in order to reuse
// some resounces such as keepalive connections.
var globalClient = &http.Client{
// NOTE: Timeout could be no limit, real client or server could cancel it.
Timeout: 0,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 60 * time.Second,
DualStack: true,
}).DialContext,
TLSClientConfig: &tls.Config{
// NOTE: Could make it an paramenter,
// when the requests need cross WAN.
InsecureSkipVerify: true,
},
DisableCompression: false,
// NOTE: The large number of Idle Connections can
// reduce overhead of building connections.
MaxIdleConns: 10240,
MaxIdleConnsPerHost: 512,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}

var fnSendRequest = func(r *http.Request) (*http.Response, error) {
return globalClient.Do(r)
var fnSendRequest = func(r *http.Request, client *http.Client) (*http.Response, error) {
return client.Do(r)
}

type (
Expand All @@ -98,6 +70,8 @@ type (
candidatePools []*pool
mirrorPool *pool

client *http.Client

compression *compression
}

Expand All @@ -109,6 +83,7 @@ type (
MirrorPool *PoolSpec `yaml:"mirrorPool,omitempty" jsonschema:"omitempty"`
FailureCodes []int `yaml:"failureCodes" jsonschema:"omitempty,uniqueItems=true,format=httpcode-array"`
Compression *CompressionSpec `yaml:"compression,omitempty" jsonschema:"omitempty"`
MTLS *MTLS `yaml:"mtls,omitempty" jsonschema:"omitempty"`
}

// FallbackSpec describes the fallback policy.
Expand All @@ -123,6 +98,13 @@ type (
CandidatePools []*PoolStatus `yaml:"candidatePools,omitempty"`
MirrorPool *PoolStatus `yaml:"mirrorPool,omitempty"`
}

// MTLS is the configuration for client side mTLS.
MTLS struct {
CertBase64 string `yaml:"certBase64" jsonschema:"required,format=base64"`
KeyBase64 string `yaml:"keyBase64" jsonschema:"required,format=base64"`
RootCertBase64 string `yaml:"rootCertBase64" jsonschema:"required,format=base64"`
}
)

// Validate validates Spec.
Expand Down Expand Up @@ -194,6 +176,37 @@ func (b *Proxy) Inherit(filterSpec *httppipeline.FilterSpec, previousGeneration
b.Init(filterSpec)
}

func (b *Proxy) needmTLS() bool {
return b.spec.MTLS != nil
}

func (b *Proxy) tlsConfig() *tls.Config {
if !b.needmTLS() {
return &tls.Config{
InsecureSkipVerify: true,
}
}
rootCertPem, _ := base64.StdEncoding.DecodeString(b.spec.MTLS.RootCertBase64)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(rootCertPem)

var certificates []tls.Certificate
certPem, _ := base64.StdEncoding.DecodeString(b.spec.MTLS.CertBase64)
keyPem, _ := base64.StdEncoding.DecodeString(b.spec.MTLS.KeyBase64)
cert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
logger.Errorf("proxy generates x509 key pair failed: %v", err)
return &tls.Config{
InsecureSkipVerify: true,
}
}
certificates = append(certificates, cert)
return &tls.Config{
Certificates: certificates,
RootCAs: caCertPool,
}
}

func (b *Proxy) reload() {
super := b.filterSpec.Super()

Expand Down Expand Up @@ -221,6 +234,31 @@ func (b *Proxy) reload() {
if b.spec.Compression != nil {
b.compression = newCompression(b.spec.Compression)
}

b.client = &http.Client{
// NOTE: Timeout could be no limit, real client or server could cancel it.
Timeout: 0,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 60 * time.Second,
DualStack: true,
}).DialContext,
TLSClientConfig: b.tlsConfig(),
DisableCompression: false,
// NOTE: The large number of Idle Connections can
// reduce overhead of building connections.
MaxIdleConns: 10240,
MaxIdleConnsPerHost: 512,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
}

// Status returns Proxy status.
Expand Down Expand Up @@ -283,7 +321,7 @@ func (b *Proxy) handle(ctx context.HTTPContext) (result string) {

go func() {
defer wg.Done()
b.mirrorPool.handle(ctx, slave)
b.mirrorPool.handle(ctx, slave, b.client)
}()
}

Expand All @@ -305,7 +343,7 @@ func (b *Proxy) handle(ctx context.HTTPContext) (result string) {
return ""
}

result = p.handle(ctx, ctx.Request().Body())
result = p.handle(ctx, ctx.Request().Body(), b.client)
if result != "" {
return result
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/filter/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,22 @@ failureCodes: [503, 504]
t.Error("fallback for 500 should be false")
}

fnSendRequest = func(r *http.Request) (*http.Response, error) {
fnSendRequest = func(r *http.Request, client *http.Client) (*http.Response, error) {
return &http.Response{
Body: io.NopCloser(strings.NewReader("this is the body")),
}, nil
}

result := proxy.Handle(ctx)
if result != "" {
t.Error("proxy.Handle should succeeded")
}
ctx.Finish()

fnSendRequest = func(r *http.Request) (*http.Response, error) {
fnSendRequest = func(r *http.Request, client *http.Client) (*http.Response, error) {
return nil, fmt.Errorf("mocked error")
}

result = proxy.Handle(ctx)
if result == "" {
t.Error("proxy.Handle should fail")
Expand Down
19 changes: 18 additions & 1 deletion pkg/object/httpserver/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package httpserver

import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"regexp"
Expand All @@ -39,6 +40,7 @@ type (
CacheSize uint32 `yaml:"cacheSize" jsonschema:"omitempty"`
XForwardedFor bool `yaml:"xForwardedFor" jsonschema:"omitempty"`
Tracing *tracing.Spec `yaml:"tracing" jsonschema:"omitempty"`
CaCertBase64 string `yaml:"caCertBase64" jsonschema:"omitempty,format=base64"`

// Support multiple certs, preserve the certbase64 and keybase64
// for backward compatibility
Expand Down Expand Up @@ -142,7 +144,22 @@ func (spec *Spec) tlsConfig() (*tls.Config, error) {
return nil, fmt.Errorf("none valid certs and secret")
}

return &tls.Config{Certificates: certificates}, nil
tlsConf := &tls.Config{
Certificates: certificates,
}

// if caCertBase64 configuration is provided, should enable tls.ClientAuth and
// add the root cert
if len(spec.CaCertBase64) != 0 {
rootCertPem, _ := base64.StdEncoding.DecodeString(spec.CaCertBase64)
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(rootCertPem)

tlsConf.ClientAuth = tls.RequireAndVerifyClientCert
tlsConf.ClientCAs = certPool
}

return tlsConf, nil
}

func (h *Header) initHeaderRoute() {
Expand Down
Loading

0 comments on commit 514ba09

Please sign in to comment.