
[Original] OpenStack Swift Source Code Analysis (4): Proxy Service Response

2022-06-12 (Sunday)


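Once the service has started, users can send requests to the proxy. As we know, the proxy forwards each request to the appropriate object-server for processing. An incoming request first goes through authentication; the handle_request() method in server.py then checks the authentication result and, if the request passed, executes it. Below we use a PUT request (a file upload) as an example to analyze how the proxy forwards a request.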

Service forwarding model


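For a PUT request, the proxy first determines which controller applies (object, container, or account) and then calls that controller's PUT method. For example, if you PUT an object, the PUT method in obj.py is executed.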
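The dispatch step can be sketched roughly as follows. This is a minimal illustration, not Swift's code: the three controller classes, `get_controller`, and `dispatch` are hypothetical stand-ins, and the rule of picking a controller from the number of path segments after the version is a simplifying assumption. The `getattr` lookup mirrors how handle_request() finds the handler by HTTP method name.

```python
# Hypothetical stand-ins for the proxy's controllers (not Swift's real classes).
class AccountController:
    def __init__(self, account):
        self.path = (account,)

    def PUT(self, req):
        return "201 Created: /%s" % self.path


class ContainerController:
    def __init__(self, account, container):
        self.path = (account, container)

    def PUT(self, req):
        return "201 Created: /%s/%s" % self.path


class ObjectController:
    def __init__(self, account, container, obj):
        self.path = (account, container, obj)

    def PUT(self, req):
        return "201 Created: /%s/%s/%s" % self.path


def get_controller(path):
    """Choose a controller class from /version/account[/container[/object]]."""
    # maxsplit=3 keeps any further slashes inside the object name.
    segments = path.lstrip("/").split("/", 3)
    if len(segments) < 2 or not all(segments):
        raise ValueError("bad path: %r" % path)
    args = segments[1:]  # drop the version segment
    by_depth = {1: AccountController, 2: ContainerController, 3: ObjectController}
    return by_depth[len(args)], args


def dispatch(method, path, req=None):
    """Instantiate the controller and call the method named after the HTTP verb."""
    controller_cls, args = get_controller(path)
    controller = controller_cls(*args)
    handler = getattr(controller, method, None)
    if handler is None:
        return "405 Method Not Allowed"
    return handler(req)


print(dispatch("PUT", "/v1/AUTH_test/photos/cat.jpg"))
```

A verb the controller does not define falls through to a 405, which is the same outcome the real handle_request() produces when the attribute lookup fails.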

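A PUT request does its work in roughly three parts. The first part parses the request and sets things up accordingly; the second part forwards the request; the third part builds the final response from the responses to the forwarded requests and returns it.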

As shown in the diagram, given the account, container, and object names of the object you want to upload, a lookup in the ring file returns three nodes.
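To make the ring lookup more concrete, here is a toy sketch of the idea: hash the object path, keep the top bits as the partition number, and read one device per replica from a table. The constants, the device list, and the table layout below are all made up for illustration; a real ring is built by Swift's ring-builder and also mixes a cluster-specific secret into the hash, which is omitted here.

```python
import hashlib
import struct

PART_POWER = 4                    # toy value: 2**4 = 16 partitions
PART_SHIFT = 32 - PART_POWER      # keep only the top PART_POWER bits of the hash
REPLICAS = 3

# Hypothetical devices; real rings carry more fields (zone, weight, ...).
DEVICES = [
    {"id": i, "ip": "10.0.0.%d" % (i + 1), "port": 6000, "device": "sdb1"}
    for i in range(5)
]

# replica index -> partition -> device id (a made-up but valid assignment:
# the three replicas of any partition land on three different devices).
REPLICA2PART2DEV = [
    [(part + replica) % len(DEVICES) for part in range(2 ** PART_POWER)]
    for replica in range(REPLICAS)
]


def get_nodes(account, container, obj):
    """Return (partition, nodes) for an object path, one node per replica."""
    path = "/%s/%s/%s" % (account, container, obj)
    digest = hashlib.md5(path.encode("utf-8")).digest()
    # Interpret the first 4 bytes as a big-endian integer, then shift.
    partition = struct.unpack_from(">I", digest)[0] >> PART_SHIFT
    nodes = [DEVICES[table[partition]] for table in REPLICA2PART2DEV]
    return partition, nodes


partition, nodes = get_nodes("AUTH_test", "photos", "cat.jpg")
print(partition, [n["ip"] for n in nodes])
```

The lookup is deterministic, so every proxy that loads the same ring maps a given object path to the same three nodes.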

A connection (conn) is then created for each of these three nodes, and each conn is given its own Queue and a thread function.


The program then puts data into each queue; the thread function gets the data from the queue and forwards it to its node.

Once all the data has been forwarded, the put loop exits, the responses are collected, and the third step begins.
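The queue-per-connection pattern described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the proxy's code: Swift runs these workers as eventlet green threads, whereas the sketch uses `threading` and `queue`; `FakeConn`, `send_file`, and `fan_out` are hypothetical names; and the `None` sentinel used to stop a worker is an addition for the sketch.

```python
import queue
import threading


class FakeConn:
    """Hypothetical stand-in for a connection to one object server."""

    def __init__(self, name, broken=False):
        self.name = name
        self.broken = broken
        self.failed = False
        self.received = []
        self.queue = queue.Queue(maxsize=10)  # bounded, like put_queue_depth

    def send(self, chunk):
        if self.broken:
            raise IOError("node %s is down" % self.name)
        self.received.append(chunk)


def send_file(conn):
    """Worker: take chunks off the conn's queue and forward them."""
    while True:
        chunk = conn.queue.get()
        try:
            if chunk is None:          # sentinel: no more data
                return
            if not conn.failed:
                try:
                    conn.send(chunk)
                except Exception:
                    conn.failed = True  # keep draining so the producer never blocks
        finally:
            conn.queue.task_done()


def fan_out(data_source, conns):
    """Copy every chunk to every connection; return the conns that survived."""
    for conn in conns:
        threading.Thread(target=send_file, args=(conn,), daemon=True).start()
    for chunk in data_source:
        for conn in conns:
            conn.queue.put(chunk)
    for conn in conns:
        conn.queue.put(None)
        conn.queue.join()              # wait until this worker has drained its queue
    return [conn for conn in conns if not conn.failed]


conns = [FakeConn("a"), FakeConn("b", broken=True), FakeConn("c")]
alive = fan_out([b"hello ", b"world"], conns)
print([c.name for c in alive])
```

The bounded queue gives back-pressure: a slow node stalls the producer only after its queue is full, and a failed node keeps consuming chunks without sending them, so the other nodes are not held up.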

Source code:

Application.__call__():

    def __call__(self, env, start_response):
        """
        WSGI entry point.
        Wraps env in webob.Request object and passes it down.

        :param env: WSGI environment dictionary
        :param start_response: WSGI callable
        """
        try:
            if self.memcache is None:
                # no cache client yet: take it from the WSGI environment
                self.memcache = cache_from_env(env)
            req = self.update_request(Request(env))
            # the request handling function
            return self.handle_request(req)(env, start_response)
        except UnicodeError:
            err = HTTPPreconditionFailed(request=req, body='Invalid UTF8')
            return err(env, start_response)
        except (Exception, Timeout):
            start_response('500 Server Error',
                    [('Content-Type', 'text/plain')])
            return ['Internal server error.\n']

Now for the key part, handle_request():

    def handle_request(self, req):
        """
        Entry point for proxy server.
        Should return a WSGI-style callable (such as webob.Response).

        :param req: webob.Request object
        """
        try:
            self.logger.set_statsd_prefix('proxy-server')
            # Content-Length is present but negative: return an error
            if req.content_length and req.content_length < 0:
                self.logger.increment('errors')
                return HTTPBadRequest(request=req,
                                      body='Invalid Content-Length')

            try:
                # the path is not valid UTF-8: return an error
                if not check_utf8(req.path_info):
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(request=req,
                                                  body='Invalid UTF8')
            except UnicodeError:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Invalid UTF8')

            try:
                # get the controller class and path_parts for this path
                controller, path_parts = self.get_controller(req.path)
                p = req.path_info
                if isinstance(p, unicode):
                    p = p.encode('utf-8')
            except ValueError:
                self.logger.increment('errors')
                return HTTPNotFound(request=req)
            if not controller:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Bad URL')
            if self.deny_host_headers and \
                    req.host.split(':')[0] in self.deny_host_headers:
                return HTTPForbidden(request=req, body='Invalid host header')

            self.logger.set_statsd_prefix('proxy-server.' +
                                          controller.server_type.lower())
            controller = controller(self, **path_parts)  # instantiate the controller
            if 'swift.trans_id' not in req.environ:
                # if this wasn't set by an earlier middleware, set it now
                trans_id = 'tx' + uuid.uuid4().hex
                req.environ['swift.trans_id'] = trans_id
                self.logger.txn_id = trans_id
            req.headers['x-trans-id'] = req.environ['swift.trans_id']
            controller.trans_id = req.environ['swift.trans_id']
            self.logger.client_ip = get_remote_client(req)
            try:
                # look up the handler method by HTTP verb
                handler = getattr(controller, req.method)
                getattr(handler, 'publicly_accessible')
            except AttributeError:
                return HTTPMethodNotAllowed(request=req)
            if path_parts['version']:
                req.path_info_pop()
            # swift.authorize is the hook provided by swift_auth
            if 'swift.authorize' in req.environ:
                # We call authorize before the handler, always. If authorized,
                # we remove the swift.authorize hook so isn't ever called
                # again. If not authorized, we return the denial unless the
                # controller's method indicates it'd like to gather more
                # information and try again later.
                resp = req.environ['swift.authorize'](req)  # call the hook if present
                if not resp:
                    # No resp means authorized, no delayed recheck required.
                    # (authorization succeeded: remove the hook from the environ)
                    del req.environ['swift.authorize']
                else:
                    # Response indicates denial, but we might delay the denial
                    # and recheck later. If not delayed, return the error now.
                    if not getattr(handler, 'delay_denial', None):
                        return resp
            # Save off original request method (GET, POST, etc.) in case it
            # gets mutated during handling.  This way logging can display the
            # method the client actually sent.
            req.environ['swift.orig_req_method'] = req.method
            return handler(req)  # run the handler
        except (Exception, Timeout):
            self.logger.exception(_('ERROR Unhandled exception in request'))
            return HTTPServerError(request=req)
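The interplay between the swift.authorize hook and delay_denial is easy to miss in the listing, so here is a small sketch of it. Everything in it is a simplified assumption: requests are plain dicts, `authorize` is a hypothetical hook that compares a user name with an ACL, and `Controller` is a stand-in. The point it illustrates is the one visible in handle_request(): a denial is returned immediately unless the handler is marked `delay_denial`, in which case the handler loads more information (here, the ACL) and runs the hook again itself.

```python
def delay_denial(func):
    """Mark a handler as wanting a second authorization check later."""
    func.delay_denial = True
    return func


def authorize(req):
    """Hypothetical hook: return None if allowed, else a denial response."""
    if req.get("user") == req.get("acl"):
        return None
    return "403 Forbidden"


class Controller:
    def GET(self, req):
        return "200 OK"

    @delay_denial
    def PUT(self, req):
        # The handler now knows the container ACL (hard-coded for the sketch)
        # and repeats the check if the hook is still installed.
        req["acl"] = "alice"
        hook = req["environ"].get("swift.authorize")
        if hook:
            denial = hook(req)
            if denial:
                return denial
        return "201 Created"


def handle(controller, req):
    """Simplified version of the authorization step before calling the handler."""
    handler = getattr(controller, req["method"])
    hook = req["environ"].get("swift.authorize")
    if hook:
        denial = hook(req)
        if not denial:
            del req["environ"]["swift.authorize"]  # authorized: never ask again
        elif not getattr(handler, "delay_denial", False):
            return denial                           # denied, and no second chance
    return handler(req)


def make_req(method, user):
    return {"method": method, "user": user,
            "environ": {"swift.authorize": authorize}}


print(handle(Controller(), make_req("PUT", "alice")))
print(handle(Controller(), make_req("PUT", "bob")))
```

In the first call the early check fails because no ACL is known yet, but the PUT handler is allowed to continue and the second check passes; in the second call the delayed check still denies the request.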

Finally, the PUT method in obj.py is executed.

In it, _send_file is the forwarding thread function, and the _connect_put_node() method establishes the connection to a node.

    def _send_file(self, conn, path):
        """Method for a file PUT coro"""
        while True:
            chunk = conn.queue.get()  # take a chunk from the queue
            if not conn.failed:
                try:
                    with ChunkWriteTimeout(self.app.node_timeout):
                        conn.send(chunk)  # send it to the node
                except (Exception, ChunkWriteTimeout):
                    conn.failed = True
                    self.exception_occurred(conn.node, _('Object'),
                        _('Trying to write to %s') % path)
            conn.queue.task_done()  # mark this queue item as processed

    def _connect_put_node(self, nodes, part, path, headers,
                          logger_thread_locals):
        """Method for a file PUT connect"""
        self.app.logger.thread_locals = logger_thread_locals
        for node in nodes:
            try:
                with ConnectionTimeout(self.app.conn_timeout):
                    # open a connection to the node
                    conn = http_connect(node['ip'], node['port'],
                            node['device'], part, 'PUT', path, headers)
                with Timeout(self.app.node_timeout):
                    # wait for the node's reply to "Expect: 100-continue"
                    resp = conn.getexpect()
                if resp.status == HTTP_CONTINUE:
                    conn.node = node
                    return conn  # return the connection
                elif resp.status == HTTP_INSUFFICIENT_STORAGE:
                    self.error_limit(node)
            except:
                self.exception_occurred(node, _('Object'),
                    _('Expect: 100-continue on %s') % path)

    @public
    @delay_denial
    def PUT(self, req):  # the PUT method
        """HTTP PUT request handler."""
        # fetch the related container and account information
        (container_partition, containers, _junk, req.acl,
         req.environ['swift_sync_key'], object_versions) = \
            self.container_info(self.account_name, self.container_name,
                account_autocreate=self.app.account_autocreate)
        # delayed authorization re-check (see delay_denial), now that
        # req.acl has been loaded from the container info
        if 'swift.authorize' in req.environ:
            aresp = req.environ['swift.authorize'](req)
            if aresp:
                return aresp
        if not containers:  # no such container: return Not Found
            return HTTPNotFound(request=req)
        if 'x-delete-after' in req.headers:  # set the object's expiry time
            try:
                x_delete_after = int(req.headers['x-delete-after'])
            except ValueError:
                return HTTPBadRequest(request=req,
                                      content_type='text/plain',
                                      body='Non-integer X-Delete-After')
            req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
        if 'x-delete-at' in req.headers:
            try:
                x_delete_at = int(req.headers['x-delete-at'])
                if x_delete_at < time.time():
                    return HTTPBadRequest(body='X-Delete-At in past',
                        request=req, content_type='text/plain')
            except ValueError:
                return HTTPBadRequest(request=req, content_type='text/plain',
                                      body='Non-integer X-Delete-At')
            delete_at_container = str(x_delete_at /
                self.app.expiring_objects_container_divisor *
                self.app.expiring_objects_container_divisor)
            delete_at_part, delete_at_nodes = \
                self.app.container_ring.get_nodes(
                    self.app.expiring_objects_account, delete_at_container)
        else:
            delete_at_part = delete_at_nodes = None
        # look up the partition and nodes in the object ring
        partition, nodes = self.app.object_ring.get_nodes(
            self.account_name, self.container_name, self.object_name)
        # do a HEAD request for container sync and checking object versions
        # (timestamps)
        if 'x-timestamp' in req.headers or (object_versions and not
                                    req.environ.get('swift_versioned_copy')):
            hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
                                 environ={'REQUEST_METHOD': 'HEAD'})
            hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
                hreq.path_info, len(nodes))
        # Used by container sync feature
        if 'x-timestamp' in req.headers:
            try:
                req.headers['X-Timestamp'] = \
                    normalize_timestamp(float(req.headers['x-timestamp']))
                if hresp.environ and 'swift_x_timestamp' in hresp.environ and \
                    float(hresp.environ['swift_x_timestamp']) >= \
                        float(req.headers['x-timestamp']):
                    return HTTPAccepted(request=req)
            except ValueError:
                return HTTPBadRequest(request=req, content_type='text/plain',
                    body='X-Timestamp should be a UNIX timestamp float value; '
                         'was %r' % req.headers['x-timestamp'])
        else:
            req.headers['X-Timestamp'] = normalize_timestamp(time.time())
        # Sometimes the 'content-type' header exists, but is set to None.
        content_type_manually_set = True
        if not req.headers.get('content-type'):
            guessed_type, _junk = mimetypes.guess_type(req.path_info)
            req.headers['Content-Type'] = guessed_type or \
                                                'application/octet-stream'
            content_type_manually_set = False
        error_response = check_object_creation(req, self.object_name)
        if error_response:
            return error_response
        if object_versions and not req.environ.get('swift_versioned_copy'):
            is_manifest = 'x-object-manifest' in req.headers or \
                          'x-object-manifest' in hresp.headers
            # decide whether COPY has to be called
            if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
                # This is a version manifest and needs to be handled
                # differently. First copy the existing data to a new object,
                # then write the data from this request to the version manifest
                # object.
                lcontainer = object_versions.split('/')[0]
                prefix_len = '%03x' % len(self.object_name)
                lprefix = prefix_len + self.object_name + '/'
                ts_source = hresp.environ.get('swift_x_timestamp')
                if ts_source is None:
                    ts_source = time.mktime(time.strptime(
                                            hresp.headers['last-modified'],
                                            '%a, %d %b %Y %H:%M:%S GMT'))
                new_ts = normalize_timestamp(ts_source)
                vers_obj_name = lprefix + new_ts
                copy_headers = {
                    'Destination': '%s/%s' % (lcontainer, vers_obj_name)}
                copy_environ = {'REQUEST_METHOD': 'COPY',
                                'swift_versioned_copy': True
                               }
                copy_req = Request.blank(req.path_info, headers=copy_headers,
                                environ=copy_environ)
                copy_resp = self.COPY(copy_req)
                if is_client_error(copy_resp.status_int):
                    # missing container or bad permissions
                    return HTTPPreconditionFailed(request=req)
                elif not is_success(copy_resp.status_int):
                    # could not copy the data, bail
                    return HTTPServiceUnavailable(request=req)

        reader = req.environ['wsgi.input'].read
        data_source = iter(lambda: reader(self.app.client_chunk_size), '')
        source_header = req.headers.get('X-Copy-From')
        source_resp = None
        if source_header:  # the request is a server-side copy (X-Copy-From)
            source_header = unquote(source_header)
            acct = req.path_info.split('/', 2)[1]
            if isinstance(acct, unicode):
                acct = acct.encode('utf-8')
            if not source_header.startswith('/'):
                source_header = '/' + source_header
            source_header = '/' + acct + source_header
            try:
                src_container_name, src_obj_name = \
                    source_header.split('/', 3)[2:]
            except ValueError:
                return HTTPPreconditionFailed(request=req,
                    body='X-Copy-From header must be of the form'
                    '<container name>/<object name>')
            source_req = req.copy_get()
            source_req.path_info = source_header
            source_req.headers['X-Newest'] = 'true'
            orig_obj_name = self.object_name
            orig_container_name = self.container_name
            self.object_name = src_obj_name
            self.container_name = src_container_name
            source_resp = self.GET(source_req)
            if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
                return source_resp
            self.object_name = orig_obj_name
            self.container_name = orig_container_name
            new_req = Request.blank(req.path_info,
                        environ=req.environ, headers=req.headers)
            data_source = source_resp.app_iter
            new_req.content_length = source_resp.content_length
            if new_req.content_length is None:
                # This indicates a transfer-encoding: chunked source object,
                # which currently only happens because there are more than
                # CONTAINER_LISTING_LIMIT segments in a segmented object. In
                # this case, we're going to refuse to do the server-side copy.
                return HTTPRequestEntityTooLarge(request=req)
            new_req.etag = source_resp.etag
            # we no longer need the X-Copy-From header
            del new_req.headers['X-Copy-From']
            if not content_type_manually_set:
                new_req.headers['Content-Type'] = \
                    source_resp.headers['Content-Type']
            if new_req.headers.get('x-fresh-metadata', 'false').lower() \
                    not in TRUE_VALUES:
                for k, v in source_resp.headers.items():
                    if k.lower().startswith('x-object-meta-'):
                        new_req.headers[k] = v
                for k, v in req.headers.items():
                    if k.lower().startswith('x-object-meta-'):
                        new_req.headers[k] = v
            req = new_req
        node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
        pile = GreenPile(len(nodes))
        for container in containers:
            nheaders = dict(req.headers.iteritems())
            nheaders['Connection'] = 'close'
            nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
            nheaders['X-Container-Partition'] = container_partition
            nheaders['X-Container-Device'] = container['device']
            nheaders['Expect'] = '100-continue'
            if delete_at_nodes:
                node = delete_at_nodes.pop(0)
                nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
                nheaders['X-Delete-At-Partition'] = delete_at_part
                nheaders['X-Delete-At-Device'] = node['device']
            pile.spawn(self._connect_put_node, node_iter, partition,
                       req.path_info, nheaders, self.app.logger.thread_locals)
        conns = [conn for conn in pile if conn]
        if len(conns) <= len(nodes) / 2:
            self.app.logger.error(
                _('Object PUT returning 503, %(conns)s/%(nodes)s '
                'required connections'),
                {'conns': len(conns), 'nodes': len(nodes) // 2 + 1})
            return HTTPServiceUnavailable(request=req)
        chunked = req.headers.get('transfer-encoding')
        bytes_transferred = 0
        try:
            # For a plain upload of a new object, the setup and the checks
            # above are done; the actual forwarding starts here, using queues.
            with ContextPool(len(nodes)) as pool:
                for conn in conns:
                    conn.failed = False
                    conn.queue = Queue(self.app.put_queue_depth)
                    pool.spawn(self._send_file, conn, req.path)
                while True:
                    with ChunkReadTimeout(self.app.client_timeout):
                        try:
                            chunk = next(data_source)
                        except StopIteration:
                            if chunked:
                                [conn.queue.put('0\r\n\r\n') for conn in conns]
                            break
                    bytes_transferred += len(chunk)
                    if bytes_transferred > MAX_FILE_SIZE:
                        return HTTPRequestEntityTooLarge(request=req)
                    for conn in list(conns):
                        if not conn.failed:
                            conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
                                            if chunked else chunk)
                        else:
                            conns.remove(conn)
                    if len(conns) <= len(nodes) / 2:
                        self.app.logger.error(_('Object PUT exceptions during'
                            ' send, %(conns)s/%(nodes)s required connections'),
                            {'conns': len(conns), 'nodes': len(nodes) / 2 + 1})
                        return HTTPServiceUnavailable(request=req)
                for conn in conns:
                    if conn.queue.unfinished_tasks:
                        conn.queue.join()
            conns = [conn for conn in conns if not conn.failed]
        except ChunkReadTimeout, err:
            self.app.logger.warn(
                _('ERROR Client read timeout (%ss)'), err.seconds)
            self.app.logger.increment('client_timeouts')
            return HTTPRequestTimeout(request=req)
        except (Exception, Timeout):
            self.app.logger.exception(
                _('ERROR Exception causing client disconnect'))
            return HTTPClientDisconnect(request=req)
        if req.content_length and bytes_transferred < req.content_length:
            req.client_disconnect = True
            self.app.logger.warn(
                _('Client disconnected without sending enough data'))
            self.app.logger.increment('client_disconnects')
            return HTTPClientDisconnect(request=req)
        # From here on: build the response and return it.
        statuses = []
        reasons = []
        bodies = []
        etags = set()
        for conn in conns:
            try:
                with Timeout(self.app.node_timeout):
                    response = conn.getresponse()
                    statuses.append(response.status)
                    reasons.append(response.reason)
                    bodies.append(response.read())
                    if response.status >= HTTP_INTERNAL_SERVER_ERROR:
                        self.error_occurred(conn.node,
                            _('ERROR %(status)d %(body)s From Object Server ' \
                            're: %(path)s') % {'status': response.status,
                            'body': bodies[-1][:1024], 'path': req.path})
                    elif is_success(response.status):
                        etags.add(response.getheader('etag').strip('"'))
            except (Exception, Timeout):
                self.exception_occurred(conn.node, _('Object'),
                    _('Trying to get final status of PUT to %s') % req.path)
        if len(etags) > 1:
            self.app.logger.error(
                _('Object servers returned %s mismatched etags'), len(etags))
            return HTTPServerError(request=req)
        etag = len(etags) and etags.pop() or None
        while len(statuses) < len(nodes):
            statuses.append(HTTP_SERVICE_UNAVAILABLE)
            reasons.append('')
            bodies.append('')
        resp = self.best_response(req, statuses, reasons, bodies,
                    _('Object PUT'), etag=etag)
        if source_header:
            resp.headers['X-Copied-From'] = quote(
                                                source_header.split('/', 2)[2])
            if 'last-modified' in source_resp.headers:
                resp.headers['X-Copied-From-Last-Modified'] = \
                    source_resp.headers['last-modified']
            for k, v in req.headers.items():
                if k.lower().startswith('x-object-meta-'):
                    resp.headers[k] = v
        resp.last_modified = float(req.headers['X-Timestamp'])
        return resp
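Two majority rules drive the end of PUT: the upload continues only while more than half of the nodes still have a live connection, and the final status is taken from the backend responses once missing ones are padded with 503. The sketch below illustrates both in Python 3. `has_quorum` follows the `len(conns) <= len(nodes) / 2` test in the listing; `best_status` is only an approximation of what best_response() does, written under the assumption that it picks the highest status from whichever class (2xx, 3xx, 4xx) holds a majority. That method's body is not shown in this article, so treat the function as hypothetical.

```python
def has_quorum(live_conns, node_count):
    """True while strictly more than half of the nodes are still usable."""
    return live_conns > node_count // 2


def best_status(statuses, node_count):
    """Pick one status code to report to the client (simplified assumption)."""
    # Nodes that never answered count as 503, as in the padding loop of PUT.
    padded = list(statuses) + [503] * (node_count - len(statuses))
    for hundred in (200, 300, 400):
        in_class = [s for s in padded if hundred <= s < hundred + 100]
        if len(in_class) > len(padded) // 2:
            return max(in_class)
    return 503


print(has_quorum(2, 3), has_quorum(1, 3))
print(best_status([201, 201, 500], 3))
print(best_status([201], 3))
```

With three replicas, two successful writes are enough for the client to see a success, while a single success out of three is reported as 503.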
