欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 明星 > 一文读懂flask--gunicorn是如何启动flask应用

一文读懂flask--gunicorn是如何启动flask应用

2024/10/24 5:22:35 来源:https://blog.csdn.net/weixin_42143049/article/details/141642459  浏览:    关键词:一文读懂flask--gunicorn是如何启动flask应用

一文读懂flask–gunicorn是如何启动flask应用

1.gunicorn是如何启动flask应用的

在了解了开发模式下flask是如何启动,并且是如何监听请求,在收到请求后又是怎么处理请求之后,接下来进一步了解在生产环境中最常用的gunicorn是如何启动、如何监听请求,并且保证一个请求只会被一个worker处理,而不会被多个worker处理,以及woker又是如何将请求分发到flask应用中的,同时这里也会提供一个简单的示例演示如何为每一个woker添加ros的初始化节点用以保证对外暴露的接口能够正常通过ros服务进行通信

1.1 gunicorn启动过程

gunicorn中入口方法是gunicorn.app.base:Application类的run方法,这里主要分析如何通过api的方式启动,不对命令行方式做解读,以下是定义一个自定义的Flask应用,并通过继承gunicorn.app.base:BaseApplication的方式拓展gunicorn的启动方式,将通过命令行的方式修改为何执行普通python监本一样的方式通过gunicorn启动我们的应用,以下是一个简单的代码示例:

class CustomWorker(SyncWorker):worker_count = 0  # 记录初始化次数,给woker节点编号def __init__(self, *args, **kwargs):super(CustomWorker, self).__init__(*args, **kwargs)self.__class__.worker_count += 1@classmethoddef get_worker_count(cls):return cls.worker_countdef init_process(self):if not rospy.core.is_initialized():  # 判断当前woker是否已经初始化过节点node_name = f'dispatch_interface_services_worker{self.get_worker_count()}'  # 使用worker编号给节点命名rospy.init_node(node_name)logger.info(f"ROS Node {node_name} initialized in worker {os.getpid()}")super(CustomWorker, self).init_process()class RobotApp(Flask):def __init__(self, import_name):super(RobotApp, self).__init__(import_name)self.setup_blueprints()@staticmethoddef check():return Response(None)def setup_blueprints(self):"""这个方法主要用来注册路由和视图"""......class Application(BaseApplication):def __init__(self, service_name, options=None):“”“options: options = {'bind': f"127.0.0.1:8888",  # 绑定地址和端口'workers': 4,  # 指定 workers 数量'accesslog': "-",  # 输出到标准输出,取消gunicorn接管日志输出'worker_class': 'CustomWorker', 自定义worker,在这里初始化ros节点,包路径,需要从工作目录到这个类所在的完整路径'timeout': 300}”“”self.options = options or {}super(Application, self).__init__()self.application = Noneself.logger = logger  # 自定义一直输出,如果项目中已经使用了其他的日志框架,而不想让gunicorn接管日志输出,需要将日志框架的对象指定给这里def load_config(self):config = {key: value for key, value in iteritems(self.options)if key in self.cfg.settings and value is not None}for key, value in iteritems(config):self.cfg.set(key.lower(), value)def load(self):return RobotApp(__name__)if __name__ == '__main__':app = Application("dispatch_interface_services", options={"bind": "0.0.0.0:5000", "workers": 1'worker_class': 'CustomWorker'})app.run()

以上是一个简单的实例,演示了如何通过gunicornapi的方式开发,并提供了通过自定义worker的方式为每个worker启动的时候添加自己的逻辑,这里主要是为了给每个worker初始化一个固定名称的ros节点,对于Flask应用来说是没有任何改动,只是在启动gunicorn的时候换了一种更加灵活和方便的方式,接下来就看一下gunicorn又是如何启动的。

在上述代码中,可以看到自定义的Application类是继承了gunicorn.app.base.BaseApplication类,然后直接通过调用run方法进行启动的,那么就从这个方法开始,看一下启动过程都有哪些步骤,下面这段代码是BaseApplicationrun方法的具体实现:

class BaseApplication(object):"""An application interface for configuring and loadingthe various necessities for any given web framework."""......def run(self):try:Arbiter(self).run()except RuntimeError as e:print("\nError: %s\n" % e, file=sys.stderr)sys.stderr.flush()sys.exit(1)class Arbiter(object):"""Arbiter maintain the workers processes alive. It launches orkills them if needed. It also manages application reloadingvia SIGHUP/USR2."""......def run(self):"Main master loop."self.start() # 调用start方法,初始化配置信息,创建相关文件描述符,注册signal信号,创建socket等操作util._setproctitle("master [%s]" % self.proc_name)try:self.manage_workers()  # 根据配置的worker数管理woker,少了则创建,多了会销毁while True:  # 开启无限循环监听注册的信号事件,并做出相应处理self.maybe_promote_master()sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else Noneif sig is None:self.sleep()self.murder_workers()self.manage_workers()continueif sig not in self.SIG_NAMES:self.log.info("Ignoring unknown signal: %s", sig)continuesigname = self.SIG_NAMES.get(sig)handler = getattr(self, "handle_%s" % signame, None)if not handler:self.log.error("Unhandled signal: %s", signame)continueself.log.info("Handling signal: %s", signame)handler()self.wakeup()except (StopIteration, KeyboardInterrupt):self.halt()except HaltServer as inst:self.halt(reason=inst.reason, exit_status=inst.exit_status)except SystemExit:raiseexcept Exception:self.log.info("Unhandled exception in main loop",exc_info=True)self.stop(False)if self.pidfile is not None:self.pidfile.unlink()sys.exit(-1)

在这个run方法中主要通过以下几个步骤来启动处理请求的worker,并对worker进行管理:

  • 调用start方法,初始化配置信息,创建相关文件描述符,注册signal信号,创建socket等操作
  • 根据配置的worker数管理woker,少了则创建,多了会销毁,保持worker数和配置的数量保持一致
  • 开启无限循环监听注册的信号事件,并做出相应处理

解下来看一下又是怎么创建一个新的worker和对其进行管理的:

class BaseApplication(object):"""An application interface for configuring and loadingthe various necessities for any given web framework."""......def manage_workers(self):"""\Maintain the number of workers by spawning or killingas required."""if len(self.WORKERS) < self.num_workers: # 判断当前worker数量self.spawn_workers() # 创建新的workerworkers = self.WORKERS.items()workers = sorted(workers, key=lambda w: w[1].age)while len(workers) > self.num_workers:(pid, _) = workers.pop(0)self.kill_worker(pid, signal.SIGTERM)  # 销毁已经存在的workeractive_worker_count = len(workers)if self._last_logged_active_worker_count != active_worker_count:self._last_logged_active_worker_count = active_worker_countself.log.debug("{0} workers".format(active_worker_count),extra={"metric": "gunicorn.workers","value": active_worker_count,"mtype": "gauge"})def spawn_worker(self):self.worker_age += 1worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,self.app, self.timeout / 2.0,self.cfg, self.log)  # 调用指定的worker类,实例化workerself.cfg.pre_fork(self, worker)pid = os.fork()if pid != 0:worker.pid = pidself.WORKERS[pid] = workerreturn pid# Do not inherit the temporary files of other workersfor sibling in self.WORKERS.values():sibling.tmp.close()# Process Childworker.pid = os.getpid()try:util._setproctitle("worker [%s]" % self.proc_name)self.log.info("Booting worker with pid: %s", worker.pid)self.cfg.post_fork(self, worker)worker.init_process()  # 调用worker的方法,我这里在这个方法为worker自定义了初始化ros节点,也可根据需要添加其他逻辑,sys.exit(0)except SystemExit:raiseexcept AppImportError as e:self.log.debug("Exception while loading the application",exc_info=True)print("%s" % e, file=sys.stderr)sys.stderr.flush()sys.exit(self.APP_LOAD_ERROR)except Exception:self.log.exception("Exception in worker process")if not worker.booted:sys.exit(self.WORKER_BOOT_ERROR)sys.exit(-1)finally:self.log.info("Worker exiting (pid: %s)", worker.pid)try:worker.tmp.close()self.cfg.worker_exit(self, worker)except Exception:self.log.warning("Exception during worker exit:\n%s",traceback.format_exc())def spawn_workers(self):"""\Spawn new workers as needed.This is where a worker process leaves the main loopof the master process."""for _ in range(self.num_workers - len(self.WORKERS)):  # 遍历配置的worker数量,创建指定数量的workerself.spawn_worker()time.sleep(0.1 * random.random())

在这里通过执行以下几个步骤,来完成对worker的初始化,以及加载相关配置信息,按照指定的worker数量创建worker

  • 判断当前worker数量, 小于指定的worker数,则创建新的worker
  • 遍历配置的worker数量,创建指定数量的worker
  • 调用指定的worker类,实例化worker
  • 调用worker的方法,我这里在这个方法为worker自定义了初始化ros节点,也可根据需要添加其他逻辑

1.2 处理请求过程

在上述的创建worker的过程中,会为每一个worker分配监听的文件描述符、ros节点信息、并开启请求监听,到这里启动worker的过程就已经结束,接下来主要看一下是怎么为worker分配socket描述符以及如何开启监听的,又是怎么做到只有一个一个请求只会被一个worker处理的,下面是worker的初始化过程,在这里也有一个简单的继承关系如下,在前面我自定义的worker是继承gunicorn.workers.sync:SyncWorker,完整的继承关系如下

CustomWorker -> gunicorn.workers.sync.SyncWorker -> gunicorn.workers.base.Worker

而在前面调用的init_process方法除了在自定义CustomWorker中实现外,便是在gunicorn.workers.base.Worker中有实现,而在CustomWorker中只是添加了我自己的逻辑,最终还是调用了父类的这个方法,所以这里直接看gunicorn.workers.base.Worker中的实现,以下是部分源码

class Worker(object):......def init_process(self):"""\If you override this method in a subclass, the last statementin the function should be to call this method withsuper().init_process() so that the ``run()`` loop is initiated."""# set environment' variablesif self.cfg.env:for k, v in self.cfg.env.items():os.environ[k] = vutil.set_owner_process(self.cfg.uid, self.cfg.gid,initgroups=self.cfg.initgroups)  # 设置worker的拥有者,通过用户名配置,并配置用户组# Reseed the random number generatorutil.seed()# For waking ourselves upself.PIPE = os.pipe()  # 创建一个单向的管道for p in self.PIPE:util.set_non_blocking(p)util.close_on_exec(p)# Prevent fd inheritancefor s in self.sockets:util.close_on_exec(s)util.close_on_exec(self.tmp.fileno())self.wait_fds = self.sockets + [self.PIPE[0]]  # 指定监听的描述符信息,包括上述的管道和之前创建的网络socketself.log.close_on_exec()self.init_signals()# start the reloaderif self.cfg.reload:def changed(fname):self.log.info("Worker reloading: %s modified", fname)self.alive = Falseos.write(self.PIPE[1], b"1")self.cfg.worker_int(self)time.sleep(0.1)sys.exit(0)reloader_cls = reloader_engines[self.cfg.reload_engine]self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,callback=changed)self.load_wsgi()  # 通过调用app的load方法加载Flask应用的实例,并赋值给self.wsgi,就不在这里展示具体代码,感兴趣的可以去看一下if self.reloader:self.reloader.start()self.cfg.post_worker_init(self)# Enter main run loopself.booted = Trueself.run()  # 开始请求监听

在这个方法中首先会给worker设置拥有者,通过用户名配置,并配置用户组信息,再创建一个单向的管道,指定监听的描述符信息,包括上述的管道和之前创建的网络socket,通过调用appload方法加载Flask应用的实例,并赋值给self.wsgi,最后开始请求监听,那么接着看一下gunicorn是如何实现请求监听,并在收到请求是处理请求的

class SyncWorker(base.Worker):......def run_for_multiple(self, timeout):while self.alive:self.notify()try:ready = self.wait(timeout)except StopWaiting:returnif ready is not None:for listener in ready:if listener == self.PIPE[0]:continuetry:self.accept(listener)  # 这里是通过调用操作系统提供的接口获取请求信息并进行处理,在这里保证了同一个收到的请求数据只会被一个worker获取并处理except EnvironmentError as e:if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,errno.EWOULDBLOCK):raiseif not self.is_parent_alive():returndef run(self):# if no timeout is given the worker will never wait and will# use the CPU for nothing. This minimal timeout prevent it.timeout = self.timeout or 0.5# self.socket appears to lose its blocking status after# we fork in the arbiter. Reset it here.for s in self.sockets:s.setblocking(0)if len(self.sockets) > 1:  # 如果指定了监听的地址,这里只会有一个,但是如果指定的是类似于0.0.0.0:8888的会监听主机上所有网卡的特定端口,会有多个socketself.run_for_multiple(timeout)else:self.run_for_one(timeout)......

这个方法里面比较简单,只是通过需要监听的sockets数量来开启不同的监听。如果指定了监听的地址,这里只会有一个,但是如果指定的是类似于0.0.0.0:8888的会监听主机上所有网卡的特定端口,会有多个socket,则需要监听多个socket,同时通过调用操作系统提供的接口获取请求信息并进行处理,在这里保证了同一个收到的请求数据只会被一个worker获取并处理

class SyncWorker(base.Worker):def accept(self, listener):client, addr = listener.accept()  # 获取客户端连接的文件描述符,以及客户端地址client.setblocking(1)util.close_on_exec(client)self.handle(listener, client, addr)......def handle(self, listener, client, addr):req = Nonetry:if self.cfg.is_ssl:client = ssl.wrap_socket(client, server_side=True,**self.cfg.ssl_options)parser = http.RequestParser(self.cfg, client, addr)  # 指定请求解析类req = next(parser)  # 通过调用指定的解析类的mesg_class属性的Request类来进行请求解析self.handle_request(listener, req, client, addr)  # 开始处理请求except http.errors.NoMoreData as e:self.log.debug("Ignored premature client disconnection. %s", e)except StopIteration as e:self.log.debug("Closing connection. %s", e)except ssl.SSLError as e:if e.args[0] == ssl.SSL_ERROR_EOF:self.log.debug("ssl connection closed")client.close()else:self.log.debug("Error processing SSL request.")self.handle_error(req, client, addr, e)except EnvironmentError as e:if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):self.log.exception("Socket error processing request.")else:if e.errno == errno.ECONNRESET:self.log.debug("Ignoring connection reset")elif e.errno == errno.ENOTCONN:self.log.debug("Ignoring socket not connected")else:self.log.debug("Ignoring EPIPE")except Exception as e:self.handle_error(req, client, addr, e)finally:util.close(client)

在这里做了以下几个步骤获取到请求信息,并开始处理请求的

  • 获取客户端连接的文件描述符,以及客户端地址
  • 指定请求解析类
  • 通过调用指定的解析类的mesg_class属性的Request类来进行请求解析
  • 开始处理请求

到这里便已经完成了请求数据的读取以及解析为python中的对象,接下来便是请求的处理

class SyncWorker(base.Worker):......def handle_request(self, listener, req, client, addr):environ = {}resp = Nonetry:self.cfg.pre_request(self, req)  # 执行自定义的开始处理请求前的操作,比如打印日志,添加记录或者给请求添加某个属性等request_start = datetime.now()resp, environ = wsgi.create(req, client, addr,listener.getsockname(), self.cfg)# Force the connection closed until someone shows# a buffering proxy that supports Keep-Alive to# the backend.resp.force_close()self.nr += 1if self.nr >= self.max_requests:self.log.info("Autorestarting worker after current request.")self.alive = Falserespiter = self.wsgi(environ, resp.start_response)  # 这里便是前面提到的Flask应用的实例,通过调用这个实例的__call__方法处理请求try:if isinstance(respiter, environ['wsgi.file_wrapper']):resp.write_file(respiter)else:for item in respiter:resp.write(item)resp.close()request_time = datetime.now() - request_startself.log.access(resp, req, environ, request_time)finally:if hasattr(respiter, "close"):respiter.close()except EnvironmentError:# pass to next try-except levelutil.reraise(*sys.exc_info())except Exception:if resp and resp.headers_sent:# If the requests have already been sent, we should close the# connection to indicate the error.self.log.exception("Error handling request")try:client.shutdown(socket.SHUT_RDWR)client.close()except EnvironmentError:passraise StopIteration()raisefinally:try:self.cfg.post_request(self, req, environ, resp)  # 执行自定义的请求结束后的操作,可以是删除某个属性、回收某个资源等操作except Exception:self.log.exception("Exception in post_request hook")

这个方法里面主要是先执行请求开始前的钩子函数,以及调用Flask应用的__call__方法处理请求。过程和之前的文章一样,这里就不在赘述,之后请求处理完成后调用请求后的钩子函数,到这里就了解完了gunicorn是如何启动Flask项目,以及是如何监听请求,又是如何分发请求的

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com