博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
leveldb代码精读 插入操作
阅读量:2434 次
发布时间:2019-05-10

本文共 6075 字,大约阅读时间需要 20 分钟。

leveldb插入数据时,必然做的操作是先写logfile,再将数据放到cache里
不过在此之前,会先进行一下预处理
1 将要写的数据封装到到writer里,将write加入写队列,等待轮到它写。
2 检查cache是否已满,是否需要“做检查点”
3 leveldb的cache有两个状态,当前状态和只读状态。
当cache写满,需要写文件时,会将cache转成只读状态,进行写文件和文件压缩操作。
所以每次写文件前都要先等待之前的只读cache完成自己的使命。
4 由于新数据需要些到level 0文件,而level 0文件的个数是有限制的
当达到soft limit时,需要sleep1毫秒,将cpu资源让给正在进行中的压缩操作。
当达到hard limits时,直接进入等待。
5 维护cache状态,维护file number,创建新文件。。。
6 尝试进行一次文件压缩。
对外的put函数

点击(此处)折叠或打开

  1. Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  2.   return DB::Put(o, key, val);
  3. }
  4. Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  5.   WriteBatch batch;
  6.   batch.Put(key, value);
  7.   return Write(opt, &batch);
  8. }
真正的功能入口函数

点击(此处)折叠或打开

  1. Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  2.   // 将WriteBatch封装到一个Writer里,设置一些选项。
  3.   Writer w(&mutex_);
  4.   w.batch = my_batch;
  5.   w.sync = options.sync;
  6.   w.done = false;
  7.   MutexLock l(&mutex_);
  8.   /*
  9.   把Writer放到到写队列里,等待writer升到 writers_.front 就能开始了。
  10.   */
  11.   writers_.push_back(&w);
  12.   while (!w.done && &w != writers_.front()) {
  13.     w.cv.Wait();
  14.   }
  15.   if (w.done) {
  16.     return w.status;
  17.   }
  18.   // May temporarily unlock and wait.
  19.   /*
  20.   MakeRoomForWrite会对写入操作所需的条件进行一系列判断,
  21.   如:
  22.   level 0 文件数是否超过限制,
  23.   cache是否还有空间,是否真的需要写文件,
  24.   imm cache是否能够释放
  25.   是否需要做压缩
  26.   条件都满足,确定需要写文件时,进行:
  27.   生成新的文件号,新的logfile,将当前cache转成imm cache等操作
  28.   参数为my_batch == NULL的意思是:如果my_batch为空,则视为想让MakeRoomForWrite尝试做一次压缩。
  29.   */
  30.   Status status = MakeRoomForWrite(my_batch == NULL);
  31.   // sequence是指写batch的次数
  32.   uint64_t last_sequence = versions_->LastSequence();
  33.   Writer* last_writer = &w;
  34.   /*
  35.   my_batch是有可能为空的,可以利用空batch手动让MakeRoomForWrite进行压缩操作。
  36.   */
  37.   if (status.ok() && my_batch != NULL) {
    // NULL batch is for compactions
  38.     /*
  39.     WriteBatch里有一个字符串rep_,存放转码成存储格式后的数据。
  40.     BuildBatchGroup的工作是从writers_里找其它的WriteBatch,他们的rep_拼到一个WriteBatch里
  41.     但是最终的rep_长度不能超过 1 << 20
  42.     */
  43.     WriteBatch* updates = BuildBatchGroup(&last_writer);
  44.     // WriteBatchInternal是一个由静态函数组成的工具类
  45.     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
  46.     last_sequence += WriteBatchInternal::Count(updates);
  47.     // Add to log and apply to memtable. We can release the lock
  48.     // during this phase since &w is currently responsible for logging
  49.     // and protects against concurrent loggers and concurrent writes
  50.     // into mem_.
  51.     {
  52.       mutex_.Unlock();
  53.       // 写logfile
  54.       status = log_->AddRecord(WriteBatchInternal::Contents(updates));
  55.       bool sync_error = false;
  56.       if (status.ok() && options.sync) {
  57.         status = logfile_->Sync();
  58.         if (!status.ok()) {
  59.           sync_error = true;
  60.         }
  61.       }
  62.       if (status.ok()) {
  63.         // 将数据放入cache
  64.         status = WriteBatchInternal::InsertInto(updates, mem_);
  65.       }
  66.       mutex_.Lock();
  67.       if (sync_error) {
  68.         // The state of the log file is indeterminate: the log record we
  69.         // just added may or may not show up when the DB is re-opened.
  70.         // So we force the DB into a mode where all future writes fail.
  71.         RecordBackgroundError(status);
  72.       }
  73.     }
  74.     if (updates == tmp_batch_) tmp_batch_->Clear();
  75.     // 更新sequence
  76.     versions_->SetLastSequence(last_sequence);
  77.   }
  78.   while (true) {
  79.     Writer* ready = writers_.front();
  80.     writers_.pop_front();
  81.     if (ready != &w) {
  82.       ready->status = status;
  83.       ready->done = true;
  84.       ready->cv.Signal();
  85.     }
  86.     if (ready == last_writer) break;
  87.   }
  88.   // Notify new head of write queue
  89.   if (!writers_.empty()) {
  90.     writers_.front()->cv.Signal();
  91.   }
  92.   return status;
  93. }
之所以需要MakeRoom是因为新数据需要写入level 0 数据文件,但是level 0文件数量有限制。
可能需要做压缩来减少level 0 文件的数量。
同时当前cache也需要转成imm cache,需要判断之前的imm cache是否还占着位置。

点击(此处)折叠或打开

  1. Status DBImpl::MakeRoomForWrite(bool force) {
  2.   mutex_.AssertHeld();
  3.   assert(!writers_.empty());
  4.   // 决定是否允许通过sleep来给压缩操作让出系统资源。
  5.   bool allow_delay = !force;
  6.   Status s;
  7.   while (true) {
  8.     if (!bg_error_.ok()) {
  9.       // Yield previous error
  10.       s = bg_error_;
  11.       break;
  12.     } else if (
  13.         /*
  14.         当允许delay,并且level 0的文件数已经超过了8个,就要sleep 1毫秒,给复制压缩的线程工作让出CPU资源。
  15.         sleep一次后就将allow_delay设成false,这次写入操作就不需要再sleep了。
  16.         */
  17.         allow_delay &&
  18.         versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
  19.       // We are getting close to hitting a hard limit on the number of
  20.       // L0 files. Rather than delaying a single write by several
  21.       // seconds when we hit the hard limit, start delaying each
  22.       // individual write by 1ms to reduce latency variance. Also,
  23.       // this delay hands over some CPU to the compaction thread in
  24.       // case it is sharing the same core as the writer.
  25.       mutex_.Unlock();
  26.       env_->SleepForMicroseconds(1000);
  27.       allow_delay = false; // Do not delay a single write more than once
  28.       mutex_.Lock();
  29.     } else if (!force &&
  30.                /*
  31.                当cache不满时,先不写文件。
  32.                */
  33.                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
  34.       // There is room in current memtable
  35.       break;
  36.     } else if (imm_ != NULL) {
  37.       /*
  38.       leveldb有两种cache,一个是当前cache,就是目前正在写新数据的cache。
  39.       当cache满了,需要写文件时,就将当前cache转成immunity cache,是一个只读cache,由指针imm_管理。
  40.       imm cache 用户查询操作和压缩操作。
  41.       如果imm cache存在,就要等它的对应的文件压缩完成才能将当前cache转成imm cache。
  42.       */
  43.       // We have filled up the current memtable, but the previous
  44.       // one is still being compacted, so we wait.
  45.       Log(options_.info_log, "Current memtable full; waiting...\n");
  46.       bg_cv_.Wait();
  47.     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
  48.       // 达到了level 0 文件数的硬指标限制,不能再写新的了。
  49.       // There are too many level-0 files.
  50.       Log(options_.info_log, "Too many L0 files; waiting...\n");
  51.       bg_cv_.Wait();
  52.     } else {
  53.       // 检查条件结束,开始正式工作
  54.       // Attempt to switch to a new memtable and trigger compaction of old
  55.       assert(versions_->PrevLogNumber() == 0);
  56.       // 生成新的logfile number
  57.       uint64_t new_log_number = versions_->NewFileNumber();
  58.       WritableFile* lfile = NULL;
  59.       // 创建新文件
  60.       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
  61.       if (!s.ok()) {
  62.         // Avoid chewing through file number space in a tight loop.
  63.         versions_->ReuseFileNumber(new_log_number);
  64.         break;
  65.       }
  66.       delete log_;
  67.       delete logfile_;
  68.       // 将Logfile指向新文件,设置新log number
  69.       logfile_ = lfile;
  70.       logfile_number_ = new_log_number;
  71.       log_ = new log::Writer(lfile);
  72.       // 将当前cache切换成imm cache,创建新的当前cache
  73.       imm_ = mem_;
  74.       has_imm_.Release_Store(imm_);
  75.       mem_ = new MemTable(internal_comparator_);
  76.       mem_->Ref();
  77.       force = false; // Do not force another compaction if have room
  78.       /*
  79.       如果需要,进行一次压缩
  80.       这里面进行了一下判断,调了回调函数
  81.       最终真正的功能入口是DBImpl::BackgroundCompaction()
  82.       */
  83.       MaybeScheduleCompaction();
  84.     }
  85.   }
  86.   return s;
  87. }

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/26239116/viewspace-1847246/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/26239116/viewspace-1847246/

你可能感兴趣的文章
笔记︱风控分类模型种类(决策、排序)比较与模型评估体系(ROC/gini/KS/lift)
查看>>
MySQL存储引擎之MyISAM与InnoDB区别
查看>>
Python numpy小练习
查看>>
Linux命令英文解释(按英文字母顺序)
查看>>
秋招面试准备-数据库知识
查看>>
数据分析岗-机器学习相关知识
查看>>
分类模型的效果评估
查看>>
深入理解什么是Java双亲委派模型
查看>>
MySQL优化Limit查询语句
查看>>
轻松入门MySQL主从复制原理
查看>>
SpringCloud全家桶---Zuul网关
查看>>
基于zuul和ribbon的灰度发布方案
查看>>
JVM常用垃圾收集器参数说明
查看>>
MySQL索引基础知识梳理
查看>>
MySQL事务ACID底层实现原理
查看>>
关于MySQL wait_timeout问题记录
查看>>
基础算法面试题---如何用栈实现队列
查看>>
基础算法面试题---如何用队列实现栈(1)
查看>>
基础算法面试题---如何用队列实现栈(2)
查看>>
基础算法面试题---如何数组实现栈和队列
查看>>