侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

【原创】OpenStack Swift源码分析(八)Updater && Auditor服务

2022-06-12 星期日 / 0 评论 / 0 点赞 / 133 阅读 / 11142 字

Updater服务,用来帮助更新失败的数据,例如上传一个文件后,object-server会向container-server发送请求通知该container新添加了一个object, contain

    Updater服务,用来帮助更新失败的数据,例如上传一个文件后,object-server会向container-server发送请求通知该container新添加了一个object, container生成相应的数据后,会向account-server发送请求通知该account新添加了一个container。但是当系统出现网络中断,或者驱动器失败等情况时,通知更新的请求可能出现失败,这个时候我们把失败的请求保存到async_pending/目录下,然后Updater服务会处理这些请求。

    Auditor服务,用来检测数据损坏:当一个partition发生了bit rot(例如,字节的丢失)时,服务会认为这个partition已经损坏,并把该partition移动到隔离目录中;之后Replicator会自动帮我们从其他的dev上复制一个完好无损的partition过来。

updater.py中的run_forever方法

 def run_forever(self, *args, **kwargs):
     """Run the updater continuously.

     Each pass forks one child process per mounted device (at most
     ``self.concurrency`` children alive at a time); every child runs
     ``object_sweep`` on its device and exits. The parent waits for all
     children, records the sweep duration, and sleeps out whatever is
     left of ``self.interval`` before starting the next pass.

     :param args: accepted for interface compatibility; not used here
     :param kwargs: accepted for interface compatibility; not used here
     """
     # Random initial delay, bounded by the configured interval.
     time.sleep(random() * self.interval)
     while True:
         self.logger.info(_('Begin object update sweep'))
         begin = time.time()
         pids = []
         # Read from the container ring to ensure it's fresh.
         # NOTE(review): per the original author's note this is expected to
         # trigger the ring's reload check -- confirm against the Ring class.
         self.get_container_ring().get_nodes('')
         for device in os.listdir(self.devices):
             device_path = os.path.join(self.devices, device)
             # Mount check: skip devices that are not mounted. Written as a
             # parenthesised condition instead of a backslash continuation.
             if (self.mount_check and
                     not os.path.ismount(device_path)):
                 self.logger.increment('errors')
                 self.logger.warn(
                     _('Skipping %s as it is not mounted'), device)
                 continue
             # Throttle: wait for a child to finish before forking another.
             while len(pids) >= self.concurrency:
                 pids.remove(os.wait()[0])
             pid = os.fork()
             if pid:
                 # Parent: remember the child so it can be reaped later.
                 pids.append(pid)
             else:
                 # Child: restore default SIGTERM handling, patch sockets,
                 # sweep this one device, log the totals and exit.
                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
                 patcher.monkey_patch(all=False, socket=True)
                 self.successes = 0
                 self.failures = 0
                 forkbegin = time.time()
                 # Scan the pending object updates on this device.
                 self.object_sweep(device_path)
                 elapsed = time.time() - forkbegin
                 self.logger.info(
                     _('Object update sweep of %(device)s'
                       ' completed: %(elapsed).02fs, %(success)s successes'
                       ', %(fail)s failures'),
                     {'device': device, 'elapsed': elapsed,
                      'success': self.successes, 'fail': self.failures})
                 sys.exit()
         # Reap every remaining child before timing the whole sweep.
         while pids:
             pids.remove(os.wait()[0])
         elapsed = time.time() - begin
         self.logger.info(_('Object update sweep completed: %.02fs'),
                          elapsed)
         dump_recon_cache({'object_updater_sweep': elapsed},
                          self.rcache, self.logger)
         if elapsed < self.interval:
             time.sleep(self.interval - elapsed)


object_sweep方法

def object_sweep(self, device):
    """
    Walk the async pending updates stored on one device and process them.

    Returns immediately (without emitting the timing metric) when the
    device has no async pending directory.

    :param device: path to device
    """
    sweep_started = time.time()
    pending_root = os.path.join(device, ASYNCDIR)
    if not os.path.isdir(pending_root):
        return
    # Scan every prefix directory under the async pending root.
    for prefix_name in os.listdir(pending_root):
        prefix_dir = os.path.join(pending_root, prefix_name)
        if not os.path.isdir(prefix_dir):
            continue
        previous_hash = None
        # Reverse-sorted names: for each "<hash>-<timestamp>" group the
        # first entry seen is processed and later ones are unlinked.
        for entry in sorted(os.listdir(prefix_dir), reverse=True):
            entry_path = os.path.join(prefix_dir, entry)
            if not os.path.isfile(entry_path):
                continue
            try:
                entry_hash, _stamp = entry.split('-')
            except ValueError:
                self.logger.increment('errors')
                self.logger.error(
                    _('ERROR async pending file with unexpected name %s')
                    % (entry_path))
                continue
            if entry_hash != previous_hash:
                # First entry for this hash: perform the update.
                self.process_object_update(entry_path, device)
                previous_hash = entry_hash
            else:
                # Same hash as the entry just handled: drop this file.
                os.unlink(entry_path)
            time.sleep(self.slowdown)
        try:
            # Best effort: rmdir raises OSError (e.g. directory not empty),
            # which is deliberately ignored.
            os.rmdir(prefix_dir)
        except OSError:
            pass
    self.logger.timing_since('timing', sweep_started)

process_object_update方法

def process_object_update(self, update_path, device):
    """
    Process a single async pending object update.

    Loads the pickled update, sends it to every container node that is
    not already listed in the update's ``successes``, and then either
    removes the pending file (every node accepted) or rewrites it with
    the enlarged success list so those nodes are skipped next time.

    :param update_path: path to the pickled async pending update
    :param device: path to the device the update is stored on
    """
    try:
        # NOTE(review): unpickling can execute arbitrary code on crafted
        # input; this assumes pending files are only written by trusted
        # local services -- confirm.
        # The context manager closes the file even when unpickling fails.
        with open(update_path, 'rb') as pending_file:
            update = pickle.load(pending_file)
    except Exception:
        # Unreadable update: move it under the device's quarantine area.
        self.logger.exception(
            _('ERROR Pickle problem, quarantining %s'), update_path)
        self.logger.increment('quarantines')
        renamer(update_path, os.path.join(
            device, 'quarantined', 'objects',
            os.path.basename(update_path)))
        return
    successes = update.get('successes', [])
    # Look up the partition and nodes for the update's container.
    part, nodes = self.get_container_ring().get_nodes(
        update['account'], update['container'])
    obj = '/%s/%s/%s' % (
        update['account'], update['container'], update['obj'])
    success = True
    new_successes = False
    for node in nodes:
        if node['id'] not in successes:
            # Send the update to this node.
            status = self.object_update(node, part, update['op'], obj,
                                        update['headers'])
            # HTTP_NOT_FOUND is counted as done, the same as a success.
            if not is_success(status) and status != HTTP_NOT_FOUND:
                success = False
            else:
                successes.append(node['id'])
                new_successes = True
    if success:
        self.successes += 1
        self.logger.increment('successes')
        self.logger.debug(_('Update sent for %(obj)s %(path)s'),
                          {'obj': obj, 'path': update_path})
        os.unlink(update_path)
    else:
        self.failures += 1
        self.logger.increment('failures')
        self.logger.debug(_('Update failed for %(obj)s %(path)s'),
                          {'obj': obj, 'path': update_path})
        if new_successes:
            # Persist partial progress so accepted nodes are not retried.
            update['successes'] = successes
            write_pickle(update, update_path, os.path.join(device, 'tmp'))

Auditor


auditor.py中的run_forever和run_once

def run_forever(self, *args, **kwargs):
    """Run the object audit until stopped.

    Unless a ``zero_byte_fps`` keyword is supplied, the process forks.
    The parent side passes a ``zero_byte_fps`` value to ``run_once``;
    the child side passes only the mode.
    """
    # zero byte only command line option
    zbo_fps = kwargs.get('zero_byte_fps', 0)
    # With the option set only the parent runs; otherwise fork, and the
    # child sees 0 here.
    is_parent = True if zbo_fps else os.fork()
    # Set the mode handed to every run_once call.
    audit_kwargs = {'mode': 'forever'}
    if is_parent:
        audit_kwargs['zero_byte_fps'] = zbo_fps or self.conf_zero_byte_fps
    while True:
        try:
            # Run one audit pass.
            self.run_once(**audit_kwargs)
        except (Exception, Timeout):
            self.logger.exception(_('ERROR auditing'))
        self._sleep()


def run_once(self, *args, **kwargs):
    """Run the object audit once.

    Recognised keywords: ``mode`` (default ``'once'``) and
    ``zero_byte_fps`` (default ``0``).
    """
    audit_mode = kwargs.get('mode', 'once')
    zero_byte_fps = kwargs.get('zero_byte_fps', 0)
    auditor_worker = AuditorWorker(
        self.conf, zero_byte_only_at_fps=zero_byte_fps)
    # Audit all of the objects.
    auditor_worker.audit_all_objects(mode=audit_mode)

其中audit_all_objects扫描当前的所有partition,然后如果有需要隔离的则调用object_audit方法

def object_audit(self, path, device, partition):
    """
    Audits the given object path.

    Paths that do not end in ``.data`` are ignored. A problem reported
    as ``AuditException`` gets the object quarantined via
    ``object_server.quarantine_renamer``; any other exception is logged
    and counted as an error. ``self.passes`` is incremented when the
    audit completes without raising.

    :param path: a path to an object
    :param device: the device the path is on
    :param partition: the partition the path is on
    """
    try:
        if not path.endswith('.data'):
            return
        try:
            name = object_server.read_metadata(path)['name']
        except (Exception, Timeout) as exc:
            raise AuditException('Error when reading metadata: %s' % exc)
        _junk, account, container, obj = name.split('/', 3)
        df = object_server.DiskFile(self.devices, device, partition,
                                    account, container, obj, self.logger,
                                    keep_data_fp=True)
        try:
            if df.data_file is None:
                # file is deleted, we found the tombstone
                return
            try:
                obj_size = df.get_data_file_size()
            except DiskFileError as e:
                raise AuditException(str(e))
            except DiskFileNotExist:
                return
            if self.zero_byte_only_at_fps and obj_size:
                # Zero-byte-only mode: non-empty objects pass unread.
                self.passes += 1
                return
            # Read the whole object, rate limited by bytes per second.
            for chunk in df:
                self.bytes_running_time = ratelimit_sleep(
                    self.bytes_running_time, self.max_bytes_per_second,
                    incr_by=len(chunk))
                self.bytes_processed += len(chunk)
                self.total_bytes_processed += len(chunk)
            df.close()
            if df.quarantined_dir:
                self.quarantines += 1
                self.logger.error(
                    _("ERROR Object %(path)s failed audit and will be "
                      "quarantined: ETag and file's md5 do not match"),
                    {'path': path})
        finally:
            df.close(verify_file=False)
    except AuditException as err:
        self.logger.increment('quarantines')
        self.quarantines += 1
        self.logger.error(_('ERROR Object %(obj)s failed audit and will '
                            'be quarantined: %(err)s'),
                          {'obj': path, 'err': err})
        # Move the object into the quarantined directory; per the original
        # author's note, replication is then expected to bring back an
        # intact copy.
        object_server.quarantine_renamer(
            os.path.join(self.devices, device), path)
        return
    except (Exception, Timeout):
        self.logger.increment('errors')
        self.errors += 1
        self.logger.exception(_('ERROR Trying to audit %s'), path)
        return
    self.passes += 1



广告 广告

评论区