Skynet源码之:服务实现(14)

JavenLaw

服务实现的复杂性

服务实现是最为复杂的一个部分,其复杂不是在于本身的代码,而是服务几乎把所有的模块都联系到了一起

从消息队列message_queue,服务管理handle_storage,模块加载modules,监视器monitor,Skynet中API的实现

以及c-lua的调用,lualib的调用等,各种命令的传递,都深度关涉到服务实现

可以说服务实现是使用这些部分的一个综合协调体

​ 服务会使用模块加载的创建,初始化,释放,通知

​ 服务会使用服务管理的创建,释放,查找,命名

​ 服务会使用消息队列的发消息,收消息,处理消息

​ 服务还会被worker线程获取执行,被监视器monitor监控

​ 服务更需要实现对Skynet中各类API接口的封装

​ 同时服务也被网络通信绑定

​ 也关系到lua协程、以及任务的执行

对此,我会尽最大的努力来描述,并尽可能地把上面的模块串联起来


第一个启动的服务

随着以下步骤的实现,我们迎来了第一个服务的启动

if (config->daemon) { // 守护进程
	if (daemon_init(config->daemon)) {
		exit(1);
	}
}
skynet_harbor_init(config->harbor); // 节点建立
skynet_handle_init(config->harbor); // 服务管理
skynet_mq_init(); // 消息队列
skynet_module_init(config->module_path);// 模块加载
skynet_timer_init(); // 定时器
skynet_socket_init(); // 网络
skynet_profile_enable(config->profile); // 性能监控

第一个启动的就是logger服务

struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
	fprintf(stderr, "Can't launch %s service\n", config->logservice);
	exit(1);
}


服务启动函数

skynet_context_new() 函数用来新建一个服务,实现如下:

// 位于skynet_server.c文件中

struct skynet_context * 
skynet_context_new(const char * name, const char *param) {
    
    // 第一步:查找模块,详细见《Skynet源码之:模块加载》
	struct skynet_module * mod = skynet_module_query(name);

	if (mod == NULL)
		return NULL;

    // 第二步:模块实例化,详细见《Skynet源码之:模块加载》
    void *inst = skynet_module_instance_create(mod);
    if (inst == NULL)
        return NULL;
    
    // 第三步:分配服务内存
    struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
    
    // 这个宏是在开发阶段检查框架是否有bug而存在的
    // 详细见:https://github.com/cloudwu/skynet/discussions/1753
    CHECKCALLING_INIT(ctx)
	
    // 第四步:对服务结构体进行一些赋值
    ctx->mod = mod;
    ctx->instance = inst;
    
    // 对服务的引用进程初始化,并设置为2
    // 详细见《Skynet专题之:原子操作》
    // 这里为什么要设置为2呢?后面会再说
    ATOM_INIT(&ctx->ref, 2);
    ctx->cb = NULL;
    ctx->cb_ud = NULL;
    ctx->session_id = 0;
    
    // 对服务的日志打印进行初始化
    // 详细见《Skynet专题之:原子操作》
    // 这里可以全局搜索:logfile,也能了解其用法
    // 或参考:https://github.com/cloudwu/skynet/discussions/1851
    // 这是有关debug_console中logon和logoff命令的实现
    ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
	
    ctx->init = false;
    
    // 有关debug_console中endless命令的实现
    ctx->endless = false;

    ctx->cpu_cost = 0;
    ctx->cpu_start = 0;
    ctx->message_count = 0;
    ctx->profile = G_NODE.profile;
    // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
    ctx->handle = 0;	
    ctx->handle = skynet_handle_register(ctx);
    struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
    
    // init function maybe use ctx->handle, so it must init at last
    // 往全局管理中添加服务节点,详细见《Skynet源码之:环境准备》
    context_inc();

    CHECKCALLING_BEGIN(ctx)
    int r = skynet_module_instance_init(mod, inst, ctx, param);
    CHECKCALLING_END(ctx)
    
    if (r == 0) {
        struct skynet_context * ret = skynet_context_release(ctx);
        if (ret) {
            ctx->init = true;
        }
        // 这里为什么需要把次级消息队列放入全局队列中呢?
        // 在skynet_mq.c文件中,第85行,有明确的注释
        // When the queue is create (always between service create and service init) ,
		// set in_global flag to avoid push it to global queue .
		// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
        // 详细请看《Skynet源码之:消息队列》
        skynet_globalmq_push(queue);
        if (ret) {
            skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
        }
        return ret;
    } else {
        skynet_error(ctx, "FAILED launch %s", name);
        uint32_t handle = ctx->handle;
        skynet_context_release(ctx);
        skynet_handle_retire(handle);
        struct drop_t d = { handle };
        skynet_mq_release(queue, drop_message, &d);
        return NULL;
    }
}


查找服务模块

函数:skynet_module_query()

​ 首先是skynet_module结构体,在skynet_module.h文件中定义

详细见《Skynet源码之:模块加载》,里面有详细的分析和说明

​ 最后,我们得到一个指向 struct skynet_module * mod 的指针,被赋值给mod

代码:

struct skynet_module * mod = skynet_module_query(name);
if (mod == NULL)
		return NULL;


创建模块实例化

函数:skynet_module_instance_create()

详细见《Skynet源码之:模块加载》,里面有详细的分析和说明

​ 但是不同的模块具有不同的 module_create() 函数

​ 重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》

​ 最后,我们得到一个指向 该服务的实例 的指针,被赋值给inst

代码:

void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
	return NULL;


服务结构体

先来看看服务的结构体以及它的赋值

// 服务结构体
struct skynet_context {
	void * instance; // 指向服务的实例,即前面的inst
	struct skynet_module * mod; // 指向服务所属的模块,即前面的mod
	void * cb_ud;
	skynet_cb cb; // 属于服务的回调函数
	struct message_queue *queue; // 属于服务的次级消息队列
	ATOM_POINTER logfile; // 用于日志打印,详细见《Skynet源码之:日志打印》
	uint64_t cpu_cost;	// in microsec,统计此服务花费的cpu时间总数
	uint64_t cpu_start;	// in microsec,记录服务处理某条消息时的开始时间,等消息处理完毕时,会根据 cpu_end - cpu_start 来计算
	char result[32]; // 用于存放stat命令的结果,展示给控制台
	uint32_t handle; // 属于此服务的handle,在一个Skynet节点中,handle是唯一的
	int session_id; // 此服务消息序列的id
	ATOM_INT ref; // 原子性操作,用于记录服务的引用(非常重要,引用计数关系着服务内存的释放)
	int message_count; // 服务消息的总数
	bool init; // 服务是否完成初始化
	bool endless; // 服务是否陷入死循环
	bool profile; // 服务是否开启性能监控
	
	CHECKCALLING_DECL
};


注册服务

函数:skynet_handle_register()

详细见《Skynet源码之:服务管理》,这里面管理着所有的服务

代码:

// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
// 需要先把handle置为0,表明这是个空置的服务
// 因为在skynet_handle_register(ctx)注册的时候,可能遇到服务全部退出的情况
// 所以设置为0之后,不至于skynet_handle_retireall()找不到对应的handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);


创建消息队列

函数:skynet_mq_create()

详细见《Skynet源码之:消息队列》,这里负责全局消息队列和次级消息队列的管理

代码:

struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);


模块初始化

函数:skynet_module_instance_init()

详细见《Skynet源码之:模块加载》,里面有详细的分析和说明

​ 但是不同的模块具有不同的 module_init() 函数

​ 重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》

​ 最后,可以判断该模块是否初始化成功

代码:

// 这里传入了几个参数
// mod:刚才skynet_module_query()查出的模板
// inst:刚才skynet_module_instance_create()创建的实列化
// ctx:就是本服务的结构体
// param:需要传给模板实列的参数
int r = skynet_module_instance_init(mod, inst, ctx, param);

// 注意:
// 		param:这个参数的是根据不同的模板,具有不同的值
// 例如:
// 		service_logger模板,传入的参数是:config->logger,即是用户配置的日志路径和名字
// 		service_snlua模板,传入的参数是:需要启动的服务脚本的名字,即是name.lua中的名字


总结

至此,一个服务的启动就已经完成了

但这还是远远不够的:

​ 1,skynet_context结构体中的 void * cb_ud 和 skynet_cb cb字段是什么作用?如何赋值的?

​ 2,我们启动一个服务,都是通过skynet.newservice()实现的,底层的机制是什么呢?

​ 3,启动后的服务,是如何处理消息的呢?

​ 4,服务中调用lua层,c层的命令,又是如何实现的呢?

一切的一切,都需要归结到《Skynet源码之:service_snlua》中去解释


Skynet的API

在写完本章和《Skynet源码之:service_snlua》之后,我的脑子已经一团浆糊

主要是关系的东西太多太多了,不知道从何写起了,即使自己心中知道实现,但就是凑不起来

因为确实很多很杂,有时候关系lua-c的交互,有时候又到定时器去了

接着又是协程的执行,以及服务之间各种调用和封装,还有消息的注册打包之类的

再加上消息队列等,还有各种坑点和使用注意事项

我想了一个办法:围绕skynet的API来描述,应该是最为清晰的

主要特别关注:skynet.lua 文件 和 manager.lua

计算机里没有黑魔法,不要急,慢慢来,总有一天我会搞懂的


有关服务启动

​ 1,skynet.newservice()

​ 2,skynet.lanuch()

​ 3,bootstrap.lua

​ 4,skynet.init_service

​ 5,skynet.init()

​ 6,skynet.start()

​ 7,skynet.require()


skynet.call()

我们看下 skynet.call() 的实现

-- skynet.lua文件 第708行
function skynet.call(addr, typename, ...)
local tag = session_coroutine_tracetag[running_thread]
if tag then
	c.trace(tag, "call", 2)
	c.send(addr, skynet.PTYPE_TRACE, 0, tag)
end

-- 先找到此消息类型的处理模块
-- p的内容详细可以看下一段
local p = proto[typename] 
    
local session = auxsend(addr, p.id , p.pack(...))
if session == nil then
	error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end

p的内容如下

-- skynet.lua文件中第1017行
-- 注册用户自己的消息类型以及对应的处理
-- register protocol
do
	local REG = skynet.register_protocol

    -- 默认注册了对lua消息的处理
    REG {
        name = "lua",
        id = skynet.PTYPE_LUA,
        pack = skynet.pack, -- 注册pack函数
        unpack = skynet.unpack, -- 注册unpack函数
    }

    REG {
        name = "response",
        id = skynet.PTYPE_RESPONSE,
    }

    REG {
        name = "error",
        id = skynet.PTYPE_ERROR,
        unpack = function(...) return ... end,
        dispatch = _error_dispatch,
    }
end

function skynet.register_protocol(class)
	local name = class.name
	local id = class.id
	assert(proto[name] == nil and proto[id] == nil)
	assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
	proto[name] = class
	proto[id] = class
end

再来看看

-- skynet.lua文件中第691行
-- 最终都会调用到c层面的函数实现
skynet.pack = assert(c.pack)
skynet.packstring = assert(c.packstring)
skynet.unpack = assert(c.unpack)
skynet.tostring = assert(c.tostring)
skynet.trash = assert(c.trash)

再温习一下skynet.start()中经常会使用的函数

function skynet.dispatch(typename, func)
	local p = proto[typename]
	if func then
		local ret = p.dispatch
		p.dispatch = func
		return ret
	else
		return p and p.dispatch
	end
end

-- 也就是说,在启动服务的时候,我们一般会注册skynet.dispatch(typename, func)
-- 并且因为 skynet 默认有 lua protocol ,所以我们不用调用 skynet.register_protocol
-- 
-- 如果你要注册自己的消息类型,一般需要
-- skynet.register_protocol()
-- skynet.dispatch()

回顾完之前的代码,我们可以看看auxsend(addr, p.id , p.pack(…))的实现

auxsend = auxsend_checkrewind

而auxsend_checkrewind的实现是

local function auxsend_checkrewind(addr, proto, msg, sz)
	local session = csend(addr, proto, nil, msg, sz) -- 可以看到最后还是调用到csend
	if session and session > dangerzone_low and session <= dangerzone_up then
		-- enter dangerzone
		set_checkconflict(session)
	end
	return session
end

现在重要的问题来到了 c 层面的 lsend,本质就是lua-skynet.c文件中的 send_message() 函数

static int
send_message(lua_State *L, int source, int idx_type) {
	struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
	uint32_t dest = (uint32_t)lua_tointeger(L, 1);
	const char * dest_string = NULL;
	if (dest == 0) {
		if (lua_type(L,1) == LUA_TNUMBER) {
			return luaL_error(L, "Invalid service address 0");
		}
		dest_string = get_dest_string(L, 1);
	}

    int type = luaL_checkinteger(L, idx_type+0);
    int session = 0;
    if (lua_isnil(L,idx_type+1)) {
        type |= PTYPE_TAG_ALLOCSESSION;
    } else {
        session = luaL_checkinteger(L,idx_type+1);
    }

    int mtype = lua_type(L,idx_type+2);
    switch (mtype) {
    case LUA_TSTRING: {
        size_t len = 0;
        void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
        if (len == 0) {
            msg = NULL;
        }
        // ***************************************************************************************
        if (dest_string) {
            session = skynet_sendname(context, source, dest_string, type, session , msg, len);
        } else {
            session = skynet_send(context, source, dest, type, session , msg, len);
        }
        // 重点代码在这里
        // ***************************************************************************************
        break;
    }
    case LUA_TLIGHTUSERDATA: {
        void * msg = lua_touserdata(L,idx_type+2);
        int size = luaL_checkinteger(L,idx_type+3);
        if (dest_string) {
            session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
        } else {
            session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
        }
        break;
    }
    default:
        luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
    }
    if (session < 0) {
        if (session == -2) {
            // package is too large
            lua_pushboolean(L, 0);
            return 1;
        }
        // send to invalid address
        // todo: maybe throw an error would be better
        return 0;
    }
    lua_pushinteger(L,session);
    return 1;
}

为什么我们需要2个函数来实现 send_message () ?即:

if (dest_string) {
      session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
      session = skynet_send(context, source, dest, type, session , msg, len);
 }

我们可以查看 skynet_sendname() 的实现知道,最终 skynet_sendname() 函数做了一些处理,例如对:

skynet.call(":00008","lua",) // 对本地服务发送消息,但是地址形式是十六进制:":00008"

skynet.call(".cslave","lua",) // 对本地服务发送消息,但是形式是字符串形式:".cslave"

// 如果以上的格式都不是,那就是可能“@remote_service”的格式
// 此时就应该直接发送个harbor服务,由harbor节点发送给远程的节点

最终,skynet_sendname() 都是把 “:00008” 或者 “.cslave” 转化为地址,再最后调用 skynet_send()

我们来看看 skynet_send() 的实现

int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
	if ((sz & MESSAGE_TYPE_MASK) != sz) {
		skynet_error(context, "The message to %x is too large", destination);
		if (type & PTYPE_TAG_DONTCOPY) {
			skynet_free(data);
		}
		return -2;
	}
	_filter_args(context, type, &session, (void **)&data, &sz);

    if (source == 0) {
        source = context->handle;
    }

    if (destination == 0) {
        if (data) {
            skynet_error(context, "Destination address can't be 0");
            skynet_free(data);
            return -1;
        }

        return session;
    }
    if (skynet_harbor_message_isremote(destination)) { // 判断是否是远程消息
        struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
        rmsg->destination.handle = destination;
        rmsg->message = data;
        rmsg->sz = sz & MESSAGE_TYPE_MASK;
        rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
        skynet_harbor_send(rmsg, source, session);
    } else {// 本地消息
        struct skynet_message smsg;
        smsg.source = source;
        smsg.session = session;
        smsg.data = data;
        smsg.sz = sz;

        if (skynet_context_push(destination, &smsg)) {
            skynet_free(data);
            return -1;
        }
    }
    return session;
}

​ 2,skynet.send()

​ 3,skynet.pack()

​ 4,skynet.register_protocol()

​ 5,skynet.dispatch_message()

​ 6,raw_dispatch_message()

​ 7,skynet.request()

​ 8,skynet.response()

​ 9,skynet.ret()


有关功能

​ 1,skynet.timeout()

​ 2,skynet.error()

​ 3,skynet.sleep()

​ 4,skynet.wait()

​ 5,skynet.yield()

流程总结

对于服务的实现来说,有太多的东西需要说,除了有关模块的准备外

下面我总结一下重要的流程

1,服务是如何被worker线程获取的:《Skynet源码之:进程启动》

2,服务的创建是怎么样进行的:《Skynet源码之:服务实现》

3,服务是如何工作的:《Skynet源码之:service_snlua》

4,服务的消息是如何流转的

​ 1,worker线程在全局消息队列中,获取到服务的次级消息队列

​ 2,从次级消息队列中,获取到一个消息msg

​ 3,通过 dispatch_message() 函数(skynet_server.c文件),调用到服务建立时绑定的cb函数

​ 4,服务的cb函数跟服务创建时的模板有关,一般为snlua

​ 5,而snlua模板启动的服务,cb函数一般会绑定为在lua层的 skynet.dispatch_message() 函数(skynet.lua文件)

​ 6,最后交由 raw_dispatch_message() 函数解析消息,并根据消息session_id,获得对应的协程

注意:

​ 这里要着重提一下 raw_dispatch_message() 函数的实现

​ 详情可见《Skynet源码之:service_snlua》中的dispatch_message部分

​ 1,很多服务在启动时,会使用skynet.register_protocol() 函数来注册协议,例如

skynet.register_protocol {
    name = "SYSTEM",
    id = skynet.PTYPE_SYSTEM,
    unpack = function(...) return ... end,
    dispatch = function()
        -- reopen signal
        print("SIGHUP")
    end
}

​ 此时能,就会在skynet.lua中注册相对类型的处理函数

function skynet.register_protocol(class)
	local name = class.name
	local id = class.id
	assert(proto[name] == nil and proto[id] == nil)
	assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
	proto[name] = class
	proto[id] = class
end

​ 2,最后在 raw_dispatch_message() 函数就会作出区分

-- 根据注册的消息类型,获取用户的注册函数
local p = proto[prototype]
if p == nil then
    if prototype == skynet.PTYPE_TRACE then
        -- trace next request
        trace_source[source] = c.tostring(msg,sz)
    elseif session ~= 0 then
        c.send(source, skynet.PTYPE_ERROR, session, "")
    else
        unknown_request(session, source, msg, sz, prototype)
    end
    return
end

-- 那么这个p.dispatch又是哪里注册的呢?
-- 就是我们经常在main.lua文件中看到的
-- 某个main.lua文件
skynet.start(function()
	skynet.dispatch("lua", function(_,_, command, ...)
		skynet.trace()
		local f = CMD[command]
		skynet.ret(skynet.pack(f(...)))
	end)
end)

-- 具体实现
-- skynet.lua文件
function skynet.dispatch(typename, func)
	local p = proto[typename]
	if func then
		local ret = p.dispatch
		p.dispatch = func
		return ret
	else
		return p and p.dispatch
	end
end

-- skynet.lua文件
-- raw_dispatch_message()中的部分代码
local f = p.dispatch
if f then
    local co = co_create(f)
    session_coroutine_id[co] = session
    session_coroutine_address[co] = source
    local traceflag = p.trace
    if traceflag == false then
        -- force off
        trace_source[source] = nil
        session_coroutine_tracetag[co] = false
    else
        local tag = trace_source[source]
        if tag then
            trace_source[source] = nil
            c.trace(tag, "request")
            session_coroutine_tracetag[co] = tag
        elseif traceflag then
            -- set running_thread for trace
            running_thread = co
            skynet.trace()
        end
    end
    suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
    trace_source[source] = nil
    if session ~= 0 then
        c.send(source, skynet.PTYPE_ERROR, session, "")
    else
        unknown_request(session, source, msg, sz, proto[prototype].name)
    end
end

以上,整个服务的消息的执行流程都总结完毕了