Skip to content

Commit

Permalink
Dynamic modification of configuration files at the Codis level (OpenA…
Browse files Browse the repository at this point in the history
…tomFoundation#2110)

* Dynamic modification of configuration files.Check the correctness of the functionality.

* Modified some specification issues.

* Dynamically modify Codis configuration file and add slow query logs.

* Dynamic modification of configuration files at the Codis level
  • Loading branch information
dingxiaoshuai123 authored Nov 9, 2023
1 parent 8a9704c commit 28831ef
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 265 deletions.
3 changes: 0 additions & 3 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ session_break_on_failure = false
# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000

# set the number of slowlog in memory, max len is 10000000. (0 to disable)
slowlog_max_len = 128000

# Set metrics server (such as https://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand Down
10 changes: 2 additions & 8 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ session_break_on_failure = false
# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000
# set the number of slowlog in memory, max len is 10000000. (0 to disable)
slowlog_max_len = 128000
# Set metrics server (such as https://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand Down Expand Up @@ -183,7 +180,6 @@ type Config struct {
SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"`

SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"`
SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"`

MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"`
MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
Expand All @@ -195,7 +191,8 @@ type Config struct {
MetricsReportStatsdServer string `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"`
MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"`
MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"`
ConfigFileName string `toml:"-" json:"config_file_name"`

ConfigFileName string `toml:"-" json:"config_file_name"`
}

func NewDefaultConfig() *Config {
Expand Down Expand Up @@ -315,9 +312,6 @@ func (c *Config) Validate() error {
if c.SlowlogLogSlowerThan < 0 {
return errors.New("invalid slowlog_log_slower_than")
}
if c.SlowlogMaxLen < 0 {
return errors.New("invalid slowlog_max_len")
}

if c.MetricsReportPeriod < 0 {
return errors.New("invalid metrics_report_period")
Expand Down
1 change: 0 additions & 1 deletion codis/pkg/proxy/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ func init() {
{"SUNIONSTORE", FlagWrite},
{"SYNC", FlagNotAllow},
{"PCONFIG", 0},
{"PSLOWLOG", 0},
{"TIME", FlagNotAllow},
{"TOUCH", FlagWrite},
{"TTL", 0},
Expand Down
109 changes: 91 additions & 18 deletions codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,98 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp {
p.mu.Lock()
defer p.mu.Unlock()
switch key {
case "proxy_max_clients":
return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.ProxyMaxClients)))
case "jodis":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("jodis_name")),
redis.NewBulkBytes([]byte(p.config.JodisName)),
redis.NewBulkBytes([]byte("jodis_addr")),
redis.NewBulkBytes([]byte(p.config.JodisAddr)),
redis.NewBulkBytes([]byte("jodis_auth")),
redis.NewBulkBytes([]byte(p.config.JodisAuth)),
redis.NewBulkBytes([]byte("jodis_timeout")),
redis.NewBulkBytes([]byte(p.config.JodisTimeout.Duration().String())),
redis.NewBulkBytes([]byte("jodis_compatible")),
redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.JodisCompatible))),
})
case "proxy":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("proxy_datacenter")),
redis.NewBulkBytes([]byte(p.config.ProxyDataCenter)),
redis.NewBulkBytes([]byte("proxy_max_clients")),
redis.NewBulkBytes([]byte(strconv.Itoa(p.config.ProxyMaxClients))),
redis.NewBulkBytes([]byte("proxy_max_offheap_size")),
redis.NewBulkBytes([]byte(p.config.ProxyMaxOffheapBytes.HumanString())),
redis.NewBulkBytes([]byte("proxy_heap_placeholder")),
redis.NewBulkBytes([]byte(p.config.ProxyHeapPlaceholder.HumanString())),
})
case "backend_ping_period":
return redis.NewBulkBytes([]byte(p.config.BackendPingPeriod.Duration().String()))
case "backend_buffer_size":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("backend_recv_bufsize")),
redis.NewBulkBytes([]byte(p.config.BackendRecvBufsize.HumanString())),
redis.NewBulkBytes([]byte("backend_send_bufsize")),
redis.NewBulkBytes([]byte(p.config.BackendSendBufsize.HumanString())),
})
case "backend_timeout":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("backend_recv_timeout")),
redis.NewBulkBytes([]byte(p.config.BackendRecvTimeout.Duration().String())),
redis.NewBulkBytes([]byte("backend_send_timeout")),
redis.NewBulkBytes([]byte(p.config.BackendSendTimeout.Duration().String())),
})
case "backend_max_pipeline":
return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendMaxPipeline)))
case "backend_primary_only":
return redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.BackendPrimaryOnly)))
case "max_slot_num":
return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.MaxSlotNum)))
case "backend_replica_parallel":
return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendReplicaParallel)))
case "backend_keepalive_period":
return redis.NewBulkBytes([]byte(p.config.BackendKeepAlivePeriod.Duration().String()))
case "backend_number_databases":
return redis.NewBulkBytes([]byte(strconv.FormatInt(int64(p.config.BackendNumberDatabases), 10)))
case "session_buffer_size":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("session_recv_bufsize")),
redis.NewBulkBytes([]byte(p.config.SessionRecvBufsize.HumanString())),
redis.NewBulkBytes([]byte("session_send_bufsize")),
redis.NewBulkBytes([]byte(p.config.SessionSendBufsize.HumanString())),
})
case "session_timeout":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("session_recv_timeout")),
redis.NewBulkBytes([]byte(p.config.SessionRecvTimeout.Duration().String())),
redis.NewBulkBytes([]byte("session_send_timeout")),
redis.NewBulkBytes([]byte(p.config.SessionSendTimeout.Duration().String())),
})
case "slowlog_log_slower_than":
return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogLogSlowerThan, 10)))
case "slowlog_max_len":
return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogMaxLen, 10)))
case "metrics_report_server":
return redis.NewBulkBytes([]byte(p.config.MetricsReportServer))
case "metrics_report_period":
return redis.NewBulkBytes([]byte(p.config.MetricsReportPeriod.Duration().String()))
case "metrics_report_influxdb":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("metrics_report_influxdb_server")),
redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbServer)),
redis.NewBulkBytes([]byte("metrics_report_influxdb_period")),
redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbPeriod.Duration().String())),
redis.NewBulkBytes([]byte("metrics_report_influxdb_username")),
redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbUsername)),
redis.NewBulkBytes([]byte("metrics_report_influxdb_database")),
redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbDatabase)),
})
case "metrics_report_statsd":
return redis.NewArray([]*redis.Resp{
redis.NewBulkBytes([]byte("metrics_report_statsd_server")),
redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdServer)),
redis.NewBulkBytes([]byte("metrics_report_statsd_period")),
redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPeriod.Duration().String())),
redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")),
redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)),
})
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand All @@ -233,6 +317,9 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp {
p.mu.Lock()
defer p.mu.Unlock()
switch key {
case "product_name":
p.config.ProductName = value
return redis.NewString([]byte("OK"))
case "proxy_max_clients":
n, err := strconv.Atoi(value)
if err != nil {
Expand All @@ -255,20 +342,6 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp {
}
p.config.SlowlogLogSlowerThan = n
return redis.NewString([]byte("OK"))
case "slowlog_max_len":
n, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return redis.NewErrorf("err:%s", err)
}

if n < 0 {
return redis.NewErrorf("invalid slowlog_max_len")
}
p.config.SlowlogMaxLen = n
if p.config.SlowlogMaxLen > 0 {
SlowLogSetMaxLen(p.config.SlowlogMaxLen)
}
return redis.NewString([]byte("OK"))
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand Down
63 changes: 1 addition & 62 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,8 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
}
index := getWholeCmd(r.Multi, cmd)
cmdLog := fmt.Sprintf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
log.Warnf("%s", cmdLog)
if s.config.SlowlogMaxLen > 0 {
SlowLogPush(&SlowLogEntry{SlowLogGetCurLogId(), r.ReceiveTime / 1e3, duration, cmdLog})
}
}
return nil
})
Expand Down Expand Up @@ -328,8 +324,6 @@ func (s *Session) handleRequest(r *Request, d *Router) error {
return s.handleRequestExists(r, d)
case "PCONFIG":
return s.handlePConfig(r)
case "PSLOWLOG":
return s.handlePSlowLog(r)
case "SLOTSINFO":
return s.handleRequestSlotsInfo(r, d)
case "SLOTSSCAN":
Expand Down Expand Up @@ -726,61 +720,6 @@ func (s *Session) flushOpStats(force bool) {
}
}

func (s *Session) handlePSlowLog(r *Request) error {
if len(r.Multi) < 2 || len(r.Multi) > 4 {
r.Resp = redis.NewErrorf("ERR slowLog parameters")
return nil
}
var subCmd = strings.ToUpper(string(r.Multi[1].Value))
switch subCmd {
case "GET":
if len(r.Multi) == 3 {
num, err := strconv.ParseInt(string(r.Multi[2].Value), 10, 64)
if err != nil {
r.Resp = redis.NewErrorf("ERR invalid slowLog number")
break
}

r.Resp = SlowLogGetByNum(num)
} else if len(r.Multi) == 4 {
var (
id int64
num int64
err error
)
id, err = strconv.ParseInt(string(r.Multi[2].Value), 10, 64)
if err != nil {
r.Resp = redis.NewErrorf("ERR invalid slowLog start logId")
break
}
num, err = strconv.ParseInt(string(r.Multi[3].Value), 10, 64)
if err != nil {
r.Resp = redis.NewErrorf("ERR invalid slowLog number")
break
}

r.Resp = SlowLogGetByIdAndNUm(id, num)
} else {
r.Resp = SlowLogGetByNum(10)
}
case "LEN":
if len(r.Multi) == 2 {
r.Resp = SlowLogGetLen()
} else {
r.Resp = redis.NewErrorf("ERR slowLog parameters")
}
case "RESET":
if len(r.Multi) == 2 {
r.Resp = SlowLogReset()
} else {
r.Resp = redis.NewErrorf("ERR slowLog parameters")
}
default:
r.Resp = redis.NewErrorf("ERR Unknown SLOWLOG subcommand or wrong args. Try GET, RESET, LEN.")
}
return nil
}

func (s *Session) handlePConfig(r *Request) error {
if len(r.Multi) < 2 || len(r.Multi) > 4 {
r.Resp = redis.NewErrorf("ERR config parameters")
Expand Down
Loading

0 comments on commit 28831ef

Please sign in to comment.