服务实现的复杂性
服务实现是最为复杂的一个部分,其复杂不是在于本身的代码,而是服务几乎把所有的模块都联系到了一起
从消息队列message_queue,服务管理handle_storage,模块加载modules,监视器monitor,Skynet中API的实现
以及c-lua的调用,lualib的调用等,各种命令的传递,都深度关涉到服务实现
可以说服务实现是使用这些部分的一个综合协调体
服务会使用模块加载的创建,初始化,释放,通知
服务会使用服务管理的创建,释放,查找,命名
服务会使用消息队列的发消息,收消息,处理消息
服务还会被worker线程获取执行,被监视器monitor监控
服务更需要实现对Skynet中各类API接口的封装
同时服务也被网络通信绑定
也关系到lua协程、以及任务的执行
对此,我会尽最大的努力来描述,并尽可能地把上面的模块串联起来
第一个启动的服务
随着以下步骤的实现,我们迎来了第一个服务的启动
if (config->daemon) { // 守护进程
if (daemon_init(config->daemon)) {
exit(1);
}
}
skynet_harbor_init(config->harbor); // 节点建立
skynet_handle_init(config->harbor); // 服务管理
skynet_mq_init(); // 消息队列
skynet_module_init(config->module_path);// 模块加载
skynet_timer_init(); // 定时器
skynet_socket_init(); // 网络
skynet_profile_enable(config->profile); // 性能监控
第一个启动的就是logger服务
struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}
服务启动函数
skynet_context_new() 函数用来新建一个服务,实现如下:
// 位于skynet_server.c文件中
struct skynet_context *
skynet_context_new(const char * name, const char *param) {
// 第一步:查找模块,详细见《Skynet源码之:模块加载》
struct skynet_module * mod = skynet_module_query(name);
if (mod == NULL)
return NULL;
// 第二步:模块实例化,详细见《Skynet源码之:模块加载》
void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
return NULL;
// 第三步:分配服务内存
struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
// 这个宏是在开发阶段检查框架是否有bug而存在的
// 详细见:https://github.com/cloudwu/skynet/discussions/1753
CHECKCALLING_INIT(ctx)
// 第四步:对服务结构体进行一些赋值
ctx->mod = mod;
ctx->instance = inst;
// 对服务的引用进程初始化,并设置为2
// 详细见《Skynet专题之:原子操作》
// 这里为什么要设置为2呢?后面会再说
ATOM_INIT(&ctx->ref, 2);
ctx->cb = NULL;
ctx->cb_ud = NULL;
ctx->session_id = 0;
// 对服务的日志打印进行初始化
// 详细见《Skynet专题之:原子操作》
// 这里可以全局搜索:logfile,也能了解其用法
// 或参考:https://github.com/cloudwu/skynet/discussions/1851
// 这是有关debug_console中logon和logoff命令的实现
ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
ctx->init = false;
// 有关debug_console中endless命令的实现
ctx->endless = false;
ctx->cpu_cost = 0;
ctx->cpu_start = 0;
ctx->message_count = 0;
ctx->profile = G_NODE.profile;
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
// init function maybe use ctx->handle, so it must init at last
// 往全局管理中添加服务节点,详细见《Skynet源码之:环境准备》
context_inc();
CHECKCALLING_BEGIN(ctx)
int r = skynet_module_instance_init(mod, inst, ctx, param);
CHECKCALLING_END(ctx)
if (r == 0) {
struct skynet_context * ret = skynet_context_release(ctx);
if (ret) {
ctx->init = true;
}
// 这里为什么需要把次级消息队列放入全局队列中呢?
// 在skynet_mq.c文件中,第85行,有明确的注释
// When the queue is create (always between service create and service init) ,
// set in_global flag to avoid push it to global queue .
// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
// 详细请看《Skynet源码之:消息队列》
skynet_globalmq_push(queue);
if (ret) {
skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
}
return ret;
} else {
skynet_error(ctx, "FAILED launch %s", name);
uint32_t handle = ctx->handle;
skynet_context_release(ctx);
skynet_handle_retire(handle);
struct drop_t d = { handle };
skynet_mq_release(queue, drop_message, &d);
return NULL;
}
}
查找服务模块
函数:skynet_module_query()
首先是skynet_module结构体,在skynet_module.h文件中定义
详细见《Skynet源码之:模块加载》,里面有详细的分析和说明
最后,我们得到一个指向 struct skynet_module * mod 的指针,被赋值给mod
代码:
struct skynet_module * mod = skynet_module_query(name);
if (mod == NULL)
return NULL;
创建模块实例化
函数:skynet_module_instance_create()
详细见《Skynet源码之:模块加载》,里面有详细的分析和说明
但是不同的模块具有不同的 module_create() 函数
重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》
最后,我们得到一个指向 该服务的实例 的指针,被赋值给inst
代码:
void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
return NULL;
服务结构体
先来看看服务的结构体以及它的赋值
// 服务结构体
struct skynet_context {
void * instance; // 指向服务的实例,即前面的inst
struct skynet_module * mod; // 指向服务所属的模块,即前面的mod
void * cb_ud;
skynet_cb cb; // 属于服务的回调函数
struct message_queue *queue; // 属于服务的次级消息队列
ATOM_POINTER logfile; // 用于日志打印,详细见《Skynet源码之:日志打印》
uint64_t cpu_cost; // in microsec,统计此服务花费的cpu时间总数
uint64_t cpu_start; // in microsec,记录服务处理某条消息时的开始时间,等消息处理完毕时,会根据 cpu_end - cpu_start 来计算
char result[32]; // 用于存放stat命令的结果,展示给控制台
uint32_t handle; // 属于此服务的handle,在一个Skynet节点中,handle是唯一的
int session_id; // 此服务消息序列的id
ATOM_INT ref; // 原子性操作,用于记录服务的引用(非常重要,引用计数关系着服务内存的释放)
int message_count; // 服务消息的总数
bool init; // 服务是否完成初始化
bool endless; // 服务是否陷入死循环
bool profile; // 服务是否开启性能监控
CHECKCALLING_DECL
};
注册服务
函数:skynet_handle_register()
详细见《Skynet源码之:服务管理》,这里面管理着所有的服务
代码:
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
// 需要先把handle置为0,表明这是个空置的服务
// 因为在skynet_handle_register(ctx)注册的时候,可能遇到服务全部退出的情况
// 所以设置为0之后,不至于skynet_handle_retireall()找不到对应的handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);
创建消息队列
函数:skynet_mq_create()
详细见《Skynet源码之:消息队列》,这里负责全局消息队列和次级消息队列的管理
代码:
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
模块初始化
函数:skynet_module_instance_init()
详细见《Skynet源码之:模块加载》,里面有详细的分析和说明
但是不同的模块具有不同的 module_init() 函数
重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》
最后,可以判断该模块是否初始化成功
代码:
// 这里传入了几个参数
// mod:刚才skynet_module_query()查出的模板
// inst:刚才skynet_module_instance_create()创建的实列化
// ctx:就是本服务的结构体
// param:需要传给模板实列的参数
int r = skynet_module_instance_init(mod, inst, ctx, param);
// 注意:
// param:这个参数的是根据不同的模板,具有不同的值
// 例如:
// service_logger模板,传入的参数是:config->logger,即是用户配置的日志路径和名字
// service_snlua模板,传入的参数是:需要启动的服务脚本的名字,即是name.lua中的名字
总结
至此,一个服务的启动就已经完成了
但这还是远远不够的:
1,skynet_context结构体中的 void * cb_ud 和 skynet_cb cb字段是什么作用?如何赋值的?
2,我们启动一个服务,都是通过skynet.newservice()实现的,底层的机制是什么呢?
3,启动后的服务,是如何处理消息的呢?
4,服务中调用lua层,c层的命令,又是如何实现的呢?
一切的一切,都需要归结到《Skynet源码之:service_snlua》中去解释
Skynet的API
在写完本章和《Skynet源码之:service_snlua》之后,我的脑子已经一团浆糊
主要是关系的东西太多太多了,不知道从何写起了,即使自己心中知道实现,但就是凑不起来
因为确实很多很杂,有时候关系lua-c的交互,有时候又到定时器去了
接着又是协程的执行,以及服务之间各种调用和封装,还有消息的注册打包之类的
再加上消息队列等,还有各种坑点和使用注意事项
我想了一个办法:围绕skynet的API来描述,应该是最为清晰的
主要特别关注:skynet.lua 文件 和 manager.lua
计算机里没有黑魔法,不要急,慢慢来,总有一天我会搞懂的
有关服务启动
1,skynet.newservice()
2,skynet.lanuch()
3,bootstrap.lua
4,skynet.init_service
5,skynet.init()
6,skynet.start()
7,skynet.require()
skynet.call()
我们看下 skynet.call() 的实现
-- skynet.lua文件 第708行
function skynet.call(addr, typename, ...)
local tag = session_coroutine_tracetag[running_thread]
if tag then
c.trace(tag, "call", 2)
c.send(addr, skynet.PTYPE_TRACE, 0, tag)
end
-- 先找到此消息类型的处理模块
-- p的内容详细可以看下一段
local p = proto[typename]
local session = auxsend(addr, p.id , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end
p的内容如下
-- skynet.lua文件中第1017行
-- 注册用户自己的消息类型以及对应的处理
-- register protocol
do
local REG = skynet.register_protocol
-- 默认注册了对lua消息的处理
REG {
name = "lua",
id = skynet.PTYPE_LUA,
pack = skynet.pack, -- 注册pack函数
unpack = skynet.unpack, -- 注册unpack函数
}
REG {
name = "response",
id = skynet.PTYPE_RESPONSE,
}
REG {
name = "error",
id = skynet.PTYPE_ERROR,
unpack = function(...) return ... end,
dispatch = _error_dispatch,
}
end
function skynet.register_protocol(class)
local name = class.name
local id = class.id
assert(proto[name] == nil and proto[id] == nil)
assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
proto[name] = class
proto[id] = class
end
再来看看
-- skynet.lua文件中第691行
-- 最终都会调用到c层面的函数实现
skynet.pack = assert(c.pack)
skynet.packstring = assert(c.packstring)
skynet.unpack = assert(c.unpack)
skynet.tostring = assert(c.tostring)
skynet.trash = assert(c.trash)
再温习一下skynet.start()中经常会使用的函数
function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func
return ret
else
return p and p.dispatch
end
end
-- 也就是说,在启动服务的时候,我们一般会注册skynet.dispatch(typename, func)
-- 并且因为 skynet 默认有 lua protocol ,所以我们不用调用 skynet.register_protocol
--
-- 如果你要注册自己的消息类型,一般需要
-- skynet.register_protocol()
-- skynet.dispatch()
回顾完之前的代码,我们可以看看auxsend(addr, p.id , p.pack(…))的实现
auxsend = auxsend_checkrewind
而auxsend_checkrewind的实现是
local function auxsend_checkrewind(addr, proto, msg, sz)
local session = csend(addr, proto, nil, msg, sz) -- 可以看到最后还是调用到csend
if session and session > dangerzone_low and session <= dangerzone_up then
-- enter dangerzone
set_checkconflict(session)
end
return session
end
现在重要的问题来到了 c 层面的 lsend,本质就是lua-skynet.c文件中的 send_message() 函数
static int
send_message(lua_State *L, int source, int idx_type) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
uint32_t dest = (uint32_t)lua_tointeger(L, 1);
const char * dest_string = NULL;
if (dest == 0) {
if (lua_type(L,1) == LUA_TNUMBER) {
return luaL_error(L, "Invalid service address 0");
}
dest_string = get_dest_string(L, 1);
}
int type = luaL_checkinteger(L, idx_type+0);
int session = 0;
if (lua_isnil(L,idx_type+1)) {
type |= PTYPE_TAG_ALLOCSESSION;
} else {
session = luaL_checkinteger(L,idx_type+1);
}
int mtype = lua_type(L,idx_type+2);
switch (mtype) {
case LUA_TSTRING: {
size_t len = 0;
void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
if (len == 0) {
msg = NULL;
}
// ***************************************************************************************
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
// 重点代码在这里
// ***************************************************************************************
break;
}
case LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
if (session < 0) {
if (session == -2) {
// package is too large
lua_pushboolean(L, 0);
return 1;
}
// send to invalid address
// todo: maybe throw an error would be better
return 0;
}
lua_pushinteger(L,session);
return 1;
}
为什么我们需要2个函数来实现 send_message () ?即:
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
我们可以查看 skynet_sendname() 的实现知道,最终 skynet_sendname() 函数做了一些处理,例如对:
skynet.call(":00008","lua",) // 对本地服务发送消息,但是地址形式是十六进制:":00008"
skynet.call(".cslave","lua",) // 对本地服务发送消息,但是形式是字符串形式:".cslave"
// 如果以上的格式都不是,那就是可能“@remote_service”的格式
// 此时就应该直接发送个harbor服务,由harbor节点发送给远程的节点
最终,skynet_sendname() 都是把 “:00008” 或者 “.cslave” 转化为地址,再最后调用 skynet_send()
我们来看看 skynet_send() 的实现
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
if ((sz & MESSAGE_TYPE_MASK) != sz) {
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -2;
}
_filter_args(context, type, &session, (void **)&data, &sz);
if (source == 0) {
source = context->handle;
}
if (destination == 0) {
if (data) {
skynet_error(context, "Destination address can't be 0");
skynet_free(data);
return -1;
}
return session;
}
if (skynet_harbor_message_isremote(destination)) { // 判断是否是远程消息
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz & MESSAGE_TYPE_MASK;
rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
skynet_harbor_send(rmsg, source, session);
} else {// 本地消息
struct skynet_message smsg;
smsg.source = source;
smsg.session = session;
smsg.data = data;
smsg.sz = sz;
if (skynet_context_push(destination, &smsg)) {
skynet_free(data);
return -1;
}
}
return session;
}
2,skynet.send()
3,skynet.pack()
4,skynet.register_protocol()
5,skynet.dispatch_message()
6,raw_dispatch_message()
7,skynet.request()
8,skynet.response()
9,skynet.ret()
有关功能
1,skynet.timeout()
2,skynet.error()
3,skynet.sleep()
4,skynet.wait()
5,skynet.yield()
流程总结
对于服务的实现来说,有太多的东西需要说,除了有关模块的准备外
下面我总结一下重要的流程
1,服务是如何被worker线程获取的:《Skynet源码之:进程启动》
2,服务的创建是怎么样进行的:《Skynet源码之:服务实现》
3,服务是如何工作的:《Skynet源码之:service_snlua》
4,服务的消息是如何流转的
1,worker线程在全局消息队列中,获取到服务的次级消息队列
2,从次级消息队列中,获取到一个消息msg
3,通过 dispatch_message() 函数(skynet_server.c文件),调用到服务建立时绑定的cb函数
4,服务的cb函数跟服务创建时的模板有关,一般为snlua
5,而snlua模板启动的服务,cb函数一般会绑定为在lua层的 skynet.dispatch_message() 函数(skynet.lua文件)
6,最后交由 raw_dispatch_message() 函数解析消息,并根据消息session_id,获得对应的协程
注意:
这里要着重提一下 raw_dispatch_message() 函数的实现
详情可见《Skynet源码之:service_snlua》中的dispatch_message部分
1,很多服务在启动时,会使用skynet.register_protocol() 函数来注册协议,例如
skynet.register_protocol {
name = "SYSTEM",
id = skynet.PTYPE_SYSTEM,
unpack = function(...) return ... end,
dispatch = function()
-- reopen signal
print("SIGHUP")
end
}
此时能,就会在skynet.lua中注册相对类型的处理函数
function skynet.register_protocol(class)
local name = class.name
local id = class.id
assert(proto[name] == nil and proto[id] == nil)
assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
proto[name] = class
proto[id] = class
end
2,最后在 raw_dispatch_message() 函数就会作出区分
-- 根据注册的消息类型,获取用户的注册函数
local p = proto[prototype]
if p == nil then
if prototype == skynet.PTYPE_TRACE then
-- trace next request
trace_source[source] = c.tostring(msg,sz)
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
-- 那么这个p.dispatch又是哪里注册的呢?
-- 就是我们经常在main.lua文件中看到的
-- 某个main.lua文件
skynet.start(function()
skynet.dispatch("lua", function(_,_, command, ...)
skynet.trace()
local f = CMD[command]
skynet.ret(skynet.pack(f(...)))
end)
end)
-- 具体实现
-- skynet.lua文件
function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func
return ret
else
return p and p.dispatch
end
end
-- skynet.lua文件
-- raw_dispatch_message()中的部分代码
local f = p.dispatch
if f then
local co = co_create(f)
session_coroutine_id[co] = session
session_coroutine_address[co] = source
local traceflag = p.trace
if traceflag == false then
-- force off
trace_source[source] = nil
session_coroutine_tracetag[co] = false
else
local tag = trace_source[source]
if tag then
trace_source[source] = nil
c.trace(tag, "request")
session_coroutine_tracetag[co] = tag
elseif traceflag then
-- set running_thread for trace
running_thread = co
skynet.trace()
end
end
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
trace_source[source] = nil
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
以上,整个服务的消息的执行流程都总结完毕了