Celery.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. <?php
  2. /**
  3. * This file contains a PHP client to Celery distributed task queue
  4. *
  5. * LICENSE: 2-clause BSD
  6. *
  7. * Copyright (c) 2012, GDR!
  8. * All rights reserved.
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * 1. Redistributions of source code must retain the above copyright notice, this
  14. * list of conditions and the following disclaimer.
  15. * 2. Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. *
  19. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  20. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  21. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  22. * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  23. * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  24. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  25. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  26. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  28. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. *
  30. * The views and conclusions contained in the software and documentation are those
  31. * of the authors and should not be interpreted as representing official policies,
  32. * either expressed or implied, of the FreeBSD Project.
  33. *
  34. * @link http://massivescale.net/
  35. * @link http://gdr.geekhood.net/
  36. * @link https://github.com/gjedeer/celery-php
  37. *
  38. * @package celery-php
  39. * @license http://opensource.org/licenses/bsd-license.php 2-clause BSD
  40. * @author GDR! <gdr@go2.pl>
  41. */
  42. /**
  43. * General exception class
  44. * @package celery-php
  45. */
  46. class CeleryException extends Exception {};
  47. /**
  48. * Emited by AsyncResult::get() on timeout
  49. * @package celery-php
  50. */
  51. class CeleryTimeoutException extends CeleryException {};
  52. /**
  53. * Client for a Celery server
  54. * @package celery-php
  55. */
  56. class Celery
  57. {
  58. private $connection = null; // AMQPConnection object
  59. private $connection_details = array(); // array of strings required to connect
  60. function __construct($config)
  61. {
  62. if(!class_exists('AMQPConnection'))
  63. {
  64. throw new CeleryException("Class AMQPConnection not found\nMake sure that AMQP extension is installed and enabled:\nhttp://www.php.net/manual/en/amqp.installation.php");
  65. }
  66. $this->connection_details = $config;
  67. $this->connection = Celery::InitializeAMQPConnection($this->connection_details);
  68. //$success = $this->connection->connect();
  69. }
  70. static function InitializeAMQPConnection($details)
  71. {
  72. $connection = new AMQPConnection();
  73. $connection->setHost($details['host']);
  74. $connection->setLogin($details['login']);
  75. $connection->setPassword($details['password']);
  76. $connection->setVhost($details['vhost']);
  77. $connection->setPort($details['port']);
  78. return $connection;
  79. }
  80. /**
  81. * Post a task to Celery
  82. * @param string $task Name of the task, prefixed with module name (like tasks.add for function add() in task.py)
  83. * @param array $args Array of arguments (kwargs call when $args is associative)
  84. * @return AsyncResult
  85. */
  86. function PostTask($task, $args)
  87. {
  88. $this->connection->connect();
  89. if(!is_array($args))
  90. {
  91. throw new CeleryException("Args should be an array");
  92. }
  93. $id = uniqid('php_', TRUE);
  94. $ch = new AMQPChannel($this->connection);
  95. $xchg = new AMQPExchange($ch);
  96. $xchg->setName($this->connection_details['exchange']);
  97. /* $args is numeric -> positional args */
  98. if(array_keys($args) === range(0, count($args) - 1))
  99. {
  100. $kwargs = array();
  101. }
  102. /* $args is associative -> contains kwargs */
  103. else
  104. {
  105. $kwargs = $args;
  106. $args = array();
  107. }
  108. $task_array = array(
  109. 'id' => $id,
  110. 'task' => $task,
  111. 'args' => $args,
  112. 'kwargs' => (object)$kwargs,
  113. );
  114. $task = json_encode($task_array);
  115. $params = array('content_type' => 'application/json',
  116. 'content_encoding' => 'UTF-8',
  117. 'immediate' => false,
  118. );
  119. $success = $xchg->publish($task, $this->connection_details['routing_key'], AMQP_NOPARAM, $params);
  120. $this->connection->disconnect();
  121. return new AsyncResult($id, $this->connection_details, $task_array['task'], $args);
  122. //$result->isReady();
  123. }
  124. }
  125. /*
  126. * Asynchronous result of Celery task
  127. * @package celery-php
  128. */
  129. class AsyncResult
  130. {
  131. private $task_id; // string, queue name
  132. private $connection; // AMQPConnection instance
  133. private $connection_details; // array of strings required to connect
  134. private $complete_result; // AMQPEnvelope instance
  135. private $body; // decoded array with message body (whatever Celery task returned)
  136. /**
  137. * Don't instantiate AsyncResult yourself, used internally only
  138. * @param string $id Task ID in Celery
  139. * @param array $connection_details used to initialize AMQPConnection, keys are the same as args to Celery::__construct
  140. * @param string task_name
  141. * @param array task_args
  142. */
  143. function __construct($id, $connection_details, $task_name=NULL, $task_args=NULL)
  144. {
  145. $this->task_id = $id;
  146. $this->connection = Celery::InitializeAMQPConnection($connection_details);
  147. $this->connection_details = $connection_details;
  148. $this->task_name = $task_name;
  149. $this->task_args = $task_args;
  150. }
  151. function __wakeup()
  152. {
  153. if($this->connection_details)
  154. {
  155. $this->connection = Celery::InitializeAMQPConnection($this->connection_details);
  156. }
  157. }
  158. /**
  159. * Connect to queue, see if there's a result waiting for us
  160. * Private - to be used internally
  161. */
  162. private function getCompleteResult()
  163. {
  164. if($this->complete_result)
  165. {
  166. return $this->complete_result;
  167. }
  168. $this->connection->connect();
  169. $ch = new AMQPChannel($this->connection);
  170. $q = new AMQPQueue($ch);
  171. $q->setName($this->connection_details['queue']);
  172. $q->setFlags(AMQP_AUTODELETE);
  173. # $q->setArgument('x-expires', 86400000);
  174. //$q->declare();
  175. try
  176. {
  177. $q->bind($this->connection_details['exchange'], $this->connection_details['routing_key']);
  178. }
  179. catch(AMQPQueueException $e)
  180. {
  181. // var_dump($e);
  182. //$q->delete();
  183. $this->connection->disconnect();
  184. return false;
  185. }
  186. $message = $q->get(AMQP_AUTOACK);
  187. // var_dump($message);
  188. if(!$message)
  189. {
  190. $q->delete();
  191. $this->connection->disconnect();
  192. return false;
  193. }
  194. $this->complete_result = $message;
  195. if($message->getContentType() != 'application/json')
  196. {
  197. $q->delete();
  198. $this->connection->disconnect();
  199. throw new CeleryException('Response was not encoded using JSON - found ' .
  200. $message->getContentType().
  201. ' - check your CELERY_RESULT_SERIALIZER setting!');
  202. }
  203. $this->body = json_decode($message->getBody());
  204. $q->delete();
  205. $this->connection->disconnect();
  206. return false;
  207. }
  208. /**
  209. * Get the Task Id
  210. * @return string
  211. */
  212. function getId()
  213. {
  214. return $this->task_id;
  215. }
  216. /**
  217. * Check if a task result is ready
  218. * @return bool
  219. */
  220. function isReady()
  221. {
  222. return ($this->getCompleteResult() !== false);
  223. }
  224. /**
  225. * Return task status (needs to be called after isReady() returned true)
  226. * @return string 'SUCCESS', 'FAILURE' etc - see Celery source
  227. */
  228. function getStatus()
  229. {
  230. if(!$this->body)
  231. {
  232. throw new CeleryException('Called getStatus before task was ready');
  233. }
  234. return $this->body->status;
  235. }
  236. /**
  237. * Check if task execution has been successful or resulted in an error
  238. * @return bool
  239. */
  240. function isSuccess()
  241. {
  242. return($this->getStatus() == 'SUCCESS');
  243. }
  244. /**
  245. * If task execution wasn't successful, return a Python traceback
  246. * @return string
  247. */
  248. function getTraceback()
  249. {
  250. if(!$this->body)
  251. {
  252. throw new CeleryException('Called getTraceback before task was ready');
  253. }
  254. return $this->body->traceback;
  255. }
  256. /**
  257. * Return a result of successful execution.
  258. * In case of failure, this returns an exception object
  259. * @return mixed Whatever the task returned
  260. */
  261. function getResult()
  262. {
  263. if(!$this->body)
  264. {
  265. throw new CeleryException('Called getResult before task was ready');
  266. }
  267. return $this->body->result;
  268. }
  269. /****************************************************************************
  270. * Python API emulation *
  271. * http://ask.github.com/celery/reference/celery.result.html *
  272. ****************************************************************************/
  273. /**
  274. * Returns TRUE if the task failed
  275. */
  276. function failed()
  277. {
  278. return $this->isReady() && !$this->isSuccess();
  279. }
  280. /**
  281. * Forget about (and possibly remove the result of) this task
  282. * Currently does nothing in PHP client
  283. */
  284. function forget()
  285. {
  286. }
  287. /**
  288. * Wait until task is ready, and return its result.
  289. * @param float $timeout How long to wait, in seconds, before the operation times out
  290. * @param bool $propagate (TODO - not working) Re-raise exception if the task failed.
  291. * @param float $interval Time to wait (in seconds) before retrying to retrieve the result
  292. * @throws CeleryTimeoutException on timeout
  293. * @return mixed result on both success and failure
  294. */
  295. function get($timeout=10, $propagate=TRUE, $interval=0.5)
  296. {
  297. $interval_us = (int)($interval * 1000000);
  298. $iteration_limit = (int)($timeout / $interval);
  299. for($i = 0; $i < $iteration_limit; $i++)
  300. {
  301. if($this->isReady())
  302. {
  303. break;
  304. }
  305. usleep($interval_us);
  306. }
  307. if(!$this->isReady())
  308. {
  309. throw new CeleryTimeoutException(sprintf('AMQP task %s(%s) did not return after 10 seconds', $this->task_name, json_encode($this->task_args)), 4);
  310. }
  311. return $this->getResult();
  312. }
  313. /**
  314. * Implementation of Python's properties: result, state/status
  315. */
  316. public function __get($property)
  317. {
  318. /**
  319. * When the task has been executed, this contains the return value.
  320. * If the task raised an exception, this will be the exception instance.
  321. */
  322. if($property == 'result')
  323. {
  324. if($this->isReady())
  325. {
  326. return $this->getResult();
  327. }
  328. else
  329. {
  330. return NULL;
  331. }
  332. }
  333. /**
  334. * state: The tasks current state.
  335. *
  336. * Possible values includes:
  337. *
  338. * PENDING
  339. * The task is waiting for execution.
  340. *
  341. * STARTED
  342. * The task has been started.
  343. *
  344. * RETRY
  345. * The task is to be retried, possibly because of failure.
  346. *
  347. * FAILURE
  348. * The task raised an exception, or has exceeded the retry limit. The result attribute then contains the exception raised by the task.
  349. *
  350. * SUCCESS
  351. * The task executed successfully. The result attribute then contains the tasks return value.
  352. *
  353. * status: Deprecated alias of state.
  354. */
  355. elseif($property == 'state' || $property == 'status')
  356. {
  357. if($this->isReady())
  358. {
  359. return $this->getStatus();
  360. }
  361. else
  362. {
  363. return 'PENDING';
  364. }
  365. }
  366. return $this->$property;
  367. }
  368. /**
  369. * Returns True if the task has been executed.
  370. * If the task is still running, pending, or is waiting for retry then False is returned.
  371. */
  372. function ready()
  373. {
  374. return $this->isReady();
  375. }
  376. /**
  377. * Send revoke signal to all workers
  378. * Does nothing in PHP client
  379. */
  380. function revoke()
  381. {
  382. }
  383. /**
  384. * Returns True if the task executed successfully.
  385. */
  386. function successful()
  387. {
  388. return $this->isSuccess();
  389. }
  390. /**
  391. * Deprecated alias to get()
  392. */
  393. function wait($timeout=10, $propagate=TRUE, $interval=0.5)
  394. {
  395. return $this->get($timeout, $propagate, $interval);
  396. }
  397. }