job.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from flask import Blueprint, request, abort
  2. from libs.tools import json_response, JsonParser, Argument, human_diff_time
  3. from apps.schedule.scheduler import scheduler
  4. from apps.schedule.models import Job
  5. from datetime import datetime
  6. from public import db
  7. from libs.decorators import require_permission
  8. blueprint = Blueprint(__name__, __name__)
  9. @blueprint.route('/', methods=['GET'])
  10. @require_permission('job_task_view')
  11. def get():
  12. form, error = JsonParser(
  13. Argument('page', type=int, default=1, required=False),
  14. Argument('pagesize', type=int, default=10, required=False),
  15. Argument('job_group', type=str, required=False),).parse(request.args)
  16. if error is None:
  17. if form.job_group:
  18. job = Job.query.filter_by(group=form.job_group).order_by(Job.enabled.desc())
  19. else:
  20. job = Job.query.order_by(Job.enabled.desc())
  21. total = job.count()
  22. job_data = job.limit(form.pagesize).offset((form.page - 1) * form.pagesize).all()
  23. jobs = [x.to_json() for x in job_data]
  24. now = datetime.now()
  25. for job in jobs:
  26. if not job['enabled']:
  27. job['next_run_time'] = '未启用'
  28. elif str(job['id']) in scheduler.jobs:
  29. next_run_time = scheduler.jobs[str(job['id'])].next_run_time
  30. if next_run_time is None:
  31. job['next_run_time'] = '已过期'
  32. else:
  33. job['next_run_time'] = human_diff_time(next_run_time.replace(tzinfo=None), now)
  34. elif job['trigger'] == 'date' and now > datetime.strptime(job['trigger_args'], '%Y-%m-%d %H:%M:%S'):
  35. job['next_run_time'] = '已过期'
  36. else:
  37. job['next_run_time'] = '异常'
  38. return json_response({'data': jobs, 'total': total})
  39. return json_response(message=error)
  40. @blueprint.route('/', methods=['POST'])
  41. @require_permission('job_task_add')
  42. def post():
  43. form, error = JsonParser(
  44. 'name', 'group', 'desc', 'command_user', 'command', 'targets',
  45. Argument('command_user', default='root')
  46. ).parse()
  47. if error is None:
  48. Job(**form).save()
  49. return json_response(message=error)
  50. @blueprint.route('/<int:job_id>', methods=['PUT'])
  51. @require_permission('job_task_edit')
  52. def put(job_id):
  53. form, error = JsonParser(
  54. 'name', 'group', 'desc', 'command', 'targets',
  55. Argument('command_user', default='root')
  56. ).parse()
  57. if error is None:
  58. job = Job.query.get_or_404(job_id)
  59. job.update(**form)
  60. return json_response(message=error)
  61. @blueprint.route('/<int:job_id>/trigger', methods=['POST'])
  62. @require_permission('job_task_add | job_task_edit')
  63. def set_trigger(job_id):
  64. form, error = JsonParser(
  65. Argument('trigger', filter=lambda x: x in ['cron', 'date', 'interval'], help='错误的调度策略!'),
  66. Argument('trigger_args')
  67. ).parse()
  68. if error is None:
  69. if not scheduler.valid_job_trigger(form.trigger, form.trigger_args):
  70. return json_response(message='数据格式校验失败!')
  71. job = Job.query.get_or_404(job_id)
  72. if job.update(**form):
  73. scheduler.update_job(job)
  74. return json_response(message=error)
  75. @blueprint.route('/<int:job_id>/switch', methods=['POST', 'DELETE'])
  76. @require_permission('job_task_edit')
  77. def switch(job_id):
  78. job = Job.query.get_or_404(job_id)
  79. if request.method == 'POST':
  80. job.update(enabled=True)
  81. scheduler.add_job(job)
  82. elif request.method == 'DELETE':
  83. job.update(enabled=False)
  84. scheduler.remove_job(job.id)
  85. else:
  86. abort(405)
  87. return json_response()
  88. @blueprint.route('/<int:job_id>', methods=['DELETE'])
  89. @require_permission('job_task_del')
  90. def delete(job_id):
  91. job = Job.query.get_or_404(job_id)
  92. job.delete()
  93. scheduler.remove_job(job.id)
  94. return json_response()
  95. @blueprint.route('/groups/', methods=['GET'])
  96. @require_permission('job_task_view')
  97. def fetch_groups():
  98. apps = db.session.query(Job.group.distinct().label('group')).all()
  99. return json_response([x.group for x in apps])