当前位置:首页 » 其他

腾讯协程库libco的原理分析

2015-09-23 10:17 本站整理 浏览(198)

我们以example_echosvr.cpp为例子,这个例子的作用就是读取客户端的请求,然后按原样返回给客户端。在它的main函数里创建了proccnt进程,每个进程里有cnt个task。这里的proccnt和cnt都是来自运行参数。

[code]for(int i = 0; i < cnt; i++) {
    task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
    task->fd = -1;

    co_create( &(task->co),NULL,readwrite_routine,task );
    co_resume( task->co );
}
在整个的架构里面,task的概念其实就是协程,协程通过stCoRoutine_t结构来描述,就像我们进程的task_struct一样,保存着运行时,关于协程运行环境的所有信息,所以每个task有一个叫做co的成员。对于函数co_create来说,它的第三个和第四个参数,代表了该协程运行入口函数及其所需的参数。跟pthread_create的参数有些类似,需要指定入口函数等信息。

看看co_create函数做了什么:

[code]int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
    if( !co_get_curr_thread_env() ) {
        co_init_curr_thread_env();
    }

    stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(),pfn,arg );
    *ppco = co;

    return 0;
}
在每个task运行之初,需要对当前的task环境进行初始化。函数co_init_curr_thread_env是对每个进程里的协程环境进行初始化。每个进程内通过stCoRoutineEnv_t结构来表示环境的总体信息。

[code]struct stCoRoutineEnv_t
{
    stCoRoutine_t *pCallStack[ 128 ];  // 一个进程里总共维护的128个协程
    int iCallStackSize;                // 每个协程的栈大小
    stCoEpoll_t *pEpoll;               // 所有协程公用的epoll实例
};
co_init_curr_thread_env函数的核心部分:

[code]        // env为struct stCoRoutineEnv_t类型
        struct stCoRoutine_t *self = co_create_env( env,NULL,NULL );

        self->cIsMain = 1;

// 每个协程里有个coctx_t类型的成员,这个属于协程较底层的支撑信息,后面细讲
    coctx_init( &self->ctx );

// 调用co_init_curr_thread_env时,pCallStack里面是空的,
// 这里的self是指当前的进程,也就是说当前的进程也算是协程的一种
    env->pCallStack[ env->iCallStackSize++ ] = self;

        // 申请epoll结构
    stCoEpoll_t *ev = AllocEpoll();

        // 与当前env绑定
    SetEpoll( env,ev );
co_create_env函数的细节:

[code]struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env,pfn_co_routine_t pfn,void *arg )
{
    stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );

    memset( lp,0,(long)((stCoRoutine_t*)0)->sRunStack );

// 每一个协程都指向进程内的env
    lp->env = env;
    lp->pfn = pfn;
    lp->arg = arg;

// 每个协程拥有128KB的私有栈
    lp->ctx.ss_sp = lp->sRunStack ;
    lp->ctx.ss_size = sizeof(lp->sRunStack) ;

    return lp;
}
接下来分析函数co_resume。

[code]void co_resume( stCoRoutine_t *co )
{
    stCoRoutineEnv_t *env = co->env;

//  在前面的描述里,当前pCallStack目前只有一个co,就是当前的进程,记得前面在
//  co_init_curr_thread_env函数里的self吗?
//  取到当前正在运行的co,也就是变量lpCurrRoutine所保存到值
    stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];

// 首次运行需要设置一些环境
    if ( !co->cStart ) {
// 设置运行环境的上下文信息,主要是在协程切换时一些需要的寄存器和栈等。
// 其中第二个参数是协程开始运行的入口函数,其实里面实际调用的函数co->pfn,
// 也就是co_create里设置的。
        coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
        co->cStart = 1;
    }

//  将要运行的协程入栈
    env->pCallStack[ env->iCallStackSize++ ] = co;

// 切换上下文,lpCurrRoutine挂起,co开始运行,运行点就是CoRoutineFunc。
// 细节暂时略过
    coctx_swap( &(lpCurrRoutine->ctx),&(co->ctx) );
}
在co_resume中我们看到,该函数会将执行权限切换到指定的协程,当前的协程会被挂起。而运行的协程,会在适当的时期交出执行权,否则会一直执行下去。那么回看前面的这个逻辑:

[code]for(int i=0;i<cnt;i++) {
    task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
    task->fd = -1;

    co_create( &(task->co),NULL,readwrite_routine,task );
    co_resume( task->co );
}
主进程执行cnt次循环,来启动cnt个协程。每次调用co_resume时,主进程便会挂起,直到被激活运行的协程执行到某个步骤后,它主动放弃CPU,把执行权再次交给主进程。

那么在每个task(协程)内,到底做了哪些动作呢?

[code]static void *readwrite_routine( void *arg )
{

    co_enable_hook_sys();

    task_t *co = (task_t*)arg;
    char buf[ 1024 * 16 ];
    for(;;)
    {
// 初始化阶段,fd为-1,表示当前没有就绪的任务需要处理
        if( -1 == co->fd )
        {     
// 当前协程入队列。这个动作的意义在于,当有任务要处理时,
// 从g_readwrite里依次取出,并分配任务给他们去执行。
            g_readwrite.push( co );
// 挂起当前协程,让出执行权给其他协程。
// 原则很简单,就是让上次挂起的协程执行,可以认为是返回到上次执行的运行点。
            co_yield_ct();
            continue;
        }
                ...
}
回到主进程,在启动了cnt个task之后的处理:

[code]for(int k=0;k<proccnt;k++) {
     ....
    stCoRoutine_t *accept_co = NULL;

// 启动一个协程专门做accept
    co_create( &accept_co,NULL,accept_routine,0 );

// accept协程会一直接受新连接,直到它交出执行权,才会重新回到主进程
    co_resume( accept_co );

    co_eventloop( co_get_epoll_ct(),0,0 );

    exit(0);
}
看看accept_routine的内部:

[code]static void *accept_routine( void * )
{

    for(;;) {
// 如果工作协程队列为空,就等待1秒或者等再来事件,重试
        if( g_readwrite.empty() ) {
            printf("empty\n"); //sleep
            struct pollfd pf = { 0 };
            pf.fd = -1;
            poll( &pf,1,1000);

            continue;

        }
        struct sockaddr_in addr;
        memset( &addr,0,sizeof(addr) );
        socklen_t len = sizeof(addr);

        int fd = co_accept(g_listen_fd, (struct sockaddr *)&addr, &len);
 // 未就绪,等待下次事件继续处理
        if( fd < 0 ) {
            struct pollfd pf = { 0 };
            pf.fd = g_listen_fd;
            pf.events = (POLLIN|POLLERR|POLLHUP);
// 当前运行在accept协程,co_poll会在等待事件的时候交出cpu,回到主进程
            co_poll( co_get_epoll_ct(),&pf,1,1000 );
            continue;
        }

// Fun!这里工作协程用尽,直接关闭当前连接...
        if( g_readwrite.empty() ) {
            close( fd );
            continue;
        }

// 弹出一个协程,去处理新连接
        SetNonBlock( fd );
        task_t *co = g_readwrite.top();
        co->fd = fd;
        g_readwrite.pop();

// 此时执行权会转移到某个线程,知道它交出cpu,当前协程才会再次执行
        co_resume( co->co );
    }

    return 0;
}
当readwrite_routine和accept_routine都会调用co_poll,但是accept会将执行权交给主进程,而task协程挂起后,执行权则会交给accept协程。这里交给accept协程,是为了进行后续的新连接的接收。那么前面由于执行碰到EAGAIN而挂起的task协程,则通过co_eventloop来驱动继续执行。

主进程中通过co_eventloop来调度事件来驱动各个协程的处理。具体的是通过stTimeoutItem_t结构中的pfnProcess来处理的。代码比较直观,就不细说了。

说完了上层的核心逻辑,我们关注下底层。先注意下co_resume函数:

[code]void co_resume( stCoRoutine_t *co )
{
    stCoRoutineEnv_t *env = co->env;
    stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
    if( !co->cStart )
    {
        coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
        co->cStart = 1;
    }
    env->pCallStack[ env->iCallStackSize++ ] = co;
    coctx_swap( &(lpCurrRoutine->ctx),&(co->ctx) );
}
其中的调用coctx_make的一些实现涉及到x86_64的架构细节:

16个64位寄存器,分别是:%rax,%rbx,%rcx,%rdx,%esi,%edi,%rbp,%rsp,

%r8,%r9,%r10,%r11,%r12,%r13,%r14,%r15。

其中:%rax 作为函数返回值使用。%rsp 栈指针寄存器,指向栈顶。%rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,依次对应第1参数,第2参数。。。

有了上面的说明,我们再看coctx_make函数中,regs[RIP]是执行入口点,它的原型:

[code]static int CoRoutineFunc( stCoRoutine_t *co,void * );
而regs[ RDI ]和regs[ RSI ]就是它的两个参数。而运行的栈帧,通过regs[ RBX ]和regs[ RSP ]来指定。

[code]int coctx_make( coctx_t *ctx,coctx_pfn_t pfn,const void *s,const void *s1 )
{
        ...
    ctx->regs[ RBX ] = stack + ctx->ss_size - 1;
    ctx->regs[ RSP ] = (char*)(ctx->param) + 8;
    ctx->regs[ RIP ] = (char*)pfn;

    ctx->regs[ RDI ] = (char*)s;
    ctx->regs[ RSI ] = (char*)s1;

    return 0;
}
coctx_swap.S:

[code]#define _esp 0
#define _eip 4

#define _rsp 0
#define _rip 8
#define _rbx 16
#define _rdi 24
#define _rsi 32

.globl coctx_swap 
.type  coctx_swap, @function
coctx_swap:

    leaq -8(%rsp),%rsp
    pushq %rbp
    pushq %r12
    pushq %r13
    pushq %r14
    pushq %r15
    pushq %rdx
    pushq %rcx  
    pushq %r8
    pushq %r9
    leaq 80(%rsp),%rsp  

// rdi是coctx_swap的第一个参数,需要挂起的协程对应的coctx_t结构。
// 最开头的,是5个指针(void *regs[ 5 ]),用来保存一些寄存器信息
    movq %rbx,_rbx(%rdi) // +16 Bytes
    movq %rdi,_rdi(%rdi)  // +24Bytes
    movq %rsi,_rsi(%rdi)   // +32Bytes

    /* 保存coctx_swap第一个参数的运行状态  */
    movq (%rsp), %rcx       // +8Bytes 保存返回地址
    movq %rcx, _rip(%rdi)  
    leaq 8(%rsp), %rcx       // +0Bytes 来保存之前的栈顶
    movq %rcx, _rsp(%rdi)

    /* 将当前的环境设置为 coctx_swap第二个参数提供的值 */
    movq _rip(%rsi), %rcx
    movq _rsp(%rsi), %rsp
    pushq %rcx // 将执行入口点入栈,这样在ret执行之后,会取栈顶指针作为执行点

    movq _rbx(%rsi),%rbx
    movq _rdi(%rsi),%rdi
    movq _rsi(%rsi),%rsi

    leaq -80(%rsp),%rsp
    popq %r9
    popq %r8
    popq %rcx   
    popq %rdx
    popq %r15
    popq %r14
    popq %r13
    popq %r12
    popq %rbp
    leaq 8(%rsp),%rsp   

    xorl %eax, %eax
    ret