flask部署apscheduler方案

[toc]

apscheduler部署方案

前言

下面所讲的apscheduler多进程,指的是使用flask_apscheduler与apscheduler结合起多个worker就起多个apscheduler,如果直接使用python自带的werkzeug启动或者单进程启动会少很多问题,这样的结果是并发量是有限制的。

gunicorn

apscheduler多进程

workers = 16

这种方案会导致起多个apscheduler,如果用锁,如文件锁控制只起一个的话,请求不落在启动的worker上则timer会处于pending状态

apscheduler多进程优化

workers = 16

--preload # 只在命令有效,如果配置在配置文件中不生效

这样实际上也只是起一个worker,而且gunicorn只能用prefork模式,这样对apscheduler会有影响。

总结

  • import multiprocessing
  • bind = '0.0.0.0:8203' #绑定的ip及端口号
  • #workers = multiprocessing.cpu_count() * 2 + 1 #进程数
  • workers = 16
  • backlog = 2048 #监听队列,允许挂起的链接数
  • worker_class = "eventlet"
  • debug = True
  • chdir = '/app/microservice/timer_service/timer_app/' #你项目的根目录,比如我的app.py文件在/home/ubuntu/app目录下,就填写'/home/ubuntu/app'
  • proc_name = 'gunicorn.proc'
  • daemon = True
  • pidfile = "/app/microservice/timer_service/deploy_config/gunicorn.pid"
  • reload=True
  • accesslog = "/app/microservice/timer_service/deploy_config/access.log"
  • errorlog = "/app/microservice/timer_service/deploy_config/error.log"
  • threads = 32

uwsgi

选择uwsgi一个很重要的原因是gunicorn只能是prefork模式,这种模式对apscheduler的影响后面会讲。

使用enable-threads启动,因为apscheduler放在后台的话,是以线程启动的

apscheduler多进程

不支持即便配置lazy-apps=true,现象与gunicorn.apscheduler多进程 一样

总结

  • [uwsgi]
  • http=:8203
  • projectpath = /app/microservice/timer_service
  • pythonpath = %(projectpath)/timer_app
  • wsgi-file = run.py
  • module = run
  • callable = app
  • processes = 8
  • threads = 64
  • enable-threads = true
  • preload=True
  • lazy-apps=true
  • master = false
  • pidfile = %(projectpath)/deploy_config/pid_%n.pid
  • #mule=init.py
  • #daemonize = %(projectpath)/logs/uwsgi_log.log

解决方案

问题

gunicorn由于prefork会导致暂停至超时再次恢复,会出现作业依然在,虽然不执行,原因是下面函数没有执行,理论上暂停再次恢复的流程是先修改时间job._modify(**changes),再跑一次循环self.wakeup(),而wakeup的作用是在下面函数中执行一遍,但是出现下面问题:

  • gunicorn的prefork模式如上述操作中没有进入到下面函数,只有进入_process_jobs才可能Miss;
  • 重新启动应用的时候进入了,就是说再次启动这个作业会Miss;

未找到原因(有可能是由于prefork认为_main_loop是个阻塞的操作线程不切换?)

  • # .venv/lib/python3.6/site-packages/apscheduler/blocking.py
  • def _main_loop(self):
  • wait_seconds = TIMEOUT_MAX
  • while self.state != STATE_STOPPED:
  • self._event.wait(wait_seconds)
  • self._event.clear()
  • wait_seconds = self._process_jobs()

解决

  • # 执行时间过久
  • Scheduler.print_jobs()

使用uwsgi mule模块,分离web和apscheduler,通过grpc调用即可

note

这种创建方式貌似更好

  • app = create_app(config=ProductionConfig())
  • def job_listener(event):
  • get_ = "msg from job '%s'" % (event.job)
  • logging.info(get_)
  • # This code below never gets invoked when I check with worker_id() == 1
  • # The only time it is run is with worker_id() value of 0
  • app.sched = Scheduler()
  • app.sched.add_jobstore(ShelveJobStore('/tmp/apsched_%d' % uwsgi.worker_id()), 'file')
  • app.sched.add_listener(job_listener,
  • events.EVENT_JOB_EXECUTED |
  • events.EVENT_JOB_MISSED |
  • events.EVENT_JOB_ERROR)
  • app.sched.start()

参考

https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/RPC.html

http://cn.voidcc.com/question/p-spijgpdn-bod.html

https://www.cnblogs.com/zhangliang91/p/11603916.html https://segmentfault.com/q/1010000010169238

https://stackoverflow.com/questions/24352634/would-starting-apscheduler-in-a-uwsgi-app-end-up-with-one-scheduler-for-each-wor

本文作者:朝圣

本文链接:www.zh-noone.cn/2020/7/Scheduler

版权声明:本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0许可协议。转载请注明出处!

redis数据结构
0 条评论