Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release 0.7.2 #175

Merged
merged 12 commits into from
Sep 29, 2014
8 changes: 8 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
v0.7.2 (2014-9-29)
-----------
* Bugfix : datacenter.wait
* Bugfix : error in forker coroutine
* Add skynet.term
* Accept socket report port
* sharedata can be update more than once

v0.7.1 (2014-9-22)
-----------
* bugfix: wakeup sleep should return BREAK
Expand Down
9 changes: 8 additions & 1 deletion examples/main_log.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
local skynet = require "skynet"
local harbor = require "skynet.harbor"

local function monitor_master()
harbor.linkmaster()
print("master is down")
skynet.exit()
end

skynet.start(function()
print("Log server start")
skynet.monitor "simplemonitor"
local log = skynet.newservice("globallog")
skynet.exit()
skynet.fork(monitor_master)
end)

1 change: 1 addition & 0 deletions examples/watchdog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ local gate
local agent = {}

function SOCKET.open(fd, addr)
skynet.error("New client from : " .. addr)
agent[fd] = skynet.newservice("agent")
skynet.call(agent[fd], "lua", "start", gate, fd, proto)
end
Expand Down
3 changes: 0 additions & 3 deletions lualib-src/lua-sharedata.c
Original file line number Diff line number Diff line change
Expand Up @@ -747,9 +747,6 @@ lupdate(lua_State *L) {
luaL_checktype(L, 3, LUA_TTABLE);
struct ctrl * c= lua_touserdata(L, 1);
struct table *n = lua_touserdata(L, 2);
if (c->update) {
return luaL_error(L, "can't update more than once");
}
if (c->root == n) {
return luaL_error(L, "You should update a new object");
}
Expand Down
9 changes: 5 additions & 4 deletions lualib/http/internal.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ local LIMIT = 8192

local function chunksize(readbytes, body)
while true do
if #body > 128 then
return
end
body = body .. readbytes()
local f,e = body:find("\r\n",1,true)
if f then
return tonumber(body:sub(1,f-1),16), body:sub(e+1)
end
if #body > 128 then
-- pervent the attacker send very long stream without \r\n
return
end
body = body .. readbytes()
end
end

Expand Down
6 changes: 3 additions & 3 deletions lualib/sharedata/corelib.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ function meta:__index(key)
local newobj, newtbl = needupdate(self.__gcobj)
if newobj then
local newgcobj = newtbl.__gcobj
-- todo: update
local root = findroot(self)
update(root, newobj, newgcobj)
if obj == self.__obj then
error ("The key [" .. genkey(self) .. "] doesn't exist after update")
end
obj = self.__obj
end
end
local v = index(self.__obj, key)
local v = index(obj, key)
if type(v) == "userdata" then
local r = setmetatable({
__obj = v,
Expand Down Expand Up @@ -127,7 +127,7 @@ end

function conf.update(self, pointer)
local cobj = self.__obj
assert(isdirty(cobj), "Obly dirty object can be update")
assert(isdirty(cobj), "Only dirty object can be update")
core.update(self.__gcobj, pointer, { __gcobj = core.box(pointer) })
end

Expand Down
25 changes: 16 additions & 9 deletions lualib/skynet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ end
function suspend(co, result, command, param, size)
if not result then
local session = session_coroutine_id[co]
local addr = session_coroutine_address[co]
if session ~= 0 then
-- only call response error
c.send(addr, skynet.PTYPE_ERROR, session, "")
if session then -- coroutine may fork by others (session is nil)
local addr = session_coroutine_address[co]
if session ~= 0 then
-- only call response error
c.send(addr, skynet.PTYPE_ERROR, session, "")
end
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
end
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
error(debug.traceback(co,tostring(command)))
end
if command == "CALL" then
Expand All @@ -166,7 +168,7 @@ function suspend(co, result, command, param, size)
session_response[co] = true
local ret
if not dead_service[co_address] then
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) >= 0
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil
elseif size == nil then
c.trash(param, size)
ret = false
Expand Down Expand Up @@ -200,9 +202,9 @@ function suspend(co, result, command, param, size)
local ret
if not dead_service[co_address] then
if ok then
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, f(...)) >=0
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, f(...)) ~= nil
else
ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "") >=0
ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "") ~= nil
end
else
ret = false
Expand Down Expand Up @@ -329,6 +331,7 @@ function skynet.time()
end

function skynet.exit()
fork_queue = {} -- no fork coroutine can be execute after skynet.exit
skynet.send(".launcher","lua","REMOVE",skynet.self())
-- report the sources that call me
for co, session in pairs(session_coroutine_id) do
Expand Down Expand Up @@ -696,6 +699,10 @@ function skynet.task(ret)
return t
end

function skynet.term(service)
return _error_dispatch(0, service)
end

-- Inject internal debug framework
local debug = require "skynet.debug"
debug(skynet, dispatch_message)
Expand Down
4 changes: 4 additions & 0 deletions lualib/skynet/debug.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ function dbgcmd.RUN(source, filename)
skynet.ret(skynet.pack(table.concat(output, "\n")))
end

function dbgcmd.TERM(service)
skynet.term(service)
end

local function _debug_dispatch(session, address, cmd, ...)
local f = dbgcmd[cmd]
assert(f, cmd)
Expand Down
4 changes: 4 additions & 0 deletions lualib/skynet/harbor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ function harbor.connect(id)
skynet.call(".cslave", "lua", "CONNECT", id)
end

function harbor.linkmaster()
skynet.call(".cslave", "lua", "LINKMASTER")
end

return harbor
10 changes: 9 additions & 1 deletion service/cslave.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ local queryname = {}
local harbor = {}
local harbor_service
local monitor = {}
local monitor_master_set = {}

local function read_package(fd)
local sz = socket.read(fd, 1)
Expand Down Expand Up @@ -58,7 +59,7 @@ local function ready()
connect_slave(k,v)
end
for name,address in pairs(globalname) do
skynet.redirect(harbor_service, address, "harbor", "N " .. name)
skynet.redirect(harbor_service, address, "harbor", 0, "N " .. name)
end
end

Expand Down Expand Up @@ -99,6 +100,9 @@ local function monitor_master(master_fd)
end
else
skynet.error("Master disconnect")
for _, v in ipairs(monitor_master_set) do
v(true)
end
socket.close(master_fd)
break
end
Expand Down Expand Up @@ -183,6 +187,10 @@ function harbor.LINK(fd, id)
end
end

function harbor.LINKMASTER()
table.insert(monitor_master_set, skynet.response())
end

function harbor.CONNECT(fd, id)
if not slaves[id] then
if monitor[id] == nil then
Expand Down
6 changes: 3 additions & 3 deletions service/datacenterd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ local function update(db, key, value, ...)
end
end

local function wakeup(db, key1, key2, ...)
local function wakeup(db, key1, ...)
if key1 == nil then
return
end
Expand All @@ -43,7 +43,7 @@ local function wakeup(db, key1, key2, ...)
end
if q[mode] == "queue" then
db[key1] = nil
if key2 then
if select("#", ...) ~= 1 then
-- throw error because can't wake up a branch
for _,response in ipairs(q) do
response(false)
Expand All @@ -53,7 +53,7 @@ local function wakeup(db, key1, key2, ...)
end
else
-- it's branch
return wakeup(q , key2, ...)
return wakeup(q , ...)
end
end

Expand Down
26 changes: 24 additions & 2 deletions service/sharedatad.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local table = table

local NORET = {}
local pool = {}
local pool_count = {}
local objmap = {}

local function newobj(name, tbl)
Expand All @@ -13,6 +14,7 @@ local function newobj(name, tbl)
local v = { value = tbl , obj = cobj, watch = {} }
objmap[cobj] = v
pool[name] = v
pool_count[name] = { n = 0, threshold = 16 }
end

local function collectobj()
Expand Down Expand Up @@ -55,10 +57,11 @@ end
function CMD.delete(name)
local v = assert(pool[name])
pool[name] = nil
pool_count[name] = nil
assert(objmap[v.obj])
objmap[v.obj] = true
sharedata.host.decref(v.obj)
for _,response in ipairs(v.watch) do
for _,response in pairs(v.watch) do
response(true)
end
end
Expand Down Expand Up @@ -86,23 +89,42 @@ function CMD.update(name, t)
objmap[oldcobj] = true
sharedata.host.decref(oldcobj)
pool[name] = nil
pool_count[name] = nil
end
CMD.new(name, t)
local newobj = pool[name].obj
if watch then
sharedata.host.markdirty(oldcobj)
for _,response in ipairs(watch) do
for _,response in pairs(watch) do
response(true, newobj)
end
end
end

local function check_watch(queue)
local n = 0
for k,response in pairs(queue) do
if not response "TEST" then
queue[k] = nil
n = n + 1
end
end
return n
end

function CMD.monitor(name, obj)
local v = assert(pool[name])
if obj ~= v.obj then
return v.obj
end

local n = pool_count[name].n
pool_count[name].n = n + 1
if n > pool_count[name].threshold then
n = n - check_watch(v.watch)
pool_count[name].threshold = n * 2
end

table.insert(v.watch, skynet.response())

return NORET
Expand Down
6 changes: 4 additions & 2 deletions skynet-src/socket_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ open_socket(struct socket_server *ss, struct request_open * request, struct sock
sock = -1;
continue;
}
sp_nonblocking(sock);
break;
}

Expand Down Expand Up @@ -832,7 +831,10 @@ report_accept(struct socket_server *ss, struct socket *s, struct socket_message
result->data = NULL;

void * sin_addr = (u.s.sa_family == AF_INET) ? (void*)&u.v4.sin_addr : (void *)&u.v6.sin6_addr;
if (inet_ntop(u.s.sa_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
int sin_port = ntohs((u.s.sa_family == AF_INET) ? u.v4.sin_port : u.v6.sin6_port);
char tmp[INET6_ADDRSTRLEN];
if (inet_ntop(u.s.sa_family, sin_addr, tmp, sizeof(tmp))) {
snprintf(ss->buffer, sizeof(ss->buffer), "%s:%d", tmp, sin_port);
result->data = ss->buffer;
}

Expand Down
15 changes: 15 additions & 0 deletions test/testterm.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
local skynet = require "skynet"

local function term()
skynet.error("Sleep one second, and term the call to UNEXIST")
skynet.sleep(100)
local self = skynet.self()
skynet.send(skynet.self(), "debug", "TERM", "UNEXIST")
end

skynet.start(function()
skynet.fork(term)
skynet.error("call an unexist named service UNEXIST, may block")
pcall(skynet.call, "UNEXIST", "lua", "test")
skynet.error("unblock the unexisted service call")
end)