Ceph FileStore

May 4, 2016
Author:Eric
Source:http://blog.wjin.org/posts/ceph-filestore.html
Declaration: this work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License. Creative Commons License

Introduction

ceph后端存储引擎有多种实现(filestore, kstore, memstore, bluestore), bluestore将来会成为默认的后端存储, 但是需要一些时间,现在大部分部署都是使用filestore。filestore的代码还是比较好理解的, 执行流程可以参考网上的这篇文章

在阅读代码的过程中,一些细节还是需要注意,比如不同PG的OP操作可以并行执行,同一PG内部OP请求必须串行执行, 各个限流组件怎么协调工作,journal回放的时候需要注意非幂等操作。

本文顺着代码流程,分析写流程中一些值得关注的细节,然后总结下throttle, 非幂等操作和tuning参数。

涉及到的相关线程:

操作语义:

备注一下,这里的commit指sync线程将page cache的数据sync到data disk的意思,不是journal的disk。容易引起误解的地方是,osd_op_tp在提交事务的时候, 会有两个回调,一个是ondisk(客户端可以认为是on commit),表示数据已经落盘,因为journal一般采用O_DIRECT + O_DSYNC方式,写journal成功就表示数据落盘, 可以调用ondisk回调,通知客户端写成功,所以journal能改善写的性能,将随机转化为顺序,并且多个写可以合并成一次journal的写。 另外一个是onreadable,表示数据可读,在FileStore::op_tp线程池将数据写入page cache后,就可以读数据,可以调用onreadable回调。

重要数据结构:

class JournalingObjectStore : public ObjectStore {
protected:

  class SubmitManager {
    Mutex lock;
    uint64_t op_seq; // journal提交的序列号,全局唯一
    uint64_t op_submitted;
	......
  } submit_manager;

  class ApplyManager {
    Mutex apply_lock;
    bool blocked;
    Cond blocked_cond;
    int open_ops;
    uint64_t max_applied_seq; // apply到文件系统page cache的序列号

    Mutex com_lock;
    map<version_t, vector<Context*> > commit_waiters;
    uint64_t committing_seq, committed_seq; // 将文件系统page cache的数据fsync到磁盘的序列号,用来通知journal释放空间
	......
  } apply_manager;

  ......
};

Write

OSD::osd_op_tp

OSD::osd_op_tp线程池在执行PG写操作的时候,是通过函数queue_transactions提交请求的:

int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
				  TrackedOpRef osd_op,
				  ThreadPool::TPHandle *handle)
{
  ......

  // posr,定义在PG类中,对于同一个PG,一定是一样的,这里的default_osr根本就不会用,Jewel新代码已经删除了
  // OpSequencer非常关键,同一个PG会使用同样的OpSequencer,保证PG操作串行化
  OpSequencer *osr;
  if (!posr)
    posr = &default_osr;
  if (posr->p) {
    osr = static_cast<OpSequencer *>(posr->p);
  } else {
    osr = new OpSequencer; // PG的第一次操作的时候,会创建一个OpSequencer,以后就会复用
    osr->parent = posr;
    posr->p = osr;
  }

  ......

  if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
    Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);

    op_queue_reserve_throttle(o, handle); // filestore层对整个op的限流,释放的时候是在FileStore::_finish_op

    journal->throttle(); // 对journal的限流

    // 在journal层面为op生成唯一sequence, 因为journal是单线程写,所以写一定是串行的
    uint64_t op_num = submit_manager.op_submit_start();
    o->op = op_num;

    if (m_filestore_journal_parallel) {
		......
    } else if (m_filestore_journal_writeahead) { // ext4, xfs都需要wal
      
      osr->queue_journal(o->op); // journal层面的sequence记录在OpSequencer中的journal queue中, JournalingObjectStore::finisher线程中会deque_journal

      _op_journal_transactions(o->tls, o->op, // 提交OP,注意这里的callback以及sequence
			       new C_JournaledAhead(this, osr, o, ondisk),
			       osd_op);
    } else {
      assert(0);
    }
    submit_manager.op_submit_finish(op_num); // op提交到journal队列完成
    return 0;
  }

  ......
}

void JournalingObjectStore::_op_journal_transactions(
  list<ObjectStore::Transaction*>& tls, uint64_t op,
  Context *onjournal, TrackedOpRef osd_op)
{
  if (journal && journal->is_writeable()) {
	......
    journal->submit_entry(op, tbl, data_align, onjournal, osd_op); // 放入journal队列,等待write线程执行journal写请求
  } else if (onjournal) {
    apply_manager.add_waiter(op, onjournal);
  }
}

void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
			       Context *oncommit, TrackedOpRef osd_op)
{
  ......

  // 获取journal限流资源
  throttle_ops.take(1);
  throttle_bytes.take(e.length());

  {
	// 注意锁的顺序
    Mutex::Locker l1(writeq_lock);  // ** lock **
    Mutex::Locker l2(completions_lock);  // ** lock **

	// write线程执行完成后,会处理这里的completion
    completions.push_back(
      completion_item(
	seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));

    if (writeq.empty())
      writeq_cond.Signal();

    writeq.push_back(write_item(seq, e, alignment, osd_op)); // 放入队列,等待write线程执行
  }
}

执行到这里,请求已经提交到journal的队列里面,OSD::osd_op_tp工作就结束了。

FileJournal::write_thread

写journal线程通过Filejournal::write_thread完成,流程比较简单,执行完成后,就会调用:

void FileJournal::queue_completions_thru(uint64_t seq)
{
  ......

  // journal的一次写可以同时写入多个op请求日志,所以这里是循环处理所有已经完成的op
  // 将回调全部放入finisher线程的队列
  while (!completions_empty()) {
    completion_item next = completion_peek_front();
    if (next.seq > seq) // sequence判断是否已经写入完成
      break;
    completion_pop_front();

	......
    if (next.finish) // 放入finisher队列,等待回调
      finisher->queue(next.finish); // finisher线程实际上是JournalingObjectStore中的finisher
	......
  }
  finisher_cond.Signal();
}

需要注意的是,在journal准备写和写完成处理completions的时候,调用队列的锁太频繁,可以优化。 master branch已经有类似patch: pr6701

JournalingObjectStore::finisher

这里的回调就是C_JournaledAhead,然后会执行下面这个函数,主要干两件事情:1)将op放入filestore队列排队 2)将ondisk回调放入FileStore::ondisk_finisher:

void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
{
  queue_op(osr, o); // 将op在filestore层面排队,准备写入文件系统

  list<Context*> to_queue;
  osr->dequeue_journal(&to_queue); // journal已经写成功,出队列

  // journal写好了,数据就真正落盘了,所以执行ondisk回调
  // 注意此时数据还未写入文件系统,所以不可读
  if (ondisk) {
    ondisk_finisher.queue(ondisk); // 放入ondisk_finisher的队列,等待回调
  }

  if (!to_queue.empty()) {
    ondisk_finisher.queue(to_queue);
  }
}

journal是单线程顺序执行的,且每条op请求都有唯一的sequence,使得queue_op一定是按提交时候的顺序调用。 但是同一个PG可能连续提交了很多次op请求,这些请求会放入PG对应的OpSequencer中进行排队,然后同时将OpSequencer放入 op_wq队列等待FileStore::op_tp执行,所以如果PG连续提交请求,OpSequencer会在op_wq中同时出现多次, op_tp中可能多个线程同时获取同一个OpSequencer准备执行写文件系统的操作:

void FileStore::queue_op(OpSequencer *osr, Op *o)
{
  // queue_op按提交时候的顺序调用,必然导致属于同一个OpSequencer的OP按照提交顺序
  // 在OpSequencer内部排队, 保证了PG内部op的先后顺序
  osr->queue(o);
  op_wq.queue(osr);
}

FileStore::op_tp

PG对应的OpSequencer排队以后,说明PG有OP需要执行,这时候线程池就会对其处理,入口函数:

void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
{
  wbthrottle.throttle(); // filestore层面writeback的限流

  ......

  // op_tp线程池的多个线程可以并发对同一个OpSequencer执行请求
  // 锁保证同一个OpSequencer中(也即PG中)只能有一个OP在执行
  osr->apply_lock.Lock();

  Op *o = osr->peek_queue(); // 获取一个op

  apply_manager.op_apply_start(o->op);
  int r = _do_transactions(o->tls, o->op, &handle); // 执行写请求到文件系统
  apply_manager.op_apply_finish(o->op);
}

写执行完成后,线程还会执行一个finish函数:

void FileStore::_finish_op(OpSequencer *osr)
{
  list<Context*> to_queue;
  Op *o = osr->dequeue(&to_queue); // 将op从OpSequencer出队列
  osr->apply_lock.Unlock();  // 释放锁,这时候其他线程就可以继续对此QpSequencer执行apply操作

  op_queue_release_throttle(o); // 释放filestore的throttle,见queue_transactions

  if (o->onreadable_sync) {
    o->onreadable_sync->complete(0);
  }
  if (o->onreadable) {
    op_finisher.queue(o->onreadable);
  }
  if (!to_queue.empty()) {
    op_finisher.queue(to_queue); // 放入op_finisher队列,等待执行apply回调,标志数据可读
  }
  delete o;
}

OpSequencer中apply_lock保证PG内部OP的串行化,并不是保证内部队列q和jq的互斥,q和jq的互斥是另外一把锁qlock在保证。 所以在apply的过程中,OSD::osd_op_tp可以继续向jq中提交请求,更重要的是,JournalingObjectStore::finisher线程可以继续将 写journal完成的op在q中排队。

如前所述,同一个OpSequencer可能进入FileStore::op_wq多次,然后被多个FileStore:op_tp中的线程获取执行,然后q和jq共用一把锁,是否会影响性能? 其实也还好,虽然FileStore::osd_tp是线程池,会有多个线程,但是这些线程在开始处理apply 的时候,会先获取apply_lock,然后在执行完成的时候,从q出队列op的时候获取qlock,所以不会同时出现多个FileStore::osd_tp的线程去 抢qlock这个锁,可以认为同一时刻q只增加了两个线程去抢qlock,即JournalingObjectStore::finisher 和 其中一个FileStore::osd_tp线程。

FileStore::sync_thread

sync线程实现比较简单,目的是获取一个序列号,保证此序列号之前的数据都已经apply过了,即数据已经在page cache中, 然后执行fsync,更新序列号,这样可以保证此序列号之前的数据已经存入disk中,以后不在需要,journal可以做trim释放空间。

需要注意在获取序列号的过程中,会导致FileStore::op_tp block住,影响apply流程,对性能有损失, 可以适当调整参数filestore_max_sync_interval。有一个潜在问题是,如果长时间不sync,可能会导致执行sync的时候, 整个目录数据过多,导致一次sync时间太长,也可能导致系统内存不足而OOM,这些需要结合kernel参数dirty_ratio 和 dirty_expire_centisecs调优。

void FileStore::sync_entry()
{
  lock.Lock();
  while (!stop) {
	......

    op_tp.pause(); // 暂停apply线程池的处理
    if (apply_manager.commit_start()) { // 如果有新的请求需要commit, 返回true

      uint64_t cp = apply_manager.get_committing_seq(); // 获取已经apply过的序列号

	  ......

      if (backend->can_checkpoint()) {
		  ......

      } else {
		apply_manager.commit_started(); // 设置block为false,主要是为journal replay服务
		op_tp.unpause(); // 恢复线程池

		int err = backend->syncfs(); // 这里会sync osd的整个current目录

		err = write_op_seq(op_fd, cp); // 记录下commit的序列号
	
		err = ::fsync(op_fd); // 保证更新序列号的操作落盘
      }
      
      apply_manager.commit_finish(); // 完成commit,通知journal
      wbthrottle.clear();

	  ......

    } else {
      op_tp.unpause();
    }
	......
  }
  stop = false;
  lock.Unlock();
}

bool JournalingObjectStore::ApplyManager::commit_start()
{
  bool ret = false;

  uint64_t _committing_seq = 0;
  {
    Mutex::Locker l(apply_lock);

    blocked = true; // 这个仅仅为journal replay起作用

    while (open_ops > 0) { // 等待其他inflight apply 完成
      blocked_cond.Wait(apply_lock);
    }

    {
      Mutex::Locker l(com_lock);
      if (max_applied_seq == committed_seq) {
		blocked = false;
		goto out;
      }

      _committing_seq = committing_seq = max_applied_seq; // 更新序列号
    }
  }
  ret = true;

 out:
  if (journal)
    journal->commit_start(_committing_seq);  // tell the journal too
  return ret;
}

这里比较晦涩的地方是,sync线程先pause住FileStore::op_tp线程池,然后调用commit_start(), pause后说明线程池不会再有新的apply请求了,为什么还设置变量blocked为true?

首先,设置这个变量为true,目的是防止继续apply:

uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
{
  Mutex::Locker l(apply_lock);
  while (blocked) { // 新的apply操作将会阻塞
    blocked_cond.Wait(apply_lock);
  }

  assert(!blocked);
  assert(op > committed_seq);
  open_ops++;
  return op;
}

其次,有一种特殊情况,即journal在做replay的时候,apply的操作不是在FileStore::op_op线程池内完成, 而是在其他线程调用mount的时候,回放日志完成apply,所以pause op_tp不起作用,停止不了apply操作。 如果回放日志太多或太久,导致sync线程开始工作,那么此时需要将回放日志的线程暂停一下, 以便获取序列号,这时候blocked就起作用了,可以阻塞调用mount的线程,等commit完成后唤醒继续replay。

void JournalingObjectStore::ApplyManager::commit_started()
{
  blocked = false; // 设置回false
  blocked_cond.Signal(); // 唤醒
}

另外还需要注意,sync线程调用commit_start()是有可能被阻塞的,需要等所有的inflight apply完成, 所以apply完成后会检查是否有blocked,这里和刚才的情况不一样,虽然都是阻塞在blocked变量上:

void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
{
  ......
  if (blocked) {
    blocked_cond.Signal(); // 唤醒sync线程
  }
  ......
}

Throttle

FileStore实现中,提供了三个限流的地方:

journal

int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
				  TrackedOpRef osd_op,
				  ThreadPool::TPHandle *handle)
{
	......
    journal->throttle();
	......
}

int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes)
{
	......
	put_throttle(1, peek_write().bl.length());
	......

}

filestore apply

int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
				  TrackedOpRef osd_op,
				  ThreadPool::TPHandle *handle)
{
	......
	op_queue_reserve_throttle(o, handle);
	......
}

void FileStore::_finish_op(OpSequencer *osr)
{
	......
    op_queue_release_throttle(o);
	......
}

filestore writeback

wbthrottle,参见另外一篇文章

Non-idempotent OP

在osd异常崩溃的情况下,journal中的数据不一定全部都存放在了FileStore的data disk中,因为apply到了FileStore中,并不代表数据就在disk中了, 此时很有可能数据在page cache中,需要sync线程调用fdatasync之类的系统调用才能保证数据落盘。

所以为了保证异常情况下数据的一致性,需要对journla的日志做回放,从什么地方开始回放,FileStore中会将已经apply到文件系统并进行 过fdatasync的序列号记录在文件commit_op_seq中,回放的时候就从此文件记录的序列号开始。

然而,回放的时候,部分op可能已经在disk中生效,但是commit_op_seq并没有体现,此时如果仍然回放,对于有些操作,反复执行多次会出问题,也即非幂等操作。

一个例子:

假设如上操作都已经体现在了disk中,但是sync线程并未来得及更新commit_op_seq,此时系统崩溃。 再次启动后,osd启动回放日志,第二次执行clone操作,将拷贝到新版本的数据,而不是期望版本的数据。 FileStore需要保证回放处理这种情况的正确性。

具体做法是在对象文件的属性中记录下最后操作的一个三元组(序列号,事务编号,OP编号),因为journal提交的时候有一个唯一的序列号,通过这个序列号, 就可以找到提交时候的事务,然后根据事务编号和OP编号最终定位出最后操作的OP。

struct SequencerPosition {
  uint64_t seq;  ///< seq
  uint32_t trans; ///< transaction in that seq (0-based)
  uint32_t op;    ///< op in that transaction (0-based)
  ......
};

看clone例子,操作前,先检查下,如果可以继续执行,就执行操作,操作完成后,设置一个guard,这样对于非幂等操作,如果上次执行过, 肯定是有记录的,再一次执行的时候check就会失败,就不继续执行。

int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
		      const SequencerPosition& spos)
{
  ......
  if (_check_replay_guard(cid, newoid, spos) < 0)
    return 0;

  ......

  // clone is non-idempotent; record our work.
  _set_replay_guard(**n, spos, &newoid);

  ......
}

Tuning

# journal 
journal_queue_max_bytes
journal_queue_max_ops

# filestore apply 
filestore_queue_max_bytes
filestore_queue_max_ops

# filestore writeback
filestore_wbthrottle_enable
filestore_wbthrottle_xfs_bytes_start_flusher
filestore_wbthrottle_xfs_bytes_hard_limit
filestore_wbthrottle_xfs_ios_start_flusher
filestore_wbthrottle_xfs_ios_hard_limit
filestore_wbthrottle_xfs_inodes_start_flusher

filestore writeback打开以后,一方面需要注意对应限流参数的调整,ext4和xfs是共用一套参数, 另一方面,如果io压力持续过大,可能导致FileStore::op_tp被throttle住而超时,也有可能导致 FileStore::op_wq限流起作用。

如果关闭writeback,可能导致FileStore:sync_thread超时,需要调整参数filestore_commit_timeout, ssd情况下可以关闭wbthrottle。

其他影响性能的参数:

filestore_op_threads
filestore_fd_cache_size

journal_max_write_bytes
journal_max_write_entries

filestore_max_sync_interval