Skynet源码之:服务管理(6)

JavenLaw

在开始介绍服务管理的代码之前,我想解释一下:为什么我们称它为服务管理,而实际代码文件命名却是handle.c?在Skynet中有众多的的服务ctx,这个ctx是个复杂的结构,而如何管理众多的服务ctx是我们需要考虑的问题。最简单的办法就是给这些服务ctx进行编号,我们根据编号来查找,增加,删除,修改这些服务

而这些handle就是每个服务ctx的编号,handle和服务ctx唯一对应,能通过handle找到服务ctx这个结构的所有信息,handle像一个名单,管理着所有的服务ctx因此我们称handle.c模块为服务管理


理解handle_storage结构体

1,让我们看看首先在哪里调用的skynet_handle_init()代码

在skynet_start.c文件中,第272行,调用skynet_handle_init(config->harbor),代码如下:

// 用于记录 服务名字 和 服务handle 的映射关系
struct handle_name {
	char * name; // 服务名字
	uint32_t handle; // 服务handle
};

// H的定义:H是一个指针,指向的地址存储着handle_storage这个结构体
static struct handle_storage *H = NULL; // 全局单例

void 
skynet_handle_init(int harbor) {
	assert(H==NULL);
	struct handle_storage * s = skynet_malloc(sizeof(*H));
	s->slot_size = DEFAULT_SLOT_SIZE; // DEFAULT_SLOT_SIZE == 4
	s->slot = skynet_malloc(s->slot_size * sizeof(struct skynet_context *));
	memset(s->slot, 0, s->slot_size * sizeof(struct skynet_context *));
	
	rwlock_init(&s->lock);
	// reserve 0 for system
	s->harbor = (uint32_t) (harbor & 0xff) << HANDLE_REMOTE_SHIFT; // HANDLE_REMOTE_SHIFT == 24
    // harbor == 16777216,二进制为 1000000000000000000000000
	s->handle_index = 1;
	s->name_cap = 2;
	s->name_count = 0;
	s->name = skynet_malloc(s->name_cap * sizeof(struct handle_name));
	
	H = s; // 赋值给全局单例H
	
	// Don't need to free H
}

那handle_storage的结构又是怎么样的呢?

struct handle_storage {
	struct rwlock lock; // 读写锁

	uint32_t harbor; // skynet的节点id
	uint32_t handle_index; // 记录下一个能用的handle
	int slot_size; // 记录最大能容纳多少数量的服务
	struct skynet_context ** slot;
    // slot 属于数组指针,代表这个指针指向的是一个数组
    // 这个数组里每个元素存储的数据,也是一个指针
    // 数组元素里存放的指针,最后指向 真正存放skynet_context结构体的地方
    // 属于二级指针数组
	
	int name_cap; // 名字最大容量
	int name_count; // 名字现有数量
	struct handle_name *name;
};

那我们该如何理解这个handle_storage结构呢?

先简单理解一下harbor

假设你和朋友是个土地管理者

你管理着A(0001),B(0002),C(0003),D(0004)一共4块土地

朋友也管理着A(0001),B(0002),C(0003),D(0004)一共4块土地

某天有个人拿到一个地址:C(0003),问这个地址究竟是哪里啊?是你管理的C(0003),还是朋友管理的C(0003)?

因此,我们在初始化skynet_handle_init(int harbor)时候,传入harbor,用于标记节点,这个节点就是skynet的一个进程(详细的介绍看harbor部分)

后来你管理的就变成了A(10001),B(10002),C(10003),D(10004)一共4块土地,harbor为1,你管理的土地前都是1开头

朋友管理的变成了A(20001),B(20002),C(20003),D(20004)一共4块土地,harbor为2,朋友管理的土地前都是2开头

harbor的设计,详细请看《Skynet源码之:节点建立》

继续了解handle_storage结构体

既然你要管理A(10001),B(10002),C(10003),D(10004)一共4块土地

就一定需要一个结构体来记录这些土地的信息,并且这个结构体的生命周期是伴随整个程序的

struct handle_storage * s = skynet_malloc(sizeof(*H)); // 分配handle_storage内存

handle_storage结构体就像一个名单,用来记录所有的服务,在 skynet_malloc分配内存后,就开始初始化:

void 
skynet_handle_init(int harbor) {
	assert(H==NULL);
	struct handle_storage * s = skynet_malloc(sizeof(*H)); // 分配handle_storage内存
    
    // ----- 开始初始化 -----
    
	s->slot_size = DEFAULT_SLOT_SIZE; // DEFAULT_SLOT_SIZE的值为4,即管理的服务数量
    // 重点:分配一个容量为4的指针数组,s->slot是指向这个数组的指针
    // 数组元素存放的是一个指针,该指针 真正指向struct skynet_context的地址
    // 即为二级指针数组
	s->slot = skynet_malloc(s->slot_size * sizeof(struct skynet_context *));
	memset(s->slot, 0, s->slot_size * sizeof(struct skynet_context *)); // 全部置为0
	
	rwlock_init(&s->lock);
	// reserve 0 for system 0这个index是为系统保留的,不使用
    // harbor == 16777216,二进制为 1000000000000000000000000
	s->harbor = (uint32_t) (harbor & 0xff) << HANDLE_REMOTE_SHIFT; // 设置harbor_id
	s->handle_index = 1; // 0是Skynet保留的,跳过0,直接使用1作为第一个能用的handle
	s->name_cap = 2; // 设置名字容量为2
	s->name_count = 0; // 已设置的名字数量为0
    // 分配 名字容量 * sizeof(struct handle_name) 大小的内存
    // 后面用于存放 服务名字 和 服务handle 的映射关系
	s->name = skynet_malloc(s->name_cap * sizeof(struct handle_name)); 
	
    // ----- 初始化结束 -----
    
	H = s; // 赋值给全局单例H
	
	// Don't need to free H
}

在分配内存块后,得到struct handle_storage * s,此时设置了服务ctx的数量DEFAULT_SLOT_SIZE,也就是管理4个服务

然后最重要的来了:s->slot 的结构是:struct skynet_context ** slot,也就是说,slot指针并不是直接指向skynet_context的地址,而是一个二级指针数组!

这样理解:slot指向一个数组,数组元素中存放的数据是真正指向skynet_context的指针


如何绑定服务handle和ctx

2,对于一个空白的handle_storage结构体,分配了4个位置在slot,接下来就看看如何绑定handle和服务ctx了

// 将需要绑定的服务ctx,记录在 handle_storage.slot 中,并返回一个handle
uint32_t
skynet_handle_register(struct skynet_context *ctx) {
	struct handle_storage *s = H; // 拿到handle_storage结构体

	rwlock_wlock(&s->lock); // 先上锁,免得被其他人干扰
	
	for (;;) { // 如果服务数量非常庞大,这个循环执行的时间将会非常久
		int i;
		uint32_t handle = s->handle_index; //找到能用的handle,看来handle_index就是记录下一个可以使用的handle而已
		for (i=0;i<s->slot_size;i++,handle++) {
			if (handle > HANDLE_MASK) { // HANDLE_MASK == 0xffffff == 16777215
				// 0 is reserved,跳过0,直接使用1作为第一个能用的handle
				handle = 1;
                // 这里handle发生了回溯, 0xffffff的值为16777215,一般一个进程不可能会有这么多服务,内存不够用
                // 但是服务不断退出,又开启新服务,可能会导致handle用完,产生回溯的问题
                // 产生回溯就直接使用 handle == 1
			}
            // 注意这部分代码:
            // 并没有使用handle作为index来访问slot,而是进行了一次hash
            // 而实际却是:即使hash了,得到的hash值也是和handle一模一样的
            // 那么我们为什么还要再执行一次 int hash = handle & (s->slot_size-1)呢?
            // 云风的实现:
            // 		云风的blog:https://blog.codingnow.com/2015/04/handlemap.html
            // 		github仓库:https://github.com/cloudwu/handlemap
            // 网友的分析:
            // 		https://wmf.im/p/skynet%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E5%85%ADhandle%E7%9A%84%E7%AE%A1%E7%90%86/
            // 		https://www.jianshu.com/p/ed04d9c700ed
            
			int hash = handle & (s->slot_size-1); // hash一次就是确保取到一个小于slot_size-1的值
			if (s->slot[hash] == NULL) { // 找到空位,可以存放ctx
				s->slot[hash] = ctx; // 把 ctx 放进数组里面
				s->handle_index = handle + 1; // 把handle+1,留给下一个服务使用
	
				rwlock_wunlock(&s->lock);
				
                // harbor == 1000000000000000000000000
				handle |= s->harbor; // 或操作,就是给每个handle加上harbor_id, 即1000000000000000000000001
				return handle; // 返回handle值
			}
		}
		assert((s->slot_size*2 - 1) <= HANDLE_MASK); // 判断一下slot_size的容量是否小于HANDLE_MASK == 16777215
		struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *));// 发生扩容,扩容2倍
		memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *)); // 把新申请的内存都置为0
        
		for (i=0;i<s->slot_size;i++) { // 把旧的handle移动到新的内存
            // 注意:在这里只有旧的handle存在,才会移动
            // 如果旧的handle已经被置为NULL,那么新的内存就会放着不动
			if (s->slot[i]) {
				int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1); // 重新计算hash值
                // 这里有个特别注意的地方:
                // 旧的hash值是怎么样,新的hash值就是怎么样
                // handle 为 20,hash = handle & (32-1),值为20&31 = 20 (旧容量)
                // handle 为 20,hash = handle & (64-1),值为20&63 = 20 (新容量)
                // 因此旧hash和新hash是一样的
                // 
                // 除此之外,假设 20 21 22这3个位置被使用了,后来某个21的服务被退出
                // 那么在新容量中 21 的位置也是一直存在的,会一直空着
                // 即 新的内存放着不会动
				assert(new_slot[hash] == NULL);
				new_slot[hash] = s->slot[i]; // 把ctx从 旧的位置 挪到 新的位置
			}
		}
		skynet_free(s->slot); // 要释放掉旧的内存
		s->slot = new_slot; // 赋值新的内存
		s->slot_size *= 2; // 设置容量为旧的2倍
	}
}


如何解绑服务handle和ctx

3,再来看看如何解绑handle和服务ctx

// 将需要绑定的handle传入,删除handle在 handle_storage.slot 中的记录
int
skynet_handle_retire(uint32_t handle) {
	int ret = 0;
	struct handle_storage *s = H; // 拿到handle_storage结构体
	
	rwlock_wlock(&s->lock); // 先上锁,免得被其他人干扰
	
	uint32_t hash = handle & (s->slot_size-1); // 计算出此handle的hash值
	struct skynet_context * ctx = s->slot[hash]; // 获得数组的值,即读取出 指向服务ctx的指针
	
	if (ctx != NULL && skynet_context_handle(ctx) == handle) { // 最后要再确认一次
		s->slot[hash] = NULL; // 直接清空数组的值,即清空指向 ctx地址 的指针
		ret = 1;
		int i;
		int j=0, n=s->name_count; // 这部分清理服务的命名
		for (i=0; i<n; ++i) {
			if (s->name[i].handle == handle) {
				skynet_free(s->name[i].name); // 释放名字内存
				continue;
			} else if (i!=j) {
				s->name[j] = s->name[i]; // 往前移动位置,免得出现空位
			}
			++j;
		}
		s->name_count = j;
	} else {
		ctx = NULL;
	}
	
	rwlock_wunlock(&s->lock);
	
	if (ctx) {
		// release ctx may call skynet_handle_* , so wunlock first.
		skynet_context_release(ctx); // 调用服务ctx的释放接口
	}
	
	return ret;
}

特别说明

从服务handle的注册和释放我们可以看到,为了实现handle不重复的目的,即使新分配的内存块,handle释放了,数组空间也是会占用在那里,不能再使用

虽然设计handle不重复是必要的,假设:服务A设定了一个定时器,在N小时后触发,但中间服务A退出,如果此时服务B重启时候用了服务A的handle,等定时器一到,就会把消息发给了服务B,造成错乱。虽然我们的handle在设计上也是会回溯的,但是handle的数量非常庞大,单台机器单个进程不太可能支持这么多服务需要的内存。即使服务handle会回溯,也要在一定时间后才会发生,到那个时候某些服务也早已经应该退出

但我们依然要控制服务的数量,因为新服务在找空位置handle的时候,是通过for的循环来寻找,即:假设我们启动了10000个服务,其中 2001 ~ 8000的服务退出了,实际工作的服务只有1 ~ 2000 和 8001 ~ 10000,也就是说共4000个服务工作。此时如果要启动新的服务,就需要循环10000次,才能找到 第10001这个能用的handle

服务管理的设计,限制了我们开启服务的数量,另外服务本身的建立也是繁琐的流程,会有很大耗时

之前有个热更方案:每次玩家退出时,都把对应的agent服务退出。新玩家进来时,再启动一个新服务agent。当需要热更的时候,就调用cacheclear()清除掉旧代码,这样新启动的agent服务就会加载新的代码,从而完成热更

现在看来这个方案不太可行,在服务管理的handle这个模块就会占用很多的位置,会导致for循环到后面耗时很长,并且每次启动新agent服务都要建立新的agent这也会增加skynet_malloc和skynet_free的调用,导致内存分配太过频繁,效率降低甚至卡住。比较好的做法是:建立一定数量的agent,即使玩家退出,这个服务也不应该退出,只是清理当前玩家数据即可。简单说就是:做一个服务池,每次玩家登录就从服务池中取一个服务出来;当玩家退出时,把玩家数据清理掉之后还可以继续给下一个玩家用

优化方案:在每个agent服务启动的时候,agent都会去查找配置的conf_code_version,并把它记录在字段中。当玩家退出时,agent会提前检查一下自身的self_code_version和配置的conf_code_version,如果此时 self_code_version 和 conf_code_version 相同,则表明没有热更新,此时服务就不退出,而是放进服务池,等待服务下一个玩家。如果 self_code_version 和 conf_code_version 不相同,那表明已经有了热更新版本,此时agent不放回服务池,而是自己直接退出这样的优点是:在没有热更新的时候,服务是可以重新使用的,也就是不会占用太多的handle,服务并不会每次都退出,就不会导致每次退出-启动都产生新的handle。例如:产生了1000个服务,此时没有热更新,那么这个handle数量就维持在1000。当热更新一次,这1000个服务就需要退出,此时就会造成1000个handle_id不能再使用,再新建1000个服务会占用 1001 ~ 2000的范围,会导致for一共2000次。特别需要注意的是:服务池1000个服务,400个分配出去使用,更新后,400个服务自己能检测到更新,因此会在退出时会自己销毁。剩余的600个服务依然放在服务池,因此需要玩家从服务池获取服务的时候,再次检查一次,self_code_version 和 conf_code_version是否相同,如果不相同,则需要立马启动一个新服务给玩家,并把这个服务立即销毁。另外一点还需要保证的是:执行codeclear() 和 conf_code_version 的时机要确保同步,这点需要仔细设计

热更新最好的方案是:开启进程A和进程B的方案来更新,即把热更新的代码在进程B中启动,并把进程A的流量关闭,通过流量控制,把流量导入到进程B,等到进程A中的玩家全部退出后,再重启进程A,从而完成全部的更新


退出全部服务

4,如果想要把全部服务都退出呢?

void 
skynet_handle_retireall() {
	struct handle_storage *s = H; // 拿到handle_storage结构体
    // 为什么这里不需要上锁呢?而是在for循环里上锁?
	for (;;) {
		int n = 0;
		int i;
		for (i=0;i<s->slot_size;i++) {
			rwlock_rlock(&s->lock); // 为什么在这里上锁?
			struct skynet_context * ctx = s->slot[i];
			uint32_t handle = 0;
			if (ctx) {
				handle = skynet_context_handle(ctx); // 根据服务ctx,拿到handle
				++n;
			}
			rwlock_runlock(&s->lock);
			if (handle != 0) {
				skynet_handle_retire(handle); // 调用函数,一个一个轮流解绑 服务ctx和handle 的关系
			}
		}
		if (n==0)
			return;
	}
}

// 在 skynet_handle_retireall() 函数中,做了遍历 handle_storage 结构体中的 slot 数组的操作
// 需要在遍历的过程中获取每个槽位对应的 skynet_context 上下文,并进行相应的处理
// 为了保证数据的一致性和线程安全,需要使用读写锁 rwlock 对 handle_storage 结构体进行保护

// 在函数的开始处,获取了 handle_storage 结构体的指针 s ,此时还没有开始遍历 slot 数组
// 接下来进入一个无限循环,每次循环中都会遍历 slot 数组
// 在每次遍历之前,通过调用 rwlock_rlock(&s->lock) 上读锁,对 handle_storage 结构体进行保护
// 这是因为在遍历 slot 数组的过程中,需要访问其中的元素,而读锁可以允许多个线程同时读取数据,不会互斥
// 在遍历 slot 数组的过程中,先获取当前槽位的 skynet_context 上下文,并根据上下文获取对应的 handle
// 这里的目的是判断该槽位是否存在有效的上下文,如果有,则需要进行相应的处理,否则继续遍历下一个槽位
// 
// 在获取完 handle 后,通过调用 rwlock_runlock(&s->lock) 解读锁,释放对 handle_storage 结构体的保护
// 这样其他线程就可以在获取读锁后继续访问 handle_storage
// 
// 判断获取的 handle 是否为 0
// 如果不为 0,则调用 skynet_handle_retire(handle) 函数进行处理,解绑服务上下文和 handle 的关系
// 但 skynet_handle_retire(handle) 函数处理的时间可能非常长
// 所以前一步,一旦获取到 handle 后,就先立马解开读锁
// 这就解释了为什么不能在一开始就加锁
// 
// 循环结束后,如果遍历过程中没有发现有效的上下文,即 `n==0`,则函数返回
//
// 总结起来,就是为了减少锁的粒度,避免过多的锁竞争,这样可以在保证数据一致性的同时,提高并发性能


通过handle查询服务ctx

5,那我们该如何通过handle查询到服务ctx呢?

struct skynet_context * 
skynet_handle_grab(uint32_t handle) {
	struct handle_storage *s = H; // 拿到handle_storage结构体
	struct skynet_context * result = NULL;
	
	rwlock_rlock(&s->lock); // 先上锁,免得被其他人干扰
	
	uint32_t hash = handle & (s->slot_size-1); // 计算出hash值
	struct skynet_context * ctx = s->slot[hash]; // 获得指向 服务ctx地址 的指针
	if (ctx && skynet_context_handle(ctx) == handle) { // 再确认一下服务ctx内的handle与查找的handle对不对得上
		result = ctx; // 赋值给result
		skynet_context_grab(result); // 既然你来寻找服务ctx的地址,你肯定是要引用它,那我需要把此服务ctx的引用+1
	}
	
	rwlock_runlock(&s->lock);
	
	return result; // 返回结果
}


给服务handle命名

6,虽然我们可以使用handle来标记服务ctx,但是程序员在写代码的时候,是比较麻烦的:1是不好记忆这个handle是哪个服务,2是你也不确定下次这个服务的handle还是不是这个,导致代码没法写。就像DNS系统一样,我们只需要记住www.skynet.com就行,不需要记住具体的ip地址。我们需要一个名字系统,把字符的名字映射到对应的handle,handle再找到具体的服务ctx。这就是skynet的命名API,代码如下:

const char * 
skynet_handle_namehandle(uint32_t handle, const char *name) {
	rwlock_wlock(&H->lock);

	const char * ret = _insert_name(H, name, handle); // 把 服务名字 和 服务handle 插入记录中的具体实现
	
	rwlock_wunlock(&H->lock);
	
	return ret;
}

看看具体实现:

// 实际就是采用二分法,把名字插入进去
static const char *
_insert_name(struct handle_storage *s, const char * name, uint32_t handle) {
	int begin = 0;
	int end = s->name_count - 1;
	while (begin<=end) {
		int mid = (begin+end)/2;
		struct handle_name *n = &s->name[mid];
		int c = strcmp(n->name, name);
		if (c==0) {
			return NULL;
		}
		if (c<0) {
			begin = mid + 1;
		} else {
			end = mid - 1;
		}
	}
	char * result = skynet_strdup(name); // 其实就是把 字符串name 复制到 一块新的内存
	
	_insert_name_before(s, result, handle, begin); // 实际插入名字列表中进行记录
	
	return result;
}

// 把 字符串 复制到 一块新的内存
char *
skynet_strdup(const char *str) {
	size_t sz = strlen(str);
	char * ret = skynet_malloc(sz+1);
	memcpy(ret, str, sz+1);
	return ret;
}

static void
_insert_name_before(struct handle_storage *s, char *name, uint32_t handle, int before) {
	if (s->name_count >= s->name_cap) {
		s->name_cap *= 2;
        
        // HANDLE_MASK   ==   16777216
        // MAX_SLOT_SIZE == 1073741824
        // 为什么handle的数量限制是 16777216,名字容量限制却是 1073741824
		assert(s->name_cap <= MAX_SLOT_SIZE); 
		struct handle_name * n = skynet_malloc(s->name_cap * sizeof(struct handle_name));
		int i;
		for (i=0;i<before;i++) {
			n[i] = s->name[i];
		}
		for (i=before;i<s->name_count;i++) {
			n[i+1] = s->name[i];
		}
		skynet_free(s->name);
		s->name = n;
	} else {
		int i;
		for (i=s->name_count;i>before;i--) {
			s->name[i] = s->name[i-1];
		}
	}
	s->name[before].name = name;
	s->name[before].handle = handle;
	s->name_count ++;
}

看看handle_name的结构:

struct handle_name {
	char * name; // 命名的名字
	uint32_t handle; // 服务的handle
};

再来回顾一下handle_storage的结构:

struct handle_storage {
	struct rwlock lock;
	
	uint32_t harbor; // 本进程所属的节点
	uint32_t handle_index; // 用于记录下一个能用的handle
	int slot_size; // 用于记录服务的最大容量
	struct skynet_context ** slot; // 属于指针数组,该值指向一个数组,数组元素存放的数据是真正指向skynet_context的指针
	
	int name_cap; // 设置服务命名的容量
	int name_count; // 设置已经命名的数量
	struct handle_name *name; // 名称和handle的结构体
};


通过name查询服务ctx

7,既然可以注册名字,那就可以通过名字来查找handle,Skynet的API如下:

uint32_t 
skynet_handle_findname(const char * name) {
	struct handle_storage *s = H; // 拿到handle_storage结构体
	
	rwlock_rlock(&s->lock);
	
	uint32_t handle = 0;
	
    // 开始二分法进行查找
	int begin = 0;
	int end = s->name_count - 1;
	while (begin<=end) {
		int mid = (begin+end)/2;
		struct handle_name *n = &s->name[mid];
		int c = strcmp(n->name, name);
		if (c==0) {
			handle = n->handle;
			break;
		}
		if (c<0) {
			begin = mid + 1;
		} else {
			end = mid - 1;
		}
	}
	
	rwlock_runlock(&s->lock);
	
	return handle;
}

为什么没有通过handle查找name呢?

我猜想大概率是几乎上层业务用不到,一般都是根据服务名字找具体的handle

底层的设计更不依赖名字,而是直接使用handle

为什么命名功能没有取消操作呢?

大概率是因为handle本身就是不重复的

因此一旦确定了handle和name的关系,就不需要变动了

如果一个name经常会对应不同的服务,那么在上层设计的时候

这个服务就不应该被命名,而是应该直接使用handle记录

被命名的服务一定是关键的,特殊的,稳定的,不易变动的

到这里服务管理handle_storage的全部内容就在这里了

服务管理handle_storage结构体,其实就是一块内存,里面存放着所有服务和handle的映射关系

实现了注册,注销,根据handle找到服务的功能

同时还增加了服务命名的功能

实现了命名,根据服务name找到handle的功能(没有取消命名的操作)


总结

下面是优化还需要斟酌:

参看:云风的 BLOG: skynet 服务启动优化 (codingnow.com)

云风的 BLOG: skynet 下的用户登陆问题 (codingnow.com)

1,对于服务管理handle_storage -> name_cap的设置,默认值为2(特别注意name_cap必须要是2的次方数)

而实际情况却是,当你启动Skynet时,自己业务逻辑几乎什么都不启动,这个命名服务就被使用了4个

这当中就触发了一次新的skynet_malloc,把容量name_cap*2,并且把数据搬迁到新的内存

优化办法:

在你业务代码已经稳定的情况下,即你大概可以确定有N个服务是需要命名的(开发测试阶段大概确定N)

然后你再返回Skynet的源码,修改服务管理handle_storage -> name_cap的设置为N,再编译(上线运营阶段编译)

这样几乎在这份代码的运行中,除了刚开始启动Skynet进程时,才需要分配一定量的内存

后面增加数据也不会导致扩容的发生,并且不会触发数据的迁移和复制

也有利于防止内存碎片的产生

2,对于服务管理handle_storage -> slot的设置,默认值为4(特别注意slot必须要是2的次方数)

而实际情况却是,当你启动Skynet时,自己业务逻辑几乎什么都不启动,基础服务就被启动了15个

这当中就触发了2次新的skynet_malloc,把容量slot*2之后,再slot * 2,并且发生了2次把数据迁移复制到新内存的行为

优化办法:

在你业务代码已经稳定的情况下,即你大概可以确定有N个服务是需要启动的(开发测试阶段)

例如你大概能确定这个进程容纳的服务是多少,例如这个进程承载多少玩家,就不会再让玩家进入了

几乎每个游戏服都是会有人数限制的,那这就意味着服务的数量几乎也是确定的

此时我们需要给这个服务数量定一个范围,多预留100个位置即可

那么在新的服务创建的时候,这个速度就不会发生扩容,迁移复制数据的操作

以上2点的优化还存在一个问题:

每个进程的作用都是不相同的,这意味着每个进程的服务数量都不一样,我们该如何设定name_cap和slot的值呢?

比较好的做法是:在配置中由用户进行配置,像harbor一样把值传进去

这样每个进程启动时,都能读取到属于自己配置的name_cap和slot

类似的优化还有:

进程A负责的内容比较复杂,开启的线程也比较多

进程B负责的内容没那么重要,也不繁忙,服务数量也不多

但是进程A和进程B被安排在同一台物理机器上运行

此时完全可以把进程B的线程数量设置的稍微小一些

先看看云风的blog,最后才能确定优化对不对:

云风的 BLOG: skynet 并发模型的一点改进思路 (codingnow.com)

云风的 BLOG: 关于 skynet 调度器的一点想法(续) (codingnow.com)