publish.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. from public import app
  2. from flask import Blueprint, g
  3. from apps.deploy.models import App, History
  4. from apps.assets.models import Host
  5. from apps.configuration.models import Environment
  6. from libs.tools import human_time, json_response, JsonParser, Argument
  7. from libs.decorators import with_app_context
  8. from libs.decorators import require_permission
  9. from libs.tools import QueuePool
  10. from apps.deploy.utils import *
  11. from threading import Thread
  12. from io import BytesIO
  13. from functools import partial
  14. import tarfile
  15. import uuid
  16. import time
  17. import os
  18. from apps.system.models import NotifyWay
  19. from libs.utils import send_ding_msg
  20. blueprint = Blueprint(__name__, __name__)
  21. @blueprint.route('/history/<int:app_id>', methods=['GET'])
  22. @require_permission('publish_app_publish_deploy')
  23. def history(app_id):
  24. histories = History.query.filter(
  25. History.app_id == app_id,
  26. History.deploy_message != '').order_by(History.created.desc()).limit(20).all()
  27. return json_response(histories)
  28. @blueprint.route('/update', methods=['POST'])
  29. @require_permission('publish_app_publish_deploy')
  30. def app_update():
  31. form, error = JsonParser(
  32. Argument('app_id', type=int),
  33. Argument('env_id', type=int),
  34. Argument('deploy_message', default=''),
  35. Argument('deploy_restart', type=bool),
  36. Argument('host_ids', type=list)
  37. ).parse()
  38. if error is None:
  39. if not g.user.check_deploy_permission(form.env_id, form.app_id):
  40. return json_response(message='Permission denied'), 403
  41. token = uuid.uuid4().hex
  42. q = QueuePool.make_queue(token, len(form.host_ids))
  43. for host_id in form.pop('host_ids'):
  44. Thread(target=do_update, args=(q, form, host_id)).start()
  45. return json_response(token)
  46. return json_response(message=error)
  47. @with_app_context
  48. def do_update(q, form, host_id):
  49. ctr, api_token, deploy_success = None, None, False
  50. history = History(host_id=host_id, created=human_time(), deploy_success=deploy_success, **form).save()
  51. send_message = partial(PublishMessage.make_message, q, host_id)
  52. try:
  53. send_message('正在检测环境 . . . ')
  54. pro = App.query.get_or_404(form.app_id)
  55. env = Environment.query.get_or_404(form.env_id)
  56. cli = Host.query.get_or_404(host_id)
  57. hooks = {x.name: x.command for x in DeployMenu.query.filter_by(app_id=form.app_id)}
  58. ctr = Container(cli.docker_uri, pro.identify + '.' + env.identify)
  59. image = app.config['DOCKER_REGISTRY_SERVER'] + '/' + pro.image.name
  60. image_tag = pro.image.latest
  61. ctr_info = ctr.info
  62. if ctr.api_version < '1.21':
  63. send_message('环境检测失败,docker版本过低,请升级至1.9.x以上!', level='error')
  64. return
  65. else:
  66. send_message('环境检测完成!', update=True)
  67. # 当容器不存在或镜像有更新时,需要获取新镜像并使用新镜像重新创建容器
  68. if ctr_info is None or ctr_info.Image != image + ':' + image_tag:
  69. send_message('正在更新镜像,版本{0} . . . '.format(image_tag))
  70. ctr.pull_image(image, image_tag)
  71. send_message('镜像更新完成!', update=True)
  72. if ctr_info:
  73. send_message('正在删除原有容器 . . .')
  74. ctr.remove()
  75. send_message('删除原有容器成功!', update=True)
  76. send_message('正在创建新容器 . . . ')
  77. api_token = create_container(ctr, pro, env, image='{0}:{1}'.format(image, image_tag))
  78. history.update(api_token=api_token)
  79. send_message('创建新容器成功!', update=True)
  80. # 复制文件
  81. send_message('正在初始化容器 . . . ')
  82. tar_buffer = BytesIO()
  83. with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
  84. add_file_to_tar(tar, os.path.join(app.config['BASE_DIR'], 'libs', 'scripts', 'entrypoint.sh'))
  85. # add_file_to_tar(tar, os.path.join(app.config['BASE_DIR'], 'libs', 'scripts', 'proxy_execute.sh'))
  86. ctr.put_archive('/', tar_buffer.getvalue())
  87. send_message('初始化容器成功!', update=True)
  88. # 启动容器
  89. send_message('正在启动新容器 . . . ')
  90. ctr.start()
  91. send_message('启动新容器成功!', update=True)
  92. # 执行init钩子
  93. send_message('正在执行应用初始化 . . .')
  94. exec_code, exec_output = ctr.exec_command_with_base64(hooks['容器创建'], timeout=120, with_exit_code=True)
  95. if exec_code != 0:
  96. send_message('执行应用初始化失败,退出状态码:{0}'.format(exec_code), level='error')
  97. send_message(exec_output, level='console')
  98. return
  99. else:
  100. send_message('执行应用初始化成功!', update=True)
  101. # 清理无用镜像
  102. if ctr.client.api_version >= "1.25":
  103. ctr.prune_images()
  104. # 当前容器如果为退出状态,则启动容器
  105. elif not ctr_info.running:
  106. send_message('容器当前为停止状态,正在启动容器 . . . ')
  107. ctr.start()
  108. send_message('启动容器成功!', update=True)
  109. # 执行发布操作
  110. send_message('正在执行应用更新 . . . ')
  111. send_publish_message(pro.notify_way_id, pro.name + ' 开始更新 . . .')
  112. exec_code, exec_output = ctr.exec_command_with_base64(hooks['应用发布'], form.deploy_message, timeout=120,
  113. with_exit_code=True)
  114. if exec_code != 0:
  115. send_message('执行应用更新失败,退出状态码:{0}'.format(exec_code), level='error')
  116. send_publish_message(pro.notify_way_id, pro.name + ' 发布失败!')
  117. send_message(exec_output, level='console')
  118. return
  119. else:
  120. send_message('执行应用更新成功!', update=True)
  121. # 根据选择执行重启容器操作
  122. if form.deploy_restart:
  123. send_message('正在重启容器 . . . ')
  124. ctr.restart(timeout=3)
  125. send_message('重启容器成功!', update=True)
  126. # 整个流程正常结束
  127. send_publish_message(pro.notify_way_id, pro.name + ' 发布成功')
  128. send_message('完成发布!', level='success')
  129. deploy_success = True
  130. except Exception as e:
  131. send_message('%s' % e, level='error')
  132. raise e
  133. finally:
  134. q.done()
  135. if deploy_success:
  136. history.update(deploy_success=True)
  137. class PublishMessage(object):
  138. start_time = time.time()
  139. @classmethod
  140. def make_message(cls, q, host_id, message, level='info', update=False):
  141. data = {
  142. 'hid': host_id,
  143. 'msg': message,
  144. 'level': level,
  145. 'update': update
  146. }
  147. if update:
  148. duration = time.time() - cls.start_time
  149. cls.start_time = time.time()
  150. data['duration'] = duration
  151. q.put(data)
  152. def send_publish_message(notify_way_id, message):
  153. if notify_way_id:
  154. notice_value = NotifyWay.query.filter_by(id=notify_way_id).first()
  155. send_ding_msg(token=notice_value.value, contacts=[], msg=message)