1: <?php
2: /**
3: * Copyright 2012-2014 Rackspace US, Inc.
4: *
5: * Licensed under the Apache License, Version 2.0 (the "License");
6: * you may not use this file except in compliance with the License.
7: * You may obtain a copy of the License at
8: *
9: * http://www.apache.org/licenses/LICENSE-2.0
10: *
11: * Unless required by applicable law or agreed to in writing, software
12: * distributed under the License is distributed on an "AS IS" BASIS,
13: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14: * See the License for the specific language governing permissions and
15: * limitations under the License.
16: */
17:
18: namespace OpenCloud\Queues;
19:
20: use Guzzle\Common\Event;
21: use Guzzle\Http\Exception\BadResponseException;
22: use OpenCloud\Common\Exceptions\InvalidArgumentError;
23: use OpenCloud\Common\Service\CatalogService;
24: use Symfony\Component\EventDispatcher\EventSubscriberInterface;
25:
/**
 * Cloud Queues is an open source, scalable, and highly available message and
 * notifications service. Users of this service can create and manage a
 * "producer-consumer" or a "publisher-subscriber" model from one simple API. It
 * is made up of a few basic components: queues, messages, claims, and stats.
 *
 * In the producer-consumer model, users create queues where producers
 * can post messages. Workers, or consumers, may then claim those messages and
 * delete them once complete. A single claim may contain multiple messages, and
 * administrators are given the ability to query claims for status.
 *
 * In the publisher-subscriber model, messages are posted to a queue like above,
 * but messages are never claimed. Instead, subscribers, or watchers, simply
 * send GET requests to pull all messages since their last request. In this
 * model, a message will remain in the queue, unclaimed, until the message's
 * time to live (TTL) has expired.
 *
 * Here is an overview of the Cloud Queues workflow:
 *
 * 1. You create a queue to which producers post messages.
 *
 * 2. Producers post messages to the queue.
 *
 * 3. Workers claim messages from the queue, complete the work in that message,
 *    and delete the message.
 *
 * 4. If a worker plans to be offline before its message completes, the worker
 *    can retire the claim TTL, putting the message back into the queue for
 *    another worker to claim.
 *
 * 5. Subscribers monitor the claims of these queues to keep track of activity
 *    and help troubleshoot if things go wrong.
 *
 * For the majority of use cases, Cloud Queues itself will not be responsible
 * for the ordering of messages. If there is only a single producer, however,
 * Cloud Queues guarantees that messages are handled in a First In, First Out
 * (FIFO) order.
 */
class Service extends CatalogService implements EventSubscriberInterface
{
    const DEFAULT_TYPE = 'rax:queues';
    const DEFAULT_NAME = 'cloudQueues';

    /**
     * An arbitrary string used to differentiate your worker/subscriber. This is
     * needed, for example, when you return back a list of messages and want to
     * know the ones your worker is processing.
     *
     * Remains null until setClientId() has been called.
     *
     * @var string|null
     */
    private $clientId;

    /**
     * Declares which events this subscriber listens to and which method
     * handles each of them.
     *
     * @return array Event name => handler method name
     */
    public static function getSubscribedEvents()
    {
        return array(
            'request.before_send' => 'appendClientIdToRequest'
        );
    }

    /**
     * Set the Client-ID header to all requests for this service.
     *
     * NOTE(review): if setClientId() was never called, getClientId() returns
     * null and that null is what gets passed as the header value.
     *
     * @param Event $event Event whose 'request' entry is the outgoing request
     */
    public function appendClientIdToRequest(Event $event)
    {
        $request = $event['request'];
        $request->setHeader('Client-ID', $this->getClientId());
    }

    /**
     * Set the client ID used to identify this worker/subscriber.
     *
     * @param string|null $clientId The ID to use. Any falsy value (the default
     *                              null, an empty string, ...) causes a new
     *                              UUID to be generated instead.
     * @return $this
     */
    public function setClientId($clientId = null)
    {
        // generateUuid() is not defined in this class; it is expected to be
        // inherited from a parent class.
        $this->clientId = $clientId ? $clientId : self::generateUuid();

        return $this;
    }

    /**
     * @return string|null The current client ID, or null if none has been set
     */
    public function getClientId()
    {
        return $this->clientId;
    }

    /**
     * Create a new Queue.
     *
     * The queue is named locally and then created remotely by sending a PUT
     * request to the queue's URL.
     *
     * @param string $name Name of the new queue
     * @return Queue
     * @throws InvalidArgumentError If $name is not a string
     */
    public function createQueue($name)
    {
        if (!is_string($name)) {
            throw new InvalidArgumentError(
                'The only parameter required to create a Queue is a string name. Metadata can be set with '
                . 'Queue::setMetadata and Queue::saveMetadata'
            );
        }

        $queue = $this->getQueue();
        $queue->setName($name);

        // The PUT against the queue's own URL is what creates it remotely.
        $request = $this->getClient()->put($queue->getUrl());
        $request->send();

        return $queue;
    }

    /**
     * This operation lists queues for the project, sorting the queues
     * alphabetically by name.
     *
     * @param array $params An associative array of optional parameters:
     *
     * - marker (string) Specifies the name of the last queue received in a
     *                   previous request, or none to get the first page of
     *                   results. Optional.
     *
     * - limit (integer) Specifies up to 20 (the default, but configurable)
     *                   queues to return. Optional.
     *
     * - detailed (bool) Determines whether queue metadata is included in the
     *                   response. Optional.
     *
     * @return Collection
     */
    public function listQueues(array $params = array())
    {
        $url = $this->getUrl('queues', $params);

        return $this->resourceList('Queue', $url);
    }

    /**
     * Return a Queue object.
     *
     * When called without an argument the returned Queue is empty;
     * createQueue() relies on this and assigns the name afterwards.
     *
     * @param mixed $id Optional identifier handed straight to resource()
     *                  (presumably the queue name - confirm against the
     *                  parent implementation)
     * @return Queue
     */
    public function getQueue($id = null)
    {
        return $this->resource('Queue', $id);
    }

    /**
     * This operation checks to see if the specified queue exists.
     *
     * A HEAD request is sent to the queue's URL; the queue is reported as
     * existing when that request completes without a bad-response error.
     *
     * NOTE(review): every BadResponseException yields false, so failures other
     * than "not found" (e.g. authentication or server errors) are also
     * reported as "queue does not exist".
     *
     * @param string $name The queue name that you want to check
     * @return bool
     * @throws InvalidArgumentError If $name is not a non-empty string
     */
    public function hasQueue($name)
    {
        // Compare against '' explicitly instead of relying on truthiness:
        // the string "0" is falsy in PHP but is still a usable queue name.
        if (!is_string($name) || $name === '') {
            throw new InvalidArgumentError(sprintf(
                'You must provide a queue name as a valid string. You provided: %s',
                print_r($name, true)
            ));
        }

        try {
            $url = $this->getUrl();
            $url->addPath('queues')->addPath($name);

            $this->getClient()->head($url)->send();

            return true;
        } catch (BadResponseException $e) {
            return false;
        }
    }
}
201: