-
Notifications
You must be signed in to change notification settings - Fork 5
/
nginx_cluster.lua
129 lines (104 loc) · 3.01 KB
/
nginx_cluster.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
-- Builds acid.cluster instances on top of the ngx implementation
-- provided by acid.impl_ngx.
local acid_cluster = require( "acid.cluster" )
local impl_ngx = require( "acid.impl_ngx" )
-- Module table; it is returned at the end of the file.
local _M = {}
--- Build a `new_member` callback that picks a replacement round-robin.
--
-- The returned closure keeps a cursor that persists across calls, so each
-- search continues from where the previous one stopped instead of always
-- starting at the same position of the candidate list.
--
-- @param idents_getter function(paxos) expected to return an array of
--        candidate member identifiers
-- @return function(impl, paxos, dead_ident, members) returning
--         `{[new_id] = members[dead_ident]}` for the first candidate that is
--         not already a key of `members`, or `nil, 'NoFreeMember'` when no
--         candidate is available
local function make_loop_member_creator(idents_getter)
    local cursor = 0

    return function(impl, paxos, dead_ident, members)
        local idents = idents_getter(paxos)

        -- A getter that yields nothing usable (nil or a non-table) means
        -- there is no candidate; report that instead of raising on `#idents`.
        if type(idents) ~= 'table' then
            return nil, 'NoFreeMember'
        end

        local n = #idents

        -- Try every candidate at most once per call.
        for _ = 1, n do
            cursor = cursor + 1
            local new_id = idents[(cursor % n) + 1]
            if members[new_id] == nil then
                -- The newcomer takes over the value stored for the dead member.
                return {[new_id] = members[dead_ident]}
            end
        end

        return nil, 'NoFreeMember'
    end
end
--- Start a self-rescheduling timer that keeps checking one cluster member.
--
-- The first check is scheduled immediately (delay 0); after every run the
-- next one is scheduled `interval` seconds later.
--
-- @param cluster   object providing a `member_check(self, member_id)` method
-- @param member_id value handed to `cluster:member_check`
-- @param interval  delay in seconds between the end of one check and the
--                  start of the next
local function init_cluster_check(cluster, member_id, interval)
    local checker

    -- Arm the timer and report a failure instead of silently dropping it;
    -- an unreported failure here would stop all further checks.
    local function schedule(delay)
        local ok, err = ngx.timer.at(delay, checker)
        if not ok then
            ngx.log(ngx.ERR, 'failed to schedule cluster member check: ',
                    tostring(err))
        end
    end

    checker = function(premature)
        -- `premature` is set by ngx when the timer fires early (for example
        -- while the worker is shutting down); do not check or reschedule.
        if premature then
            return
        end

        -- Guard the check so that one raised error does not prevent the
        -- next run from being scheduled.
        local ok, err = pcall(cluster.member_check, cluster, member_id)
        if not ok then
            ngx.log(ngx.ERR, 'cluster member check failed: ', tostring(err))
        end

        schedule(interval)
    end

    schedule(0)
end
--- Create a cluster object backed by the ngx implementation and start its
-- periodic member check.
--
-- Fields read from `opt`:
--   path            storage base path; defaults to ngx.config.prefix()
--   check_interval  seconds between member checks (default 5)
--   dead_wait       forwarded to acid.cluster (default 60)
--   admin_lease     forwarded to acid.cluster (default check_interval * 2)
--   max_dead        forwarded to acid.cluster (default 1)
--   cluster_id      together with `ident`, identifies the local member
--   ident
--   get_standby     optional function(paxos) returning candidate idents
--                   used to replace a dead member
--   is_data_valid   optional callback, called without arguments
--   restore         optional callback, called without arguments
--   destory         optional callback, called without arguments
--                   (the spelling is the key the code reads)
--
-- NOTE(review): an earlier version of this comment listed `new_member` and
-- `cb_commit` as options; this function does not read either of them.
--
-- @param opt table of options, see above
-- @return the cluster object, extended with `get(self, field_key)` and
--         `members(self)`; or `nil, err, errmes` when the paxos instance
--         for the local member cannot be created
function _M.new(opt)
    local check_interval = opt.check_interval or 5
    local dead_wait = opt.dead_wait or 60

    local impl = impl_ngx.new({})
    local cache = {}

    impl.sto_base_path = opt.path or ngx.config.prefix()

    -- Keep the latest committed value so that reads are served from memory.
    impl.cb_commit = function (committed)
        cache.committed = committed
    end

    if opt.get_standby ~= nil then
        impl.new_member = make_loop_member_creator(opt.get_standby)
    else
        impl.new_member = function()
            return nil, 'NoFreeMember'
        end
    end

    -- The three hooks below consult `opt` at call time and invoke the user
    -- callback without arguments.
    impl.is_data_valid = function(_impl, _paxos, _member)
        if opt.is_data_valid then
            return opt.is_data_valid()
        else
            return true
        end
    end

    impl.restore = function(_impl, _paxos, _member)
        if opt.restore then
            return opt.restore()
        else
            return nil
        end
    end

    impl.destory = function(_impl, _paxos)
        if opt.destory then
            return opt.destory()
        else
            return nil
        end
    end

    local cluster = acid_cluster.new(impl, {
        dead_wait = dead_wait,
        -- check_interval is always a number here, so no further fallback
        -- is needed.
        admin_lease = opt.admin_lease or (check_interval * 2),
        max_dead = opt.max_dead or 1,
    })

    local mid = {cluster_id=opt.cluster_id, ident=opt.ident}

    -- init cache
    local paxos, err, errmes = cluster.server:new_paxos(mid)
    if err then
        return nil, err, errmes
    end
    cache.committed = paxos:read()

    -- Read one field from the cached committed value.
    cluster.get = function(_, field_key)
        local c = cache.committed or {}
        -- _make_get_rst does not return err
        local _v = paxos:_make_get_rst(field_key, c)
        return _v.val
    end

    -- Return the first entry of the 'view' field with keys and values
    -- swapped, or nil when there is no view or the view has no first entry.
    cluster.members = function(_)
        local view = cluster:get('view')
        if view == nil then
            return nil
        end

        local first = view[1]
        if first == nil then
            -- Avoid raising in pairs() on a view without a first entry.
            return nil
        end

        local mems = {}
        for id, i in pairs(first) do
            mems[i] = id
        end
        return mems
    end

    init_cluster_check(cluster, mid, check_interval)

    return cluster
end
-- Export the module table.
return _M