From 066ca1742d43870bf775c2f7610ddfff540acc0f Mon Sep 17 00:00:00 2001 From: fengfei Date: Sat, 31 Mar 2018 15:53:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nginx.conf | 1 + src/access.lua | 57 +++++++++++++++++++++++++------------------- src/balancer.lua | 26 ++++++++++---------- src/model/api.lua | 14 +++++------ src/model/domain.lua | 10 ++++---- src/model/module.lua | 12 +++++----- src/model/server.lua | 22 ++++++++--------- src/service/agw.lua | 23 +++++++++--------- 8 files changed, 86 insertions(+), 79 deletions(-) diff --git a/nginx.conf b/nginx.conf index 1b3ccb7..1746ae8 100755 --- a/nginx.conf +++ b/nginx.conf @@ -11,6 +11,7 @@ http { keepalive_timeout 65; client_max_body_size 1024m; lua_shared_dict cache 10m; + lua_shared_dict domain_cache 10m; lua_shared_dict session 1m; #换成你的实际路径,这里将源码中src目录加入到 lua_package_path diff --git a/src/access.lua b/src/access.lua index 0007244..650197a 100755 --- a/src/access.lua +++ b/src/access.lua @@ -1,11 +1,7 @@ local _M = {} - -function test_output(content, status) - ngx.header.content_type = 'application/json;charset=UTF-8'; - ngx.say(content) - ngx.status = status - ngx.exit(status) -end +local cjson = require "cjson" +local domain_cache = ngx.shared.domain_cache +local cache = ngx.shared.cache function split(str, delimiter) if str == nil or str == '' or delimiter == nil then @@ -19,28 +15,45 @@ function split(str, delimiter) return result end +-- 重写URL function rewrite(request_uri, reg, original_uri) i, j = string.find(request_uri, reg) if i ~= nil then - local real_uri, index = string.gsub(request_uri, reg, original_uri, 1) + local real_uri, _ = string.gsub(request_uri, reg, original_uri, 1) return real_uri end return nil end +-- 负载均衡,选择服务器 +function select_server(api_info) + local servers = api_info["servers"] + local server_count = table.getn(servers) + + if server_count == 0 then + return nil + end + + -- 简单轮询策略 + local request_index_cache_key = ngx.var.host .. "_request_index_" .. api_info["request_uri"] + local request_index, _ = cache:incr(request_index_cache_key, 1) + if request_index == nil then + request_index = cache:incr(request_index_cache_key, 1, 0, 60) -- 设置一段时间过期 + end + + return servers[request_index % server_count + 1]; --Lua 的 table 索引默认从 1 开始 +end + function _M.dispatch() - local cjson = require "cjson" - local cache = ngx.shared.cache - local config_str = cache:get(ngx.var.host); + local config_str = domain_cache:get(ngx.var.host); if config_str == nil then - config_str = cache:get("localhost"); + config_str = domain_cache:get("localhost"); if config_str == nil then ngx.exit(404) end end local config = cjson.decode(config_str) - local real_uri local api_info local api_uri_array = config["api_uri_array"] @@ -50,7 +63,8 @@ function _M.dispatch() uri = uri .. "?" .. ngx.var.args end - for k, uri_regx in pairs(api_uri_array) do + -- 匹配请求映射获得配置(api_info) + for _, uri_regx in pairs(api_uri_array) do local api_info_t = api_uri_map[uri_regx]; real_uri = rewrite(uri, api_info_t["request_uri"], api_info_t["original_uri"]); if (real_uri ~= nil) then @@ -63,21 +77,14 @@ function _M.dispatch() ngx.exit(404) end - local servers = api_info["servers"] - local server_count = table.getn(servers) + -- TODO:限流,IP黑名单... - if server_count == 0 then + -- 负载均衡,选择服务器开始 + local server = select_server(api_info); + if server == nil then ngx.exit(503) end - local request_index_cache_key = ngx.var.host .. "_request_index_" .. api_info["request_uri"] - local request_index = cache:get(request_index_cache_key) - if request_index == nil then - request_index = 1 - end - cache:set(request_index_cache_key, request_index + 1) - local server = servers[request_index % server_count + 1]; - if api_info["host"] == "localhost" then api_info["host"] = ngx.var.host end diff --git a/src/balancer.lua b/src/balancer.lua index 3382a37..4f854a1 100755 --- a/src/balancer.lua +++ b/src/balancer.lua @@ -1,19 +1,19 @@ local _M = {} function _M.balancing() - local balancer = require "ngx.balancer" - local host = ngx.var.backend_host - local port = ngx.var.backend_port - local ok, err = balancer.set_current_peer(host, port) - if not ok then - ngx.log(ngx.ERR, "failed to set the current peer: ", err) - return ngx.exit(500) - end - ok, err = balancer.set_more_tries(3) - if not ok then - ngx.log(ngx.ERR, "failed to set more tries: ", err) - return ngx.exit(500) - end + local balancer = require "ngx.balancer" + local host = ngx.var.backend_host + local port = ngx.var.backend_port + local ok, err = balancer.set_current_peer(host, port) + if not ok then + ngx.log(ngx.ERR, "failed to set the current peer: ", err) + return ngx.exit(500) + end + ok, err = balancer.set_more_tries(3) + if not ok then + ngx.log(ngx.ERR, "failed to set more tries: ", err) + return ngx.exit(500) + end end return _M \ No newline at end of file diff --git a/src/model/api.lua b/src/model/api.lua index 8f043f3..f8982a2 100755 --- a/src/model/api.lua +++ b/src/model/api.lua @@ -4,7 +4,7 @@ local api_model = {} -- 由于系统都是内部使用,对SQL注入问题没有特殊处理 function api_model.getApis() local db = mysql.getDb() - local apis, err, errno, sqlstate = db:query("select * from agw_api", 10) + local apis, err, _, _ = db:query("select * from agw_api", 10) db:set_keepalive(10000, 10) return apis, err end @@ -12,21 +12,21 @@ end function api_model.add(service_id, request_uri, original_uri, description) local db = mysql.getDb() description = ndk.set_var.set_quote_sql_str(description) - local res, err, errno, sqlstate = db:query("INSERT INTO agw_api (service_id,request_uri,original_uri,description)values(\'"..service_id.."\',\'"..request_uri.."\',\'"..original_uri.."\',"..description..")", 10) + local res, err, _, _ = db:query("INSERT INTO agw_api (service_id,request_uri,original_uri,description)values(\'" .. service_id .. "\',\'" .. request_uri .. "\',\'" .. original_uri .. "\'," .. description .. ")", 10) db:set_keepalive(10000, 10) return res, err end function api_model.delete(aid) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("DELETE FROM agw_api WHERE id="..aid, 10) + local res, err, _, _ = db:query("DELETE FROM agw_api WHERE id=" .. aid, 10) db:set_keepalive(10000, 10) return res, err end function api_model.deleteByServiceId(sid) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("DELETE FROM agw_api WHERE service_id="..sid, 10) + local res, err, _, _ = db:query("DELETE FROM agw_api WHERE service_id=" .. sid, 10) db:set_keepalive(10000, 10) return res, err end @@ -34,14 +34,14 @@ end function api_model.update(id, request_uri, original_uri, description) local db = mysql.getDb() description = ndk.set_var.set_quote_sql_str(description) - local res, err, errno, sqlstate = db:query("UPDATE agw_api SET request_uri=\'"..request_uri.."\',original_uri=\'"..original_uri.."\',description="..description.." WHERE id="..id, 10) + local res, err, _, _ = db:query("UPDATE agw_api SET request_uri=\'" .. request_uri .. "\',original_uri=\'" .. original_uri .. "\',description=" .. description .. " WHERE id=" .. id, 10) db:set_keepalive(10000, 10) return res, err end function api_model.getApi(id) local db = mysql.getDb() - local apis, err, errno, sqlstate = db:query("SELECT * FROM agw_api WHERE id="..id, 10) + local apis, err, _, _ = db:query("SELECT * FROM agw_api WHERE id=" .. id, 10) api = nil if table.getn(apis) > 0 then api = apis[1] @@ -54,7 +54,7 @@ end function api_model.getServiceApis(sid) local db = mysql.getDb() - local services, err, errno, sqlstate = db:query("SELECT * FROM agw_api WHERE service_id="..sid, 10) + local services, err, _, _ = db:query("SELECT * FROM agw_api WHERE service_id=" .. sid, 10) db:set_keepalive(10000, 10) return services, err end diff --git a/src/model/domain.lua b/src/model/domain.lua index 8534be0..0019901 100755 --- a/src/model/domain.lua +++ b/src/model/domain.lua @@ -3,35 +3,35 @@ local domain_model = {} function domain_model.addDomain(name) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("INSERT INTO agw_domain (name)values(\'"..name.."\')", 10) + local res, err, _, _ = db:query("INSERT INTO agw_domain (name)values(\'" .. name .. "\')", 10) db:set_keepalive(10000, 100) return res, err end function domain_model.delete(id) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("DELETE FROM agw_domain WHERE id="..id, 10) + local res, err, _, _ = db:query("DELETE FROM agw_domain WHERE id=" .. id, 10) db:set_keepalive(10000, 100) return res, err end function domain_model.update(id, name) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("UPDATE agw_domain SET name=\'"..name.."\' WHERE id="..id, 10) + local res, err, _, _ = db:query("UPDATE agw_domain SET name=\'" .. name .. "\' WHERE id=" .. id, 10) db:set_keepalive(10000, 100) return res, err end function domain_model.getDomain(id) local db = mysql.getDb() - local service, err, errno, sqlstate = db:query("SELECT * FROM agw_domain WHERE id="..id, 10) + local service, err, _, _ = db:query("SELECT * FROM agw_domain WHERE id=" .. id, 10) db:set_keepalive(10000, 100) return service, err end function domain_model.getDomains() local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("SELECT * FROM agw_domain ORDER BY id ASC", 10) + local res, err, _, _ = db:query("SELECT * FROM agw_domain ORDER BY id ASC", 10) db:set_keepalive(10000, 100) return res, err end diff --git a/src/model/module.lua b/src/model/module.lua index ce23281..39c7abd 100755 --- a/src/model/module.lua +++ b/src/model/module.lua @@ -4,21 +4,21 @@ local service_model = {} function service_model.add(domain_id, name, host, description) description = ndk.set_var.set_quote_sql_str(description) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("INSERT INTO agw_service (domain_id, name,host,description)values(\'"..domain_id.."\',\'"..name.."\',\'"..host.."\',"..description..")", 10) + local res, err, _, _ = db:query("INSERT INTO agw_service (domain_id, name,host,description)values(\'" .. domain_id .. "\',\'" .. name .. "\',\'" .. host .. "\'," .. description .. ")", 10) db:set_keepalive(10000, 100) return res, err end function service_model.deleteByDomainId(did) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("DELETE FROM agw_service WHERE domain_id="..did, 10) + local res, err, _, _ = db:query("DELETE FROM agw_service WHERE domain_id=" .. did, 10) db:set_keepalive(10000, 100) return res, err end function service_model.delete(id) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("DELETE FROM agw_service WHERE id="..id, 10) + local res, err, _, _ = db:query("DELETE FROM agw_service WHERE id=" .. id, 10) db:set_keepalive(10000, 100) return res, err end @@ -26,14 +26,14 @@ end function service_model.update(id, name, host, description) description = ndk.set_var.set_quote_sql_str(description) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("UPDATE agw_service SET name=\'"..name.."\',host=\'"..host.."\',description="..description.." WHERE id="..id, 10) + local res, err, _, _ = db:query("UPDATE agw_service SET name=\'" .. name .. "\',host=\'" .. host .. "\',description=" .. description .. " WHERE id=" .. id, 10) db:set_keepalive(10000, 100) return res, err end function service_model.getService(id) local db = mysql.getDb() - local services, err, errno, sqlstate = db:query("SELECT * FROM agw_service WHERE id="..id, 10) + local services, err, _, _ = db:query("SELECT * FROM agw_service WHERE id=" .. id, 10) service = nil if table.getn(services) > 0 then service = services[1] @@ -46,7 +46,7 @@ end function service_model.getServices(domain_id) local db = mysql.getDb() - local services, err, errno, sqlstate = db:query("SELECT * FROM agw_service WHERE domain_id="..domain_id, 10) + local services, err, _, _ = db:query("SELECT * FROM agw_service WHERE domain_id=" .. domain_id, 10) db:set_keepalive(10000, 100) return services, err end diff --git a/src/model/server.lua b/src/model/server.lua index 0d7e886..77bc808 100755 --- a/src/model/server.lua +++ b/src/model/server.lua @@ -4,21 +4,21 @@ local server_model = {} function server_model.add(service_id, ip, port, weight, description, protocol) description = ndk.set_var.set_quote_sql_str(description) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("INSERT INTO agw_server(service_id,ip,port,weight,description,protocol)values(\'"..service_id.."\',\'"..ip.."\',\'"..port.."\',\'"..weight.."\',"..description..",\'"..protocol.."\')", 10) + local res, err, _, _ = db:query("INSERT INTO agw_server(service_id,ip,port,weight,description,protocol)values(\'" .. service_id .. "\',\'" .. ip .. "\',\'" .. port .. "\',\'" .. weight .. "\'," .. description .. ",\'" .. protocol .. "\')", 10) db:set_keepalive(10000, 100) return res, err end function server_model.delete(server_id) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("DELETE FROM agw_server WHERE id="..server_id, 10) + local res, err, _, _ = db:query("DELETE FROM agw_server WHERE id=" .. server_id, 10) db:set_keepalive(10000, 100) return res, err end function server_model.deleteByServiceId(sid) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("DELETE FROM agw_server WHERE service_id="..sid, 10) + local res, err, _, _ = db:query("DELETE FROM agw_server WHERE service_id=" .. sid, 10) db:set_keepalive(10000, 100) return res, err end @@ -26,14 +26,14 @@ end function server_model.update(server_id, ip, port, weight, description, protocol) description = ndk.set_var.set_quote_sql_str(description) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("UPDATE agw_server SET ip=\'"..ip.."\',port="..port..",protocol=\'"..protocol.."\',weight=\'"..weight.."\',description="..description.." WHERE id="..server_id, 10) + local res, err, _, _ = db:query("UPDATE agw_server SET ip=\'" .. ip .. "\',port=" .. port .. ",protocol=\'" .. protocol .. "\',weight=\'" .. weight .. "\',description=" .. description .. " WHERE id=" .. server_id, 10) db:set_keepalive(10000, 100) return res, err end function server_model.getServers(state) local db = mysql.getDb() - local servers, err, errno, sqlstate = db:query("SELECT * FROM agw_server WHERE status="..state, 10) + local servers, _, _, _ = db:query("SELECT * FROM agw_server WHERE status=" .. state, 10) db:set_keepalive(10000, 100) if not servers then return @@ -43,20 +43,20 @@ end function server_model.getServiceServers(service_id) local db = mysql.getDb() - local servers, err, errno, sqlstate = db:query("SELECT * FROM agw_server WHERE service_id="..service_id, 10) + local servers, err, errno, sqlstate = db:query("SELECT * FROM agw_server WHERE service_id=" .. service_id, 10) return servers, err end function server_model.getServiceServersWithState(service_id, state) local db = mysql.getDb() - local servers, err, errno, sqlstate = db:query("SELECT * FROM agw_server WHERE service_id="..service_id.." AND status="..state, 10) + local servers, err, _, _ = db:query("SELECT * FROM agw_server WHERE service_id=" .. service_id .. " AND status=" .. state, 10) db:set_keepalive(10000, 100) return servers, err end function server_model.getServer(id) local db = mysql.getDb() - local servers, err, errno, sqlstate = db:query("SELECT * FROM agw_server WHERE id="..id, 10) + local servers, err, _, _ = db:query("SELECT * FROM agw_server WHERE id=" .. id, 10) db:set_keepalive(10000, 100) server = nil if table.getn(servers) > 0 then @@ -69,7 +69,7 @@ end function server_model.getServersByMid(mid) local db = mysql.getDb() - local servers, err, errno, sqlstate = db:query("SELECT * FROM agw_server WHERE mid="..mid, 10) + local servers, _, _, _ = db:query("SELECT * FROM agw_server WHERE mid=" .. mid, 10) db:set_keepalive(10000, 100) if not servers then return @@ -79,7 +79,7 @@ end function server_model.getAllServers() local db = mysql.getDb() - local servers, err, errno, sqlstate = db:query("SELECT * FROM agw_server", 10) + local servers, _, _, _ = db:query("SELECT * FROM agw_server", 10) db:set_keepalive(10000, 100) if not servers then return @@ -89,7 +89,7 @@ end function server_model.updateState(sid, state) local db = mysql.getDb() - local res, err, errno, sqlstate = db:query("UPDATE agw_server SET status="..state.." WHERE id="..sid, 10) + local res, err, _, _ = db:query("UPDATE agw_server SET status=" .. state .. " WHERE id=" .. sid, 10) db:set_keepalive(10000, 100) if not res then ngx.log(ngx.ERR, "update server state err:", err); diff --git a/src/service/agw.lua b/src/service/agw.lua index 12468e7..6ed997c 100755 --- a/src/service/agw.lua +++ b/src/service/agw.lua @@ -4,11 +4,10 @@ local service_model = require "model.module" local server_model = require "model.server" local domain_model = require "model.domain" -local cache = ngx.shared.cache +local cache = ngx.shared.domain_cache local log = ngx.log local ERR = ngx.ERR -local DEBUG = ngx.DEBUG local agw_service = {} function check_state(ip, port) @@ -24,19 +23,19 @@ function check_state(ip, port) end function set_domain_api_info(domainId, domainName) - local services, err = service_model.getServices(domainId) + local services, _ = service_model.getServices(domainId) local service_api_uri_map = {} local service_api_uri_array = {} - for k, service in pairs(services) do + for _, service in pairs(services) do local servers = server_model.getServiceServersWithState(service["id"], 1) local apis = api_model.getServiceApis(service["id"]) - for k, api in pairs(apis) do + for _, api in pairs(apis) do api["servers"] = servers api["host"] = service["host"] if service["host"] == "" then api["host"] = domainName end - local original_uri, index = string.gsub(api["original_uri"], "%$([0-9]+)", "%%%1") + local original_uri, _ = string.gsub(api["original_uri"], "%$([0-9]+)", "%%%1") api["original_uri"] = original_uri service_api_uri_map[api["request_uri"]] = api table.insert(service_api_uri_array, api["request_uri"]) @@ -50,13 +49,13 @@ end function agw_service.syncApiInfo() local cachedDomains = cache:get_keys(0); local new_domain_map = {} - local domains, err = domain_model.getDomains() - for k, domain in pairs(domains) do + local domains, _ = domain_model.getDomains() + for _, domain in pairs(domains) do set_domain_api_info(domain["id"], domain["name"]) new_domain_map[domain["name"]] = domain end -- 清除缓存中已删除的域名配置 - for k, domainName in pairs(cachedDomains) do + for _, domainName in pairs(cachedDomains) do if new_domain_map[domainName] == nil then cache:delete(domainName) end @@ -65,7 +64,7 @@ end function agw_service.checkState() local servers = server_model.getAllServers() - for k, server in pairs(servers) do + for _, server in pairs(servers) do if check_state(server["ip"], server["port"]) == false then log(ERR, "server down:", server["ip"] .. ":" .. server["port"]) server_model.updateState(server["id"], 0) @@ -77,7 +76,7 @@ end function agw_service.getServices(domainId) local services, err = service_model.getServices(domainId) - for k, service in pairs(services) do + for _, service in pairs(services) do local up_servers_count = table.getn(server_model.getServiceServersWithState(service["id"], 1)) local down_servers_count = table.getn(server_model.getServiceServersWithState(service["id"], 0)) local status = "no backend servers" @@ -92,7 +91,7 @@ end function agw_service.deleteDomain(domainId) domain_model.delete(domainId) local services = service_model.getServices(domainId) - for k, service in pairs(services) do + for _, service in pairs(services) do api_model.deleteByServiceId(service["id"]) server_model.deleteByServiceId(service["id"]) end