# scheduler.py (2.9 KB)
  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. from apscheduler.events import EVENT_JOB_REMOVED
  3. from apps.schedule.agent import agent
  4. from apps.schedule.models import Job
  5. from public import config
  6. class Scheduler(object):
  7. def __init__(self):
  8. self.cron_job_args = ['year', 'month', 'day', 'week', 'day_of_week', 'hour', 'minute', 'second', 'start_date',
  9. 'end_date']
  10. self.scheduler = BackgroundScheduler(timezone=getattr(config, 'TIME_ZONE', 'Asia/Shanghai'))
  11. self.jobs = {}
  12. self.already_init = False
  13. def init(self):
  14. if not self.already_init:
  15. for job in Job.query.filter_by(enabled=True).all():
  16. self.add_job(job)
  17. self.scheduler.start()
  18. self.already_init = True
  19. def __parse_args(self, trigger, trigger_args):
  20. if trigger == 'cron':
  21. args = {k: v for k, v in zip(self.cron_job_args, trigger_args.split(';')) if v}
  22. # 周需要单独处理,0 对应周一,与页面上的说明不一致
  23. day_of_week = int(args['day_of_week']) if args.get('day_of_week') else None
  24. if day_of_week == 0:
  25. args['day_of_week'] = 6
  26. elif day_of_week is not None:
  27. args['day_of_week'] = day_of_week - 1
  28. return args
  29. elif trigger == 'interval':
  30. return {'seconds': int(trigger_args)}
  31. elif trigger == 'date':
  32. return {'run_date': trigger_args}
  33. else:
  34. raise ValueError('未知的调度策略: %r' % trigger)
  35. def add_job(self, job):
  36. job_id = str(job.id)
  37. args = self.__parse_args(job.trigger, job.trigger_args)
  38. instance = self.scheduler.add_job(
  39. agent,
  40. job.trigger,
  41. id=job_id,
  42. args=(job.id, job.command_user, job.command, job.targets),
  43. **args)
  44. self.jobs[job_id] = instance
  45. def valid_job_trigger(self, trigger, trigger_args):
  46. try:
  47. args = self.__parse_args(trigger, trigger_args)
  48. job = self.scheduler.add_job(agent, trigger, args=(None, None, None, None), next_run_time=None, **args)
  49. job.remove()
  50. return True
  51. except ValueError:
  52. return False
  53. def remove_job(self, job_id):
  54. job_id = str(job_id)
  55. if self.scheduler.get_job(job_id):
  56. self.scheduler.remove_job(job_id)
  57. def update_job(self, job):
  58. job_id = str(job.id)
  59. if self.scheduler.get_job(job_id):
  60. args = self.__parse_args(job.trigger, job.trigger_args)
  61. self.scheduler.reschedule_job(job_id, trigger=job.trigger, **args)
  62. elif job.enabled:
  63. self.add_job(job)
  64. # 监听任务移除事件
  65. def listener(event):
  66. if event.code == EVENT_JOB_REMOVED:
  67. scheduler.jobs.pop(event.job_id, None)
  68. scheduler = Scheduler()
  69. scheduler.scheduler.add_listener(listener, EVENT_JOB_REMOVED)
  70. scheduler.init()