ceph源码中有三种网络通信的实现方式,SimpleMessenger实现比较早,每一对通信的peer之间创建四个线程维护连接状态(每一端两个线程,分别负责读和写), 这样当集群规模上去后,会导致大量的线程被创建。随着linux中epoll的实现,高并发的网络io都是借助于epoll这样的系统调用, 比如libevent库。ceph源码中也基于epoll实现了AsyncMessenger,这有助于减少集群中网络通信所需要的线程数, 目前实现虽然还不太稳定,并不是默认的通信组件,但是未来一定会取代SimpleMessenger。
服务端需要监听端口,等待连接请求到来,然后接受请求建立连接进行通信。
以osd进程为例,在进程启动的过程中,会创建Messenger对象,用于管理网络连接,监听端口,接收请求,源码在文件src/ceph_osd.cc:
int main(int argc, const char **argv)
{
......
// public用于客户端通信
Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "client",
getpid());
// cluster用于集群内部通信
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
getpid());
......
}
// src/msg/Messenger.cc
Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
uint64_t nonce)
{
......
// 在src/common/config_opts.h文件中,目前需要配置async相关选项才会生效
// OPTION(enable_experimental_unrecoverable_data_corrupting_features, OPT_STR, "ms-type-async")
// OPTION(ms_type, OPT_STR, "async")
else if ((r == 1 || type == "async") &&
cct->check_experimental_feature_enabled("ms-type-async"))
return new AsyncMessenger(cct, name, lname, nonce);
......
return NULL;
}
类AsyncMessenger的构造函数需要注意,虽然在osd进程的启动过程中,会创建6个messenger,但是他们全部共享一个WorkerPool,
函数lookup_or_create_singleton_object
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
processor(this, cct, _nonce),
lock("AsyncMessenger::lock"),
nonce(_nonce), need_addr(true), did_bind(false),
global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
cluster_protocol(0), stopped(true)
{
ceph_spin_init(&global_seq_lock);
cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name); // 创建pool对象, 注意第二个参数是WorkerPool中的静态常量
// 创建一个本地连接对象用于向自己发送消息
local_connection = new AsyncConnection(cct, this, &pool->get_worker()->center);
init_local_connection(); // 初始化本地对象
}
template<typename T>
void lookup_or_create_singleton_object(T*& p, const std::string &name) {
ceph_spin_lock(&_associated_objs_lock);
if (!_associated_objs.count(name)) { // name决定了一个进程只会有一个pool
p = new T(this); // new一个对象,这里是WorkerPool
_associated_objs[name] = reinterpret_cast<AssociatedSingletonObject*>(p); // 加入map
} else {
p = reinterpret_cast<T*>(_associated_objs[name]);
}
ceph_spin_unlock(&_associated_objs_lock);
}
另外需要注意,这个进程唯一的pool是在messenger的构造函数分配的,messenger的析构函数并不负责释放内存,因为多个messenger共享, 一个messenger销毁了并不代表其他messenger也一定会销毁,这个pool的指针存放在CephContext成员变量_associated_objs中, 因为daemon进程有一个全局唯一的CephContext,当CephContext析构的时候,会释放pool指针的内存。
一个osd进程只会有一个WorkerPool,那这个pool在初始化的时候干什么事情了?顾名思义,Worker的Pool,肯定是用来管理Worker的, 构造函数中恰恰就是新建了Worker类的对象,而Worker类继承于线程类,肯定就是单独干活的线程,源码在文件 src/msg/async/AsyncMessenger.[h|c]中:
WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false),
barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
barrier_count(0)
{
assert(cct->_conf->ms_async_op_threads > 0);
for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
Worker *w = new Worker(cct, this, i); // 新建Worker类对象
workers.push_back(w); // 保存在vector容器中, 用于跟踪所有的worker
}
......
}
class Worker : public Thread { // 继承线程类,说明Worker类单独包含线程
static const uint64_t InitEventNumber = 5000; // 事件个数
static const uint64_t EventMaxWaitUs = 30000000; // 事件最大的等待时间, 30秒
CephContext *cct;
WorkerPool *pool;
bool done;
int id;
public:
EventCenter center; // 事件中心
Worker(CephContext *c, WorkerPool *p, int i)
: cct(c), pool(p), done(false), id(i), center(c) {
center.init(InitEventNumber); // 初始化事件驱动, 实际上就是初始化了epoll相关的结构
}
void *entry();
void stop();
};
为了代码通用,这里单独抽象了一层出来,即EventCenter,用来管理各种事件的驱动,比如epoll, kqueue, select等。 源码在src/msg/async/Event.[h]c]:
class EventCenter {
......
FileEvent *file_events; // 所有io事件
EventDriver *driver; // 具体的驱动
map<utime_t, list<TimeEvent> > time_events; // 所有时间事件
......
};
// EventDriver接口
// epoll的驱动继承此接口,接口的实现就是对epoll三个系统调用epoll_create, epoll_ctl,epoll_wait的封装
class EventDriver {
public:
virtual ~EventDriver() {} // we want a virtual destructor!!!
virtual int init(int nevent) = 0;
virtual int add_event(int fd, int cur_mask, int mask) = 0;
virtual void del_event(int fd, int cur_mask, int del_mask) = 0;
virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
virtual int resize_events(int newsize) = 0;
};
class EpollDriver : public EventDriver {
int epfd; // epoll fd
struct epoll_event *events; // 等待事件的结构体指针,可以查看epoll相关资料
CephContext *cct;
int size;
......
};
Worker构造函数中,调用了center的init函数,看看center.init干了些什么事情?
int EventCenter::init(int n)
{
......
driver = new EpollDriver(cct); // 新建一个驱动对象
int r = driver->init(n); // 初始化具体的驱动
int fds[2]; // pipe用来唤醒worker线程,后文会分析到
if (pipe(fds) < 0) {
lderr(cct) << __func__ << " can't create notify pipe" << dendl;
return -1;
}
notify_receive_fd = fds[0];
notify_send_fd = fds[1];
......
create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify())); // 监听pipe的可读事件
return 0;
}
// 初始化epoll
int EpollDriver::init(int nevent)
{
events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*nevent); // nevent就是Worker类中的InitEventNumber
memset(events, 0, sizeof(struct epoll_event)*nevent);
epfd = epoll_create(1024); // 获取一个epoll fd
size = nevent;
return 0;
}
从osd进程,到AsyncMessenger类,接着到所有messenger共享的WorkerPool,然后初始化进程唯一pool的每个Worker,然后worker中借助于EventCenter统一管理所有事件, 并且初始化了具体的事件处理机制,如epoll,似乎所有工作已经就绪? 其实不然,首先,worker的线程并没有启动,其次,osd进程的messenger也并没有绑定到特定端口进行监听,所以osd启动的过程中,还得有其他步骤。
在messenger创建以后,会设置策略以及限流的参数,接下来就会绑定地址,对网络层套接字的处理,比如socket/bind/listen/accept等,主要是通过类Processor来管理:
// 继续ceph_osd.cc代码
int main(int argc, const char **argv)
{
......
// 设置协议
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
......
// 设置策略以及限流
ms_public->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
ms_public->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
client_byte_throttler.get(),
client_msg_throttler.get());
......
// 绑定地址
r = ms_public->bind(g_conf->public_addr);
if (r < 0)
exit(1);
r = ms_cluster->bind(g_conf->cluster_addr);
if (r < 0)
exit(1);
......
ms_public->start(); // 启动线程
......
err = osd->init(); // 这里很关键, 后文分析
......
ms_public->wait(); // 等待线程结束
......
}
int AsyncMessenger::bind(const entity_addr_t &bind_addr)
{
......
// bind to a socket
set<int> avoid_ports;
int r = processor.bind(bind_addr, avoid_ports); // 调用processor对象进行处理
......
}
// processor的处理就是对socket API的封装:socket, bind, listen
// 创建套接字,绑定到特定端口,进行监听
int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
{
......
listen_sd = ::socket(family, SOCK_STREAM, 0);
......
rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size());
......
rc = ::listen(listen_sd, 128);
......
msgr->init_local_connection(); // 更新地址,但是因为还没有dispatch对象,不会处理连接
return 0;
}
void init_local_connection() {
Mutex::Locker l(lock);
_init_local_connection();
}
void _init_local_connection() {
assert(lock.is_locked());
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
ms_deliver_handle_fast_connect(local_connection.get());
}
void ms_deliver_handle_fast_connect(Connection *con) {
for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); // fast_dispatchers 目前为空
p != fast_dispatchers.end();
++p)
(*p)->ms_handle_fast_connect(con);
}
在绑定地址进行端口监听以后,就会等着连接到来,要处理连接请求,肯定得创建Worker线程来处理吧?
// ceph_osd.cc 会继续调用messenger->start(), 参见前面代码
int AsyncMessenger::start()
{
lock.Lock();
......
pool->start(); // 启动所有线程
lock.Unlock();
return 0;
}
void WorkerPool::start()
{
if (!started) {
for (uint64_t i = 0; i < workers.size(); ++i) {
workers[i]->create(); // 创建线程
}
started = true;
}
}
// 线程入口函数
void *Worker::entry()
{
......
center.set_owner(pthread_self());
while (!done) { // 线程一直循环处理事件
int r = center.process_events(EventMaxWaitUs); // 借助于事件中心处理事件, 注意最大的等待时间是30秒
}
return 0;
}
// 通过epoll_wait返回所有就绪的fd,然后一次调用其callback
int EventCenter::process_events(int timeout_microseconds)
{
......
vector<FiredFileEvent> fired_events;
next_time = shortest;
numevents = driver->event_wait(fired_events, &tv); // 获取当前的io事件
for (int j = 0; j < numevents; j++) {
int rfired = 0;
FileEvent *event;
{
Mutex::Locker l(file_lock);
event = _get_file_event(fired_events[j].fd);
}
if (event->mask & fired_events[j].mask & EVENT_READABLE) {
rfired = 1;
event->read_cb->do_request(fired_events[j].fd); // 处理可读事件
}
if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
if (!rfired || event->read_cb != event->write_cb)
event->write_cb->do_request(fired_events[j].fd); // 处理可写事件
}
}
......
}
int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
{
int retval, numevents = 0;
retval = epoll_wait(epfd, events, size,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // epoll_wait系统调用,等待就绪事件或超时返回
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = events + j;
if (e->events & EPOLLIN) mask |= EVENT_READABLE;
if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;
if (e->events & EPOLLERR) mask |= EVENT_WRITABLE;
if (e->events & EPOLLHUP) mask |= EVENT_WRITABLE;
// 记录下已经发生的事件
fired_events[j].fd = e->data.fd;
fired_events[j].mask = mask;
}
return numevents;
}
process_events函数中,需要注意的是,这里处理三种事件,与fd相关的读写事件,与时间相关的time事件,还有添加的外部事件, 在处理fd的时候,如果没有fd就绪就会一直wait等待超时(最大超时时间不超过下次时间事件的值)。但是,在这个过程中, 有两种情况需要被唤醒,一是添加了一个更小的时间事件(最近发生),二是添加了外部事件。
Worker线程循环不停的处理事件,其实就是调用epoll_wait,返回就绪事件的fd,然后调用fd对应的回调read_cb或write_cb,很明显,epoll_wait能够返回就绪的fd, 这个fd必然是之前添加进去的,什么时候添加的呢?还记得在第二步Bind的时候,Processor类中创建了listen_fd,要想监听来自这个fd的请求,必然要将其添加到epoll进行管理。
但是从osd代码运行到这里,似乎都没有添加的动作?在osd调用messenger->start()后,紧接着就是:
err = osd->init();
诀窍就在这里:
int OSD::init()
{
......
// i'm ready!
client_messenger->add_dispatcher_head(this);
cluster_messenger->add_dispatcher_head(this);
......
}
void add_dispatcher_head(Dispatcher *d) {
bool first = dispatchers.empty(); // 刚开始当然为空, first为true
dispatchers.push_front(d);
if (d->ms_can_fast_dispatch_any())
fast_dispatchers.push_front(d);
if (first)
ready(); // 准备添加fd到epoll
}
void AsyncMessenger::ready()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
Mutex::Locker l(lock);
Worker *w = pool->get_worker(); // 获取一个worker干活
processor.start(w); // listen_sd在Processor中
}
int Processor::start(Worker *w)
{
ldout(msgr->cct, 1) << __func__ << " " << dendl;
// start thread
if (listen_sd > 0) {
worker = w;
// 创建可读事件, 最终会调用epoll_ctl将listen_sd加进epoll进行管理
w->center.create_file_event(listen_sd, EVENT_READABLE,
EventCallbackRef(new C_processor_accept(this))); // 注意事件的callback
}
return 0;
}
listen fd添加进去以后,初始化过程就算全部完成了。当新的连接请求到来,如前所述,worker线程会调用process_event函数,回调就会被执行:
// listen fd 的回调
class C_processor_accept : public EventCallback {
Processor *pro;
public:
C_processor_accept(Processor *p): pro(p) {}
void do_request(int id) {
pro->accept(); // 回调
}
};
void Processor::accept()
{
while (errors < 4) {
entity_addr_t addr;
socklen_t slen = sizeof(addr.ss_addr());
int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); // 接受连接请求
if (sd >= 0) {
msgr->add_accept(sd); // 通过messenger处理接收套接字sd
continue;
} else {
......
}
}
}
AsyncConnectionRef AsyncMessenger::add_accept(int sd)
{
lock.Lock();
Worker *w = pool->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); // 创建连接
w->center.dispatch_event_external(EventCallbackRef(new C_conn_accept(conn, sd))); // 分发事件, 外部新的连接,所以叫external
accepting_conns.insert(conn); // 记录下即将生效的连接, 最终完成后会从此集合删除
lock.Unlock();
return conn;
}
void EventCenter::dispatch_event_external(EventCallbackRef e)
{
external_lock.Lock();
external_events.push_back(e); // 将事件的callback函数放入事件中心的队列中等待执行
external_lock.Unlock();
wakeup(); // 唤醒worker线程
}
不是很明白为什么需要放入队列,等待worker下一次的process_event调用,是否可以直接执行完毕?
不管怎么样,放入队列后,需要执行队列中的callback,什么时候会执行呢?很明显是在worker线程中的process_event函数, 但是worker线程可能睡眠在epoll_wait(epoll管理的所有fd都没就绪,只能等待超时),如果有新连接到来,需要立即接收连接请求, 所以要唤醒睡眠的worker线程,后面的wakeup函数就是达到此目的,这个函数向pipe的一端写入数据(pipe是在函数EventCenter::init()中创建的), 使得另一端可读,即notify_receive_fd就绪,epoll_wait会返回其可读事件,然后执行其回调(回调就是简单读pipe),使得worker线程得以继续处理, 然后执行刚才放入队列中的回调。
void EventCenter::wakeup()
{
ldout(cct, 1) << __func__ << dendl;
char buf[1];
buf[0] = 'c';
// wake up "event_wait"
int n = write(notify_send_fd, buf, 1); // 唤醒worker线程
// FIXME ?
assert(n == 1);
}
int EventCenter::process_events(int timeout_microseconds)
{
......
numevents = driver->event_wait(fired_events, &tv); // 本来worker线程可能睡眠在这里,会被wakeup唤醒
// 这时候至少有一个fd就绪,即notify_receive_fd
// 执行所有fd的callback, 对于notify_receive_fd,可以看其callback,就是简单读一下,什么也没干
for (int j = 0; j < numevents; j++) {
......
event->read_cb->do_request(fired_events[j].fd);
.....
}
......
// 紧接着处理刚才的队列, 这正是唤醒worker的目的
{
external_lock.Lock();
while (!external_events.empty()) {
EventCallbackRef e = external_events.front();
external_events.pop_front();
external_lock.Unlock();
if (e)
e->do_request(0); // 连接请求的callback
external_lock.Lock();
}
external_lock.Unlock();
}
......
}
从分析看,连接请求的callback会很快被执行。前面已经有了accept接收请求的fd,现在需要将那个fd加入epoll结构,管理起来,然后就可以进行通信, callback最终就是做这些事情:
// 队列中的回调类型
class C_conn_accept : public EventCallback {
AsyncConnectionRef conn;
int fd;
public:
C_conn_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
void do_request(int id) {
conn->accept(fd);
}
};
void AsyncConnection::accept(int incoming)
{
ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
assert(sd < 0);
sd = incoming;
state = STATE_ACCEPTING;
center->create_file_event(sd, EVENT_READABLE, read_handler); // sd就是连接成功的fd,加进epoll管理
process(); // 服务器端的状态机开始执行,会先向客户端发送BANNER消息
}
注意服务端AsyncConnection状态机的初始状态是STATE_ACCEPTING,服务器端的状态机会先向客户端发送BANNER消息。 以后收到消息,worker线程就会调用read_handler处理,然后调用process,状态机不停的转换状态:
// 注册的回调类
class C_handle_read : public EventCallback {
AsyncConnectionRef conn;
public:
C_handle_read(AsyncConnectionRef c): conn(c) {}
void do_request(int fd_or_id) {
conn->process(); // 调用connection处理
}
};
void AsyncConnection::process()
{
int r = 0;
int prev_state = state;
Mutex::Locker l(lock);
do {
prev_state = state;
// connection状态机
switch (state) {
case STATE_OPEN:
......
default:
{
if (_process_connection() < 0)
goto fail;
break;
}
}
}
return 0;
fail:
......
}
// 单独处理连接信息
int AsyncConnection::_process_connection()
{
int r = 0;
switch(state) {
case STATE_WAIT_SEND:
......
}
......
}
AsyncConnection就是负责通信的类,要理解这个状态机的原理,必须理解ceph的应用层通信协议, 可以参看官方文档的解释。
AsyncMessenger的框架就算介绍完成了,当有新的连接请求到来,就会重复执行以下这几步:
accept connection
add accept fd
communication
由此可以看出,线程数不是随连接数线性增加的,只由最开始初始化的时候启动了多少个worker决定。
客户端的操作主要是发起connect操作,建立连接进行通信。所有的客户端都是基于librados库,然后通过RadosClient连接集群的:
int librados::Rados::connect()
{
return client->connect();
}
int librados::RadosClient::connect()
{
......
// 创建messenger
messenger = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
"radosclient", nonce);
......
// 创建objecter
// 发送消息的时候,比如librbd代码,都是通过objecter处理
// objecter需要借助于messenger发送,所以需要将创建的messenger传给objecter类
objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
&finisher,
cct->_conf->rados_mon_op_timeout,
cct->_conf->rados_osd_op_timeout);
// 同理,连接monitor也需要处理消息的收发
monclient.set_messenger(messenger);
objecter->init();
messenger->add_dispatcher_tail(objecter);
messenger->add_dispatcher_tail(this);
messenger->start();
......
messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); // ID全局唯一,所以需要向monitor获取
......
}
connect操作只是初始化了messenger对象,真正需要通信的时候,才会去建立连接,以objecter.cc中的op_submit为例:
ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
{
......
int r = _get_session(op->target.osd, &s, lc);
......
}
int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
{
......
// session 不存在,会创建新的session,
s->con = messenger->get_connection(osdmap->get_inst(osd));
......
}
ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
{
......
conn = create_connect(dest.addr, dest.name.type());
......
}
AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
{
// create connection
Worker *w = pool->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); // 创建connection
conn->connect(addr, type); // 连接
assert(!conns.count(addr));
conns[addr] = conn;
return conn;
}
void connect(const entity_addr_t& addr, int type)
{
set_peer_type(type);
set_peer_addr(addr);
policy = msgr->get_policy(type);
_connect();
}
void AsyncConnection::_connect()
{
state = STATE_CONNECTING; // 这个初始化状态很关键,是客户端状态机的起始状态
stopping.set(0);
center->dispatch_event_external(read_handler); // 放入队列等待worker处理
}
这里和前面一样,worker会处理这个外部事件,read_handler就会调用process函数,紧接着就过度到_process_connection:
int AsyncConnection::_process_connection()
{
int r = 0;
switch(state) {
case STATE_CONNECTING: // 初始状态
{
......
sd = net.connect(get_peer_addr()); // 通过net类的功能,实际上就是调用connect系统调用,建立socket通信
// 连接成功后,将socket fd加入epoll进行管理
center->create_file_event(sd, EVENT_READABLE, read_handler);
state = STATE_CONNECTING_WAIT_BANNER;
break;
}
}
}
接下来就是客户端和服务端的通信,都是通过AsyncConnection的状态机完成。同理,客户端即使创建多个messenger, 他们仍然共享一个workerpool,线程数由这个pool初始化的时候决定,不会随着连接的增加而线性增加。
进程中所有的AsyncMessenger共享一个workerpool管理所有worker
Worker线程通过EventCenter负责具体的事件处理
应用层的网络通信由AsyncConnection的状态机处理