agent.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import subprocess
  2. from datetime import datetime
  3. from threading import Thread
  4. from functools import partial
  5. from apps.assets.models import Host
  6. from apps.configuration.models import Environment
  7. from apps.deploy.models import App
  8. from apps.schedule.models import JobHistory
  9. from libs.utils import Container
  10. from libs.ssh import ssh_exec_command
  11. # targets有三种格式:
  12. # localhost 服务端本机执行
  13. # 1 整数类型,主机的id,代表在对应主机上执行
  14. # 1_2_3 用下划线连接的整数,主机id_应用id_环境id,同个三个id组合出某个主机上的某个容器内执行
  15. def agent(job_id, user, command, targets):
  16. threads, host_ids, app_ids, env_ids, info = [], set(), set(), set(), {'hosts': {}, 'apps': {}, 'environments': {}}
  17. write_history_callback = partial(JobHistory.write, job_id, datetime.now())
  18. for target in targets.split(','):
  19. if target == 'local':
  20. threads.append(Thread(target=local_executor, args=(command, write_history_callback)))
  21. elif target.isdigit():
  22. threads.append(Thread(target=host_executor, args=(info, command, target, write_history_callback)))
  23. host_ids.add(target)
  24. else:
  25. threads.append(Thread(target=container_executor, args=(info, user, command, target, write_history_callback)))
  26. cli_id, pro_id, env_id = target.split('_')
  27. host_ids.add(cli_id)
  28. app_ids.add(pro_id)
  29. env_ids.add(env_id)
  30. if app_ids:
  31. for app in App.query.filter(App.id.in_(app_ids)).all():
  32. info['apps'][str(app.id)] = app
  33. if host_ids:
  34. for cli in Host.query.filter(Host.id.in_(host_ids)).all():
  35. info['hosts'][str(cli.id)] = cli
  36. if env_ids:
  37. for env in Environment.query.filter(Environment.id.in_(env_ids)).all():
  38. info['environments'][str(env.id)] = env
  39. for t in threads:
  40. t.start()
  41. for t in threads:
  42. t.join()
  43. def local_executor(command, callback):
  44. task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  45. exit_code = task.wait()
  46. callback('local', exit_code, task.stdout.read(), task.stderr.read())
  47. def host_executor(info, command, target, callback):
  48. cli = info['hosts'][target]
  49. exit_code, stdout, stderr = ssh_exec_command(cli.ssh_ip, cli.ssh_port, command)
  50. callback(target, exit_code, stdout, stderr)
  51. def container_executor(info, user, command, target, callback):
  52. cli_id, pro_id, env_id = target.split('_')
  53. cli = info['hosts'][cli_id]
  54. ctr_name = '%s.%s' % (info['apps'][pro_id].identify, info['environments'][env_id].identify)
  55. ctr = Container(cli.docker_uri, ctr_name)
  56. exit_code, output = ctr.exec_command('sh -c %r' % command, with_exit_code=True, user=user)
  57. callback(target, exit_code, output, '')