Skip to content

Commit

Permalink
update server and loadbalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Apr 1, 2022
1 parent 3b914e6 commit 36eceb5
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 26 deletions.
28 changes: 4 additions & 24 deletions pkg/protocols/httpprot/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,36 +101,16 @@ func (p *Protocol) CreateResponse(resp interface{}) protocols.Response {
}

// CreateLoadBalancer creates a load balancer.
func (p *Protocol) CreateLoadBalancer(lb string, servers []protocols.Server) (protocols.LoadBalancer, error) {
return nil, nil
func (p *Protocol) CreateLoadBalancer(spec interface{}, servers []protocols.Server) (protocols.LoadBalancer, error) {
return NewLoadBalancer(spec, servers)
}

// CreateServer creates a server.
func (p *Protocol) CreateServer(uri string) (protocols.Server, error) {
return nil, nil
func (p *Protocol) CreateServer(spec interface{}) (protocols.Server, error) {
return NewServer(spec)
}

// CreateTrafficMatcher creates a traffic matcher.
func (p *Protocol) CreateTrafficMatcher(spec interface{}) (protocols.TrafficMatcher, error) {
return NewMatcher(spec)
}

// Server implements protocols.Server for HTTP.
type Server struct {
URL string `yaml:"url" jsonschema:"required,format=url"`
Tags []string `yaml:"tags" jsonschema:"omitempty,uniqueItems=true"`
W int `yaml:"weight" jsonschema:"omitempty,minimum=0,maximum=100"`
addrIsHostName bool
}

// Weight returns weight of the server.
func (s *Server) Weight() int {
return s.W
}

// SendRequest sends request to the server and returns the response.
func (s *Server) SendRequest(req protocols.Request) (protocols.Response, error) {
req = req.Clone()

return nil, nil
}
103 changes: 103 additions & 0 deletions pkg/protocols/httpprot/loadbalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package httpprot

import (
"fmt"
"math/rand"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols"
"github.com/megaease/easegress/pkg/util/hashtool"
)

const (
// PolicyIPHash is the policy of ip hash.
PolicyIPHash = "ipHash"
// PolicyHeaderHash is the policy of header hash.
PolicyHeaderHash = "headerHash"
)

type LoadBalancer struct {
general protocols.LoadBalancer

spec *LoadBalancerSpec
servers []protocols.Server
}

type LoadBalancerSpec struct {
Policy string `yaml:"policy" jsonschema:"required,enum=roundRobin,enum=random,enum=weightedRandom,enum=ipHash,enum=headerHash"`
HeaderHashKey string `yaml:"headerHashKey" jsonschema:"omitempty"`
}

func (s LoadBalancerSpec) validate() error {
if s.Policy == PolicyHeaderHash && len(s.HeaderHashKey) == 0 {
return fmt.Errorf("headerHash needs to specify headerHashKey")
}

return nil
}

var _ protocols.LoadBalancer = (*LoadBalancer)(nil)

func NewLoadBalancer(spec interface{}, servers []protocols.Server) (protocols.LoadBalancer, error) {
s := spec.(*LoadBalancerSpec)
if err := s.validate(); err != nil {
return nil, err
}
lb := &LoadBalancer{
spec: s,
servers: servers,
}
p := s.Policy
if p == protocols.PolicyRandom || p == protocols.PolicyWeightedRandom || p == protocols.PolicyRoundRobin {
glb, err := protocols.NewLoadBalancer(spec, servers)
if err != nil {
return nil, err
}
lb.general = glb
return lb, nil
}
return lb, nil
}

func (lb *LoadBalancer) ChooseServer(req protocols.Request) protocols.Server {
r := req.(*Request)
switch lb.spec.Policy {
case protocols.PolicyRoundRobin, protocols.PolicyRandom, protocols.PolicyWeightedRandom:
return lb.general.ChooseServer(req)
case PolicyIPHash:
return lb.chooseIPHash(r)
case PolicyHeaderHash:
return lb.chooseHeaderHash(r)
default:
logger.Errorf("unsupported load balancing policy: %s", lb.spec.Policy)
return lb.servers[rand.Intn(len(lb.servers))]
}
}

func (lb *LoadBalancer) chooseIPHash(req *Request) protocols.Server {
sum32 := int(hashtool.Hash32(req.RealIP()))
return lb.servers[sum32%len(lb.servers)]
}

func (lb *LoadBalancer) chooseHeaderHash(req *Request) protocols.Server {
value := req.HTTPHeader().Get(lb.spec.HeaderHashKey)
sum32 := int(hashtool.Hash32(value))
return lb.servers[sum32%len(lb.servers)]
}
198 changes: 198 additions & 0 deletions pkg/protocols/httpprot/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package httpprot

import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols"
)

// Server implements protocols.Server for HTTP.
type Server struct {
spec *ServerSpec
target *ServerTarget

client *http.Client
}

var fnSendRequest = func(r *http.Request, client *http.Client) (*http.Response, error) {
return client.Do(r)
}

type ServerSpec struct {
Server *ServerTarget `yaml:"server" jsonschema:"required"`
MTLS *ServerMTLS `yaml:"mtls,omitempty" jsonschema:"omitempty"`
MaxIdleConns int `yaml:"maxIdleConns" jsonschema:"omitempty"`
MaxIdleConnsPerHost int `yaml:"maxIdleConnsPerHost" jsonschema:"omitempty"`
}

type ServerMTLS struct {
CertBase64 string `yaml:"certBase64" jsonschema:"required,format=base64"`
KeyBase64 string `yaml:"keyBase64" jsonschema:"required,format=base64"`
RootCertBase64 string `yaml:"rootCertBase64" jsonschema:"required,format=base64"`
}

var _ protocols.Server = (*Server)(nil)

func (s *Server) Weight() int {
return s.target.Weight
}

func (s *Server) tlsConfig() *tls.Config {
if s.spec.MTLS == nil {
return &tls.Config{
InsecureSkipVerify: true,
}
}
rootCertPem, _ := base64.StdEncoding.DecodeString(s.spec.MTLS.RootCertBase64)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(rootCertPem)

var certificates []tls.Certificate
certPem, _ := base64.StdEncoding.DecodeString(s.spec.MTLS.CertBase64)
keyPem, _ := base64.StdEncoding.DecodeString(s.spec.MTLS.KeyBase64)
cert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
logger.Errorf("proxy generates x509 key pair failed: %v", err)
return &tls.Config{
InsecureSkipVerify: true,
}
}
certificates = append(certificates, cert)
return &tls.Config{
Certificates: certificates,
RootCAs: caCertPool,
}
}

func NewServer(spec interface{}) (protocols.Server, error) {
server := &Server{}

server.spec = spec.(*ServerSpec)
server.target = server.spec.Server
server.target.init()
server.client = &http.Client{
// NOTE: Timeout could be no limit, real client or server could cancel it.
Timeout: 0,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 60 * time.Second,
DualStack: true,
}).DialContext,
TLSClientConfig: server.tlsConfig(),
DisableCompression: false,
// NOTE: The large number of Idle Connections can
// reduce overhead of building connections.
MaxIdleConns: server.spec.MaxIdleConns,
MaxIdleConnsPerHost: server.spec.MaxIdleConnsPerHost,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
return server, nil
}

// SendRequest sends request to the server and returns the response.
func (s *Server) SendRequest(req protocols.Request) (protocols.Response, error) {
resp, err := s.sendRequest(req.(*Request))
if err != nil {
return nil, err
}
return resp, nil
}

func (s *Server) sendRequest(req *Request) (*Response, error) {
r, err := prepareRequest(req, s.target)
if err != nil {
return nil, err
}
resp, err := fnSendRequest(r, s.client)
if err != nil {
return nil, err
}
return NewResponse(resp), nil
}

func (s *Server) Close() error {
return nil
}

func prepareRequest(req *Request, server *ServerTarget) (*http.Request, error) {
url := server.URL + req.Path()
if req.URL().RawQuery != "" {
url += "?" + req.URL().RawQuery
}

stdr, err := http.NewRequestWithContext(req.Context(), req.Method(), url, req.GetPayload())
if err != nil {
return nil, fmt.Errorf("BUG: new request failed: %v", err)
}

stdr.Header = req.HTTPHeader()
// only set host when server address is not host name.
if !server.addrIsHostName {
stdr.Host = req.Host()
}
return stdr, nil
}

type ServerTarget struct {
URL string `yaml:"url" jsonschema:"required,format=url"`
Tags []string `yaml:"tags" jsonschema:"omitempty,uniqueItems=true"`
Weight int `yaml:"weight" jsonschema:"omitempty,minimum=0,maximum=100"`
addrIsHostName bool
}

func (s *ServerTarget) init() {
u, err := url.Parse(s.URL)
if err != nil {
return
}
host := u.Host

square := strings.LastIndexByte(host, ']')
colon := strings.LastIndexByte(host, ':')

// There is a port number, remove it.
if colon > square {
host = host[:colon]
}

// IPv6
if square != -1 && host[0] == '[' {
host = host[1:square]
}

s.addrIsHostName = net.ParseIP(host) == nil
}
Loading

0 comments on commit 36eceb5

Please sign in to comment.