|
Jul
02
|
在前一篇文章 RPC程序的一般结构 - nfsd 讲到了 RPC 程序的一般结构.
svc_recv 函数从所有的连接(svc_xprt)里尝试读取一个完整的请求(svc_rqst), 不过, 函数返回时, 可能没有任何就绪的请求, 所以, nfsd 代码中通过一个 while 循环读取请求.
while ((err = svc_recv(rqstp, 60*60*HZ)) == -EAGAIN) ;
rpc 程序是多线程的, 所以代码中不少地方用到了锁. 主线程之外的线程作为生产者, 将就绪(可读, 已关闭等)的连接加入到 xprt_queue, 在主线程中 svc_recv 作为消费者, 一次从队列弹出一条连接, 尝试读取数据.
spin_lock_bh(&pool->sp_lock); xprt = svc_xprt_dequeue(pool); spin_unlock_bh(&pool->sp_lock);
加锁, 然后弹出一条就绪的连接.
if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) { dprintk("svc_recv: found XPT_CLOSE\n"); svc_delete_xprt(xprt);
如果连接设置了 XPT_CLOSE 位, 说明连接已经关闭, 将其删除.
} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) { struct svc_xprt *newxpt; newxpt = xprt->xpt_ops->xpo_accept(xprt); if (newxpt) { spin_lock_bh(&serv->sv_lock); set_bit(XPT_TEMP, &newxpt->xpt_flags); list_add(&newxpt->xpt_list, &serv->sv_tempsocks); spin_unlock_bh(&serv->sv_lock); svc_xprt_received(newxpt); } svc_xprt_received(xprt);
如果这是一条监听连接, 则调用 xpo_accept 接受一条新连接. 这种操作和 socket 操作几乎是一样的. 新连接被加到队列中, 同样, 需要锁.
} else { rqstp->rq_deferred = svc_deferred_dequeue(xprt); if (rqstp->rq_deferred) { svc_xprt_received(xprt); len = svc_deferred_recv(rqstp); } else len = xprt->xpt_ops->xpo_recvfrom(rqstp); dprintk("svc: got len=%d\n", len); }
最后, 连接中有数据需要读取, 调用 xpo_recvfrom 方法读取数据. xpo_recvfrom 方法的返回值指示已读取的字节数, 或者指示请求是否已经读取完整. 如果没有读取完整, 返回 -EAGAIN 作为 svc_recv 的返回值.

Recent Comments