时间:2021-07-01 10:21:17 帮助过:23人阅读
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
------
理解了大体设计, 啃代码的时间到了. 跟我一起看看leveldb::Status status = leveldb::DB::Open(options, "testdb", &db);会触发什么模块吧.
leveldb::DB::Open来自http://db_impl.cc 1490行,
Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) { // static工厂函数
  *dbptr = NULL;
  DBImpl* impl = new DBImpl(options, dbname);
源代码有几点习惯挺好的, 值得学习.
接上, new然后跳到117行的构造函数,
1 DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) 2 : env_(raw_options.env), // Env* const 3 internal_comparator_(raw_options.comparator), // const InternalKeyComparator 4 internal_filter_policy_(raw_options.filter_policy), // const InternalFilterPolicy 5 options_(SanitizeOptions(dbname, &internal_comparator_, // const Options 6 &internal_filter_policy_, raw_options)), 7 owns_info_log_(options_.info_log != raw_options.info_log), // bool 8 owns_cache_(options_.block_cache != raw_options.block_cache), // bool 9 dbname_(dbname), // const std::string 10 db_lock_(NULL), // FileLock* 11 shutting_down_(NULL), // port::AtomicPointer 12 bg_cv_(&mutex_), // port::CondVar 13 mem_(NULL), // MemTable* 14 imm_(NULL), // MemTable* 15 logfile_(NULL), // WritableFile* 16 logfile_number_(0), // uint64_t 17 log_(NULL), // log::Writer* 18 seed_(0), // uint32_t 19 tmp_batch_(new WriteBatch), // WriteBatch* 20 bg_compaction_scheduled_(false), // bool 21 manual_compaction_(NULL) { // ManualCompaction* 22 has_imm_.Release_Store(NULL); 23 24 // Reserve ten files or so for other uses and give the rest to TableCache. 25 const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; 26 table_cache_ = new TableCache(dbname_, &options_, table_cache_size); 27 28 versions_ = new VersionSet(dbname_, &options_, table_cache_, 29 &internal_comparator_); 30 }
Google C++ Style虽然禁止函数默认参数, 但允许你扔个Options.
解释下成员变量的含义,
我决定先搞懂memory barrier的原子指针再继续分析, 就先到这了.
我以前从来没有C++多线程的经验, 借着看源码的机会, 才有机会了解. 曾今工作时, 我写Python爬虫就用thread-safe队列, 以为原子性全是靠锁实现的. 所谓的无锁就是先修改再检查要不要反悔的乐观锁. 我错了, X86 CPU的赋值(Store)和读取(Load)操作天然可以做到无锁.
相关问题: C++的6种memory order
那memory barrier这个名词是哪里蹦出来的呢? Load是原子性操作, CPU不会Load流程走到一半, 就切换到另一个线程去了, 也就是Load本身是不会在多线程环境下产生问题的. 真正导致问题的是做这个操作的时机不确定!
1. 编译器有可能让指令乱序, 比如, int a=b; long c=b; 编译器一旦判定a和c没有依赖性, 就有权力让这两个取值操作以任意顺序执行. 因为有可能有CPU指令可以一下取4个int, 乱序可以凑个整.
2. CPU会让指令乱序, 原因同上, 但额外还有个原因是分支预测. AB线程都读写一个中间量c, B在处理c, 你预期B好了, A才会取. 但万一A分支预测成功, B在处理的时候, A已经提前Load c进寄存器, 这就没得玩了...
所以, 必须要有指令告诉CPU和编译器, 不要改变这个变量的存取顺序. 这就是Memory Barrier了. call MemoryBarrier保证前后一行是严格按照代码顺序的.
atomic_pointer.h 126-143行, 注意MemoryBarrier()的摆放,
1 class AtomicPointer { 2 private: 3 void* rep_; 4 public: 5 AtomicPointer() { } 6 explicit AtomicPointer(void* p) : rep_(p) {} 7 inline void* NoBarrier_Load() const { return rep_; } 8 inline void NoBarrier_Store(void* v) { rep_ = v; } 9 inline void* Acquire_Load() const { 10 void* result = rep_; 11 MemoryBarrier(); 12 return result; 13 } 14 inline void Release_Store(void* v) { 15 MemoryBarrier(); 16 rep_ = v; 17 } 18 };
大公司的开源项目真的是一个宝库! 就算用不到, 各种踩了无数坑的库, 编码规则和跨平台代码都是一般人没机会完善的.
另外, 有菊苣在问题leveldb中atomic_pointer里面memory barrier的几点疑问?提到MemoryBarrier不保证CPU不乱序. 我觉得这个应该不用担心. 因为MemoryBarrier的counterpart是std::atomic, 肯定严格保证语义相同啊. 实在不放心用std::atomic是坠吼的.
------
继续上次没读完的Open部分代码.
http://db_impl.cc 139-146行,
  has_imm_.Release_Store(NULL); // atomic pointer
  // Reserve ten files or so for other uses and give the rest to TableCache.
  const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
  versions_ = new VersionSet(dbname_, &options_, table_cache_,
                             &internal_comparator_);
has_imm_就是我上面描述的atomic pointer, 我推测这里大概率Google程序员雇了一个临时工(233), 把可以列表构造的has_imm_放到了函数部分, 因为这里不存在任何race的可能性. db new完了. 说下一个很重要的原则, 构造函数究竟要做什么? 阿里和Google共同的观点: 轻且无副作用(基本就是赋值). 业务有需求的话, 两步构造或者工厂函数, 二选一.
回到最早的工厂函数, 一个靠谱数据库的Open操作, 用脚趾头也能想到要从日志恢复数据,
1 DB::Open(const Options& options, const std::string& dbname, 2 DB** dbptr) { // 工厂函数 3 *dbptr = NULL; // 设置结果默认值, 指针传值 4 5 DBImpl* impl = new DBImpl(options, dbname); 6 impl->mutex_.Lock(); // 数据恢复时上锁, 禁止所有可能的后台任务 7 VersionEdit edit; 8 // Recover handles create_if_missing, error_if_exists 9 bool save_manifest = false; 10 Status s = impl->Recover(&edit, &save_manifest); // 读log恢复状态 11 if (s.ok() && impl->mem_ == NULL) { 12 // Create new log and a corresponding memtable. 复位 13 uint64_t new_log_number = impl->versions_->NewFileNumber(); 14 WritableFile* lfile; 15 s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), 16 &lfile); 17 if (s.ok()) { 18 edit.SetLogNumber(new_log_number); 19 impl->logfile_ = lfile; 20 impl->logfile_number_ = new_log_number; 21 impl->log_ = new log::Writer(lfile); 22 impl->mem_ = new MemTable(impl->internal_comparator_); 23 impl->mem_->Ref(); 24 } 25 } 26 if (s.ok() && save_manifest) { 27 edit.SetPrevLogNumber(0); // No older logs needed after recovery. 28 edit.SetLogNumber(impl->logfile_number_); 29 s = impl->versions_->LogAndApply(&edit, &impl->mutex_); // 同步VersionEdit到MANIFEST文件 30 } 31 if (s.ok()) { 32 impl->DeleteObsoleteFiles(); // 清理无用文件 33 impl->MaybeScheduleCompaction(); // 有写入就有可能要compact 34 } 35 impl->mutex_.Unlock(); // 初始化完毕 36 if (s.ok()) { 37 assert(impl->mem_ != NULL); 38 *dbptr = impl; 39 } else { 40 delete impl; 41 } 42 return s; 43 }
------
就这样, Open操作的脉络大概应该是有了
[leveldb] 2.open操作介绍
标签:描述 缓存 重写 atom 比较 create opp obsolete 数据结构