ARedisChannel.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. <?php
  2. /**
  3. * Represents a redis pub/sub channel.
  4. *
  5. * Publishing messages:
  6. * <pre>
  7. * $channel = new ARedisChannel("myChannel");
  8. * $channel->publish("hello world"); // sends a message to the channel
  9. * </pre>
  10. *
  11. * Subscribing to channels:
  12. * <pre>
  13. * $channel = new ARedisChannel("myChannel");
  14. * $channel->onReceiveMessage = function($redis, $channel, $message) {
  15. * echo "Message Received:".$message."\n";
  16. * };
  17. * $channel->subscribe(); // blocks, the callback is triggered when a message is received
  18. * </pre>
  19. * @author Charles Pick
  20. * @package packages.redis
  21. */
  22. class ARedisChannel extends ARedisIterableEntity {
  23. /**
  24. * Holds the data in the entity
  25. * @var array
  26. */
  27. protected $_data = array();
  28. /**
  29. * Subscribes to the channel
  30. * @return ARedisIterableChannel $this subscribed to the channel
  31. */
  32. public function subscribe() {
  33. if ($this->name === null) {
  34. throw new CException(get_class($this)." requires a name!");
  35. }
  36. $this->getConnection()->getClient()->subscribe(array($this->name),array($this,"receiveMessage"));
  37. return $this;
  38. }
  39. /**
  40. * Unsubscribes from the channel
  41. * @return ARedisIterableChannel $this unsubscribed from the channel
  42. */
  43. public function unsubscribe() {
  44. if ($this->name === null) {
  45. throw new CException(get_class($this)." requires a name!");
  46. }
  47. $this->getConnection()->getClient()->unsubscribe(array($this->name));
  48. return $this;
  49. }
  50. /**
  51. * Publishes a message to the channel
  52. * @param string $message The message to publish
  53. * @return integer the number of clients that received the message
  54. */
  55. public function publish($message) {
  56. if ($this->name === null) {
  57. throw new CException(get_class($this)." requires a name!");
  58. }
  59. $this->_data[] = $message;
  60. return $this->getConnection()->getClient()->publish($this->name,$message);
  61. }
  62. /**
  63. * Receives a message from a subscribed channel
  64. * @param Redis $redis the redis client instance
  65. * @param string $channel the name of the channel
  66. * @param string $message the message content
  67. */
  68. public function receiveMessage($redis, $channel, $message) {
  69. $this->_data[] = $message;
  70. $event=new CEvent($this);
  71. $this->onReceiveMessage($event);
  72. }
  73. /**
  74. * Gets the last received / sent message
  75. * @return mixed the last message received, or null if no messages have been received yet
  76. */
  77. public function getLastMessage() {
  78. $count = count($this->_data);
  79. if (!$count) {
  80. return null;
  81. }
  82. return $this->_data[$count - 1];
  83. }
  84. /**
  85. * This event is raised after a message is received
  86. * @param CEvent $event the event parameter
  87. */
  88. public function onReceiveMessage($event)
  89. {
  90. $this->raiseEvent('onReceiveMessage',$event);
  91. }
  92. /**
  93. * Gets the number of items in the channel
  94. * @return integer the number of items in the channel
  95. */
  96. public function getCount() {
  97. return count($this->_data);
  98. }
  99. /**
  100. * Gets all the members in the sorted set
  101. * @param boolean $forceRefresh whether to force a refresh or not, IGNORED!
  102. * @return array the members in the set
  103. */
  104. public function getData($forceRefresh = false) {
  105. return $this->_data;
  106. }
  107. /**
  108. * Returns whether there is an item at the specified offset.
  109. * This method is required by the interface ArrayAccess.
  110. * @param integer $offset the offset to check on
  111. * @return boolean
  112. */
  113. public function offsetExists($offset)
  114. {
  115. return isset($this->data[$offset]);
  116. }
  117. /**
  118. * Returns the item at the specified offset.
  119. * This method is required by the interface ArrayAccess.
  120. * @param integer $offset the offset to retrieve item.
  121. * @return mixed the item at the offset
  122. * @throws CException if the offset is invalid
  123. */
  124. public function offsetGet($offset)
  125. {
  126. return $this->_data[$offset];
  127. }
  128. /**
  129. * Sets the item at the specified offset.
  130. * This method is required by the interface ArrayAccess.
  131. * @param integer $offset the offset to set item
  132. * @param mixed $item the item value
  133. */
  134. public function offsetSet($offset,$item)
  135. {
  136. $this->_data[$offset] = $item;
  137. }
  138. /**
  139. * Unsets the item at the specified offset.
  140. * This method is required by the interface ArrayAccess.
  141. * @param integer $offset the offset to unset item
  142. */
  143. public function offsetUnset($offset)
  144. {
  145. unset($this->_data[$offset]);
  146. }
  147. }