[Original] OpenStack Swift Source Code Analysis (Part 7): The Replication Service

2022-06-12 (Sunday)


The Replication service keeps the system consistent: when a transient network outage or a drive failure causes stored files to be lost, or in other similar situations, it synchronizes the files via rsync.

Replication is in fact a daemon process. Under normal circumstances it scans the local hashes.pkl (which stores the hashes of the files) roughly every 30 seconds, compares it against the other remote replicas, and performs the corresponding update whenever they differ.
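
To make this compare-then-sync idea concrete, here is a minimal standalone sketch (not Swift code; the suffix names and hash values are made up) of how two suffix-hash mappings are compared to decide which suffix directories need an rsync:

# Minimal illustration of the comparison the replicator performs; the two
# dicts stand in for the contents of a local and a remote hashes.pkl.
local_hash = {'a1b': '5f4dcc3b5aa765d61d8327deb882cf99',
              '3f2': '202cb962ac59075b964b07152d234b70'}
remote_hash = {'a1b': '5f4dcc3b5aa765d61d8327deb882cf99',
               '3f2': '81dc9bdb52d04dc20036dbd8313ed055'}

# Suffixes whose hashes differ (or are missing remotely) are out of sync
# and would be pushed to the remote node with rsync.
out_of_sync = [suffix for suffix in local_hash
               if local_hash[suffix] != remote_hash.get(suffix, -1)]
print(out_of_sync)  # ['3f2']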


Daemon.py

Daemon.py is the template shared by all of the daemons; starting a service essentially means executing the template's run_daemon method.



def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
    # very often the config section_name is based on the class name
    # the None singleton will be passed through to readconf as is
    if section_name is '':  # ObjectReplicator => object-replicator
        section_name = sub(r'([a-z])([A-Z])', r'\1-\2',
                           klass.__name__).lower()
    conf = utils.readconf(conf_file, section_name,
                          log_name=kwargs.get('log_name'))
    # once on command line (i.e. daemonize=false) will over-ride config
    once = once or \
        conf.get('daemonize', 'true').lower() not in utils.TRUE_VALUES
    # pre-configure logger
    if 'logger' in kwargs:
        logger = kwargs.pop('logger')
    else:
        logger = utils.get_logger(conf, conf.get('log_name', section_name),
            log_to_console=kwargs.pop('verbose', False), log_route=section_name)
    # disable fallocate if desired
    if conf.get('disable_fallocate', 'no').lower() in utils.TRUE_VALUES:
        utils.disable_fallocate()
    try:
        klass(conf).run(once=once, **kwargs)  # execute the daemon's run() method, e.g. ObjectReplicator.run
    except KeyboardInterrupt:
        logger.info('User quit')
    logger.info('Exited')
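
As a hedged sketch of how this entry point gets used: the real swift-object-replicator executable parses command-line options before calling run_daemon, so the config path and arguments below are only illustrative.

# Illustrative only: roughly how an object-replicator service is started
# through run_daemon; the real executable parses CLI options first.
from swift.common.daemon import run_daemon
from swift.obj.replicator import ObjectReplicator

# section_name defaults to '' and is derived from the class name by the
# regex in run_daemon: 'ObjectReplicator' -> 'object-replicator'
run_daemon(ObjectReplicator, '/etc/swift/object-server.conf', once=False)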

ObjectReplicator inherits from the Daemon template and executes its run method:

def run(self, once=False, **kwargs):
    """Run the daemon"""
    utils.validate_configuration()
    utils.drop_privileges(self.conf.get('user', 'swift'))
    utils.capture_stdio(self.logger, **kwargs)

    def kill_children(*args):
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        os.killpg(0, signal.SIGTERM)
        sys.exit()

    signal.signal(signal.SIGTERM, kill_children)
    if once:
        self.run_once(**kwargs)
    else:
        self.run_forever(**kwargs)  # normally run_forever() is what gets executed
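
Whether run_once or run_forever is chosen is decided back in run_daemon: the once flag from the command line, or daemonize = false in the config, selects a single pass. A tiny sketch of that decision, using a stand-in for utils.TRUE_VALUES and an illustrative config dict:

# Hedged sketch of the once/forever decision made in run_daemon;
# TRUE_VALUES below is a stand-in for the set defined in Swift's utils.
TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))
conf = {'daemonize': 'false'}  # illustrative config section
once = False or conf.get('daemonize', 'true').lower() not in TRUE_VALUES
print(once)  # True -> run_once() would be called instead of run_forever()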

The run_forever method


def run_forever(self, *args, **kwargs):
    self.logger.info(_("Starting object replicator in daemon mode."))
    # Run the replicator continually
    while True:
        start = time.time()
        self.logger.info(_("Starting object replication pass."))
        # Run the replicator
        self.replicate()  # kick off a replication pass
        total = (time.time() - start) / 60
        self.logger.info(
            _("Object replication complete. (%.02f minutes)"), total)
        dump_recon_cache({'object_replication_time': total},
                         self.rcache, self.logger)
        self.logger.debug(_('Replication sleeping for %s seconds.'),
            self.run_pause)
        sleep(self.run_pause)  # 30 seconds by default
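
The pause between passes is not hard-coded here; it comes from the replicator's configuration. A minimal sketch of where the 30 seconds comes from, with the dict standing in for the parsed [object-replicator] section of object-server.conf (values are illustrative):

# Hedged sketch: the constructor reads run_pause (and concurrency) from the
# config section, falling back to defaults when the options are absent.
conf = {'run_pause': '30', 'concurrency': '1'}   # illustrative values
run_pause = int(conf.get('run_pause', 30))       # seconds slept between passes
concurrency = int(conf.get('concurrency', 1))    # greenthreads used by replicate()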

The replicate method

This method mainly collects all of the partitions held locally, compares them against the remote replicas, and then performs the corresponding operations.

def replicate(self):
    """Run a replication pass"""
    self.start = time.time()
    self.suffix_count = 0
    self.suffix_sync = 0
    self.suffix_hash = 0
    self.replication_count = 0
    self.last_replication_count = -1
    self.partition_times = []
    stats = eventlet.spawn(self.heartbeat)  # heartbeat greenthread
    lockup_detector = eventlet.spawn(self.detect_lockups)  # lockup detection
    eventlet.sleep()  # Give spawns a cycle
    try:
        self.run_pool = GreenPool(size=self.concurrency)
        jobs = self.collect_jobs()  # collect the partitions/nodes that need to be rsynced
        for job in jobs:
            dev_path = join(self.devices_dir, job['device'])
            if self.mount_check and not os.path.ismount(dev_path):
                self.logger.warn(_('%s is not mounted'), job['device'])
                continue
            if not self.check_ring():
                self.logger.info(_("Ring change detected. Aborting "
                        "current replication pass."))
                return
            if job['delete']:
                self.run_pool.spawn(self.update_deleted, job)  # pick how this job is synced
            else:
                self.run_pool.spawn(self.update, job)
        with Timeout(self.lockup_timeout):
            self.run_pool.waitall()
    except (Exception, Timeout):
        self.logger.exception(_("Exception in top-level replication loop"))
        self.kill_coros()
    finally:
        stats.kill()
        lockup_detector.kill()
        self.stats_line()
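
The shape of a single job returned by collect_jobs() can be inferred from the keys replicate() and update() read; the sketch below is only illustrative, with made-up values:

# Hedged sketch of one job produced by collect_jobs(); only the keys used
# above are shown, and the concrete values are invented for illustration.
job = {
    'device': 'sdb1',                        # local device the partition lives on
    'path': '/srv/node/sdb1/objects/1024',   # partition directory on disk
    'partition': '1024',
    'nodes': [{'ip': '10.0.0.2', 'port': 6000, 'device': 'sdb1'}],  # the other replicas from the ring
    'delete': False,   # True when this partition no longer belongs to the local node
}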

The update() method

This is the comparison step: it sends REPLICATE requests to the other devices and compares the returned hashes with the local ones.


def update(self, job):
    """
    High-level method that replicates a single partition.

    :param job: a dict containing info about the partition to be replicated
    """
    self.replication_count += 1
    self.logger.increment('partition.update.count.%s' % (job['device'],))
    begin = time.time()
    try:
        hashed, local_hash = tpool_reraise(get_hashes, job['path'],
                do_listdir=(self.replication_count % 10) == 0,
                reclaim_age=self.reclaim_age)
        self.suffix_hash += hashed
        self.logger.update_stats('suffix.hashes', hashed)
        attempts_left = len(job['nodes'])
        nodes = itertools.chain(job['nodes'],
                    self.object_ring.get_more_nodes(int(job['partition'])))
        while attempts_left > 0:
            # If this throws StopIterator it will be caught way below
            node = next(nodes)
            attempts_left -= 1
            try:
                with Timeout(self.http_timeout):
                    # send a REPLICATE request to the other node to obtain its remote_hash
                    resp = http_connect(node['ip'], node['port'],
                            node['device'], job['partition'], 'REPLICATE',
                        '', headers={'Content-Length': '0'}).getresponse()
                    if resp.status == HTTP_INSUFFICIENT_STORAGE:
                        self.logger.error(_('%(ip)s/%(device)s responded'
                                ' as unmounted'), node)
                        attempts_left += 1
                        continue
                    if resp.status != HTTP_OK:
                        self.logger.error(_("Invalid response %(resp)s "
                            "from %(ip)s"),
                            {'resp': resp.status, 'ip': node['ip']})
                        continue
                    remote_hash = pickle.loads(resp.read())
                    del resp
                # collect the suffixes whose local hash differs from remote_hash;
                # if nothing differs, move on to the next node
                suffixes = [suffix for suffix in local_hash if
                        local_hash[suffix] != remote_hash.get(suffix, -1)]
                if not suffixes:
                    continue
                hashed, recalc_hash = tpool_reraise(get_hashes,
                    job['path'], recalculate=suffixes,
                    reclaim_age=self.reclaim_age)
                self.logger.update_stats('suffix.hashes', hashed)
                local_hash = recalc_hash
                # reaching this point means an rsync is needed: recompute the
                # out-of-sync suffixes, rsync them, then send another REPLICATE
                # so the remote end refreshes its hashes for those suffixes
                suffixes = [suffix for suffix in local_hash if
                        local_hash[suffix] != remote_hash.get(suffix, -1)]
                self.rsync(node, job, suffixes)  # synchronize the files with rsync
                with Timeout(self.http_timeout):
                    conn = http_connect(node['ip'], node['port'],
                        node['device'], job['partition'], 'REPLICATE',
                        '/' + '-'.join(suffixes),
                        headers={'Content-Length': '0'})
                    conn.getresponse().read()
                self.suffix_sync += len(suffixes)
                self.logger.update_stats('suffix.syncs', len(suffixes))
            except (Exception, Timeout):
                self.logger.exception(_("Error syncing with node: %s") %
                                        node)
        self.suffix_count += len(local_hash)
    except (Exception, Timeout):
        self.logger.exception(_("Error syncing partition"))
    finally:
        self.partition_times.append(time.time() - begin)
        self.logger.timing_since('partition.update.timing', begin)
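
The second REPLICATE request above tells the remote node which suffixes to rehash; its path component is simply the suffix names joined by '-'. A tiny illustration of how that path fragment is built (suffix values are made up):

# How the suffix list is encoded into the REPLICATE request path after an
# rsync; the server side splits it back apart with suffix.split('-').
suffixes = ['3f2', 'a1b']
path_suffix = '/' + '-'.join(suffixes)
print(path_suffix)  # '/3f2-a1b'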

The REPLICATE method

It serves two purposes: it returns the local hashes, and, when the hashes need to be refreshed, it also updates the hashes.pkl file.

def REPLICATE(self, request):
    """
    Handle REPLICATE requests for the Swift Object Server.  This is used
    by the object replicator to get hashes for directories.
    """
    start_time = time.time()
    try:
        device, partition, suffix = split_path(
            unquote(request.path), 2, 3, True)
        validate_device_partition(device, partition)
    except ValueError, e:
        self.logger.increment('REPLICATE.errors')
        return HTTPBadRequest(body=str(e), request=request,
                              content_type='text/plain')
    if self.mount_check and not check_mount(self.devices, device):
        self.logger.increment('REPLICATE.errors')
        return HTTPInsufficientStorage(drive=device, request=request)
    path = os.path.join(self.devices, device, DATADIR, partition)
    if not os.path.exists(path):
        mkdirs(path)
    suffixes = suffix.split('-') if suffix else []
    _junk, hashes = tpool_reraise(get_hashes, path, recalculate=suffixes)
    self.logger.timing_since('REPLICATE.timing', start_time)
    return Response(body=pickle.dumps(hashes))
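
For a concrete sense of the wire format, the sketch below issues a REPLICATE request by hand and unpickles the response. It is only an illustration: it assumes an object server listening on 127.0.0.1:6000 with a device named sdb1, and it uses Python 3's http.client instead of Swift's http_connect helper.

# Hedged sketch: manually asking an object server for a partition's suffix
# hashes. The request path is /<device>/<partition>[/<suffix>-<suffix>...].
import pickle
from http.client import HTTPConnection

conn = HTTPConnection('127.0.0.1', 6000)        # assumed object-server address
conn.request('REPLICATE', '/sdb1/1024', headers={'Content-Length': '0'})
resp = conn.getresponse()
hashes = pickle.loads(resp.read())              # e.g. {'a1b': '<md5 of suffix dir>', ...}
conn.close()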

The get_hashes method

This method fetches and updates the hashes; in essence it reads hashes.pkl and rewrites it when necessary.

def get_hashes(partition_dir, recalculate=[], do_listdir=False,
               reclaim_age=ONE_WEEK):
    hashed = 0
    hashes_file = join(partition_dir, HASH_FILE)
    modified = False
    hashes = {}
    mtime = -1
    try:
        with open(hashes_file, 'rb') as fp:
            hashes = pickle.load(fp)  # load the existing hashes.pkl
        mtime = os.path.getmtime(hashes_file)
    except Exception:
        do_listdir = True
    if do_listdir:
        for suff in os.listdir(partition_dir):
            if len(suff) == 3 and isdir(join(partition_dir, suff)):
                hashes.setdefault(suff, None)
        modified = True
    hashes.update((hash_, None) for hash_ in recalculate)
    for suffix, hash_ in hashes.items():
        if not hash_:
            suffix_dir = join(partition_dir, suffix)
            if isdir(suffix_dir):
                try:
                    hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
                    hashed += 1
                except OSError:
                    logging.exception(_('Error hashing suffix'))
            else:
                del hashes[suffix]
            modified = True
    if modified:  # rewrite hashes.pkl
        with lock_path(partition_dir):
            if not os.path.exists(hashes_file) or \
                        os.path.getmtime(hashes_file) == mtime:
                write_pickle(
                    hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
                return hashed, hashes
        return get_hashes(partition_dir, recalculate, do_listdir,
                          reclaim_age)
    else:
        return hashed, hashes
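
Since hashes.pkl is just a pickled dict keyed by suffix, it can be inspected directly. A minimal sketch, assuming the conventional /srv/node/<device>/objects/<partition>/ layout (the device and partition here are made up):

# Hedged sketch: reading a partition's hashes.pkl by hand.
import pickle

with open('/srv/node/sdb1/objects/1024/hashes.pkl', 'rb') as fp:
    hashes = pickle.load(fp)

# A value of None means get_hashes() still needs to (re)compute that suffix.
for suffix, md5sum in hashes.items():
    print(suffix, md5sum)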

