Commit: eb19804cbf0c764ff6bbd3fd9655aa91affc5646
Author: Joël Perras | Date: 2009-12-26 14:23:49 -0500
diff --git a/extensions/commands/Queue.php b/extensions/commands/Queue.php
new file mode 100644
index 0000000..40a2cec
--- /dev/null
+++ b/extensions/commands/Queue.php
@@ -0,0 +1,8 @@
+<?php
+
+namespace li3_queue\extensions\commands;
+
+class Queue extends \lithium\console\Command {
+}
+
+?>
\ No newline at end of file
diff --git a/extensions/data/Job.php b/extensions/data/Job.php
new file mode 100644
index 0000000..7ea934e
--- /dev/null
+++ b/extensions/data/Job.php
@@ -0,0 +1,54 @@
+<?php
+
+namespace li3_queue\extensions\data;
+
+use \lithium\core\Object;
+
+class Job extends \lithium\core\Object {
+
+ /**
+ * Unique identifier for the current job
+ *
+ * @var mixed Unique identifier (string, integer, UUID, etc.)
+ */
+ public $id = null;
+
+ /**
+ * Holds the current status of the job
+ *
+ * @var string A status type, e.g. 'ready' or 'reserved'.
+ */
+ public $status = null;
+
+ /**
+ * Job data
+ *
+ * @var mixed Data necessary for job processing
+ */
+ public $data = null;
+
+ /**
+ * Additional properties/information that may be required for the
+ * processing of this job by the requesting entity.
+ *
+ * @var array Optional additional properties
+ */
+ public $properties = array();
+
+ /**
+ * Constructor for a Job
+ *
+ * Allows setting of job properties on instantiation
+ */
+ public function __construct($id, $data, $properties = array()) {
+ $this->id = $id;
+ $this->data = $data;
+
+ if (!empty($properties)) {
+ foreach ($properties as $type => &$property) {
+ $this->{$type} = $property;
+ }
+ }
+ }
+}
+?>
\ No newline at end of file
diff --git a/extensions/data/Queue.php b/extensions/data/Queue.php
new file mode 100644
index 0000000..9bbc884
--- /dev/null
+++ b/extensions/data/Queue.php
@@ -0,0 +1,57 @@
+<?php
+
+namespace li3_queue\extensions\data;
+
+use \lithium\core\Object;
+
+abstract class Queue extends \lithium\core\Object {
+
+ /**
+ * Queue Constructor
+ *
+ * Sets parameters required by the queue adapter being used.
+ *
+ * @param array $config Configuration parameters.
+ * @return void
+ */
+ public function __construct($config = array()) {
+ $defaults = array();
+ parent::__construct($config + $defaults);
+ }
+
+ /**
+ * Connect to the queue service
+ *
+ * @param array $parameters Connection-specific queue parameters.
+ * @return boolean True on successful connection, false otherwise.
+ */
+ abstract public function connect($host, $port, $options = array());
+
+ /**
+ * Adds a job to the queue
+ *
+ * @param object $job Job to be added
+ * @param integer $priority Priority of the job.
+ * @return boolean True on successful enqueue, false otherwise.
+ */
+ abstract public function enqueue($Job, $priority = null);
+
+ /**
+ * Remove a job from the queue
+ *
+ * @param mixed $id Job id to be removed.
+ * @return Boolean true if dequeue was successful, false otherwise.
+ */
+ abstract public function dequeue($id);
+
+ /**
+ * Returns the job identified by $id, but without removing it from
+ * the queue
+ *
+ * @param mixed $id Id of the job to peek at. If no id is set, then
+ * peek at the next ready job.
+ * @return object Job object if it exists, null otherwise.
+ */
+ abstract public function peek($id = null);
+}
+?>
\ No newline at end of file
diff --git a/extensions/data/queue/adapters/Beanstalk.php b/extensions/data/queue/adapters/Beanstalk.php
new file mode 100644
index 0000000..895462d
--- /dev/null
+++ b/extensions/data/queue/adapters/Beanstalk.php
@@ -0,0 +1,167 @@
+<?php
+
+namespace li3_queue\extensions\data\queue\adapters;
+
+use \lithium\util\socket\Stream;
+
+class Beanstalk extends \li3_queue\extensions\data\Queue {
+
+ /**
+ * The stream socket connection
+ *
+ * @var object
+ */
+ protected $_stream = null;
+
+ /**
+ * Default delay time for jobs
+ *
+ * An integer number of seconds to wait before putting the job in
+ * the ready queue. The job will be in the "delayed" state during this time.
+ *
+ * @var integer Delay time in seconds. Defaults to zero.
+ */
+ public $delay = 0;
+
+ /**
+ * Default priority for jobs
+ *
+ * an integer < 2**32. Jobs with smaller priority values will be
+ * scheduled before jobs with larger priorities. The most urgent priority is 0;
+ * the least urgent priority is 4294967295.
+ *
+ * @var integer Priority level. Defaults to 65536.
+ * @see http://github.com/kr/beanstalkd/blob/v1.3/doc/protocol.txt?raw=true
+ */
+ public $priority = 65536;
+
+ /**
+ * Default time to run for jobs
+ *
+ * an integer number of seconds to allow a worker to run this job.
+ * This time is counted from the moment a worker reserves this job.
+ *If the worker does not delete, release, or bury the job within
+ * <ttr> seconds, the job will time out and the server will release the job.
+ * The minimum ttr is 1. If the client sends 0, the server will silently
+ * increase the ttr to 1.
+ *
+ * @var integer Number of seconds to allow the job to complete.
+ * Defaults to 120 seconds (2 minutes).
+ */
+ public $runtime = 5;
+
+ const USING = 'USING';
+
+ const RESERVED = 'RESERVED';
+
+ const RELEASED = 'RELEASED';
+
+ const BURIED = 'BURIED';
+
+ const DELETED = 'DELETED';
+
+ const NOT_FOUND = 'NOT_FOUND';
+
+ const INSERTED = 'INSERTED';
+
+ const TIMED_OUT = 'TIMED_OUT';
+
+ const JOB_TOO_BIG = 'JOB_TOO_BIG';
+
+ /**
+ * Connect to the queue and store the resource handle
+ *
+ * @param string $host FQDN or IP of host running beanstalkd
+ * @param integer $port Port on which beanstalkd is listening
+ * @param array $options Beanstalk specific connection options
+ * @todo Lazy-open stream connection
+ */
+ public function connect($host, $port, $options = array()) {
+ $options += array(
+ 'persistent' => true,
+ 'host' => $host,
+ 'port' => $port
+ );
+ $stream = new Stream($options);
+ $resource = $stream->open();
+
+ if (!is_resource($resource)) {
+ return false;
+ }
+ $this->_stream = $resource;
+ return true;
+ }
+
+ /**
+ * Close the stream connection
+ *
+ * @return boolean True on successful close, false if an error occurs, and
+ * null if no stream connection exists.
+ */
+ public function close() {
+ if (!is_resource($this->_stream)) {
+ return false;
+ }
+ return $this->_stream->close();
+ }
+
+ /**
+ * Adds a job to the queue
+ *
+ * @param object $Job Job to be added
+ * @param array $options Optional parameters for
+ * @return boolean True on successful enqueue, false otherwise
+ */
+ public function enqueue($Job, $options = array()) {
+ $options += array(
+ 'delay' => $this->delay,
+ 'priority' => $this->priority,
+ 'ttr' => $this->runtime
+ );
+ }
+
+ /**
+ * Remove a job from the queue
+ *
+ * @param mixed $id Job id to be removed.
+ * @return Boolean true if dequeue was successful, false otherwise.
+ */
+ public function dequeue($id) {
+
+ }
+
+ /**
+ * Obtain the stream resource for the active connection
+ *
+ * @return object Stream resource if it exists, null otherwise
+ */
+ public function resource() {
+ return $_stream;
+ }
+
+ /**
+ * Returns the job identified by $id, but without removing it from
+ * the queue
+ *
+ * @param mixed $id Id of the job to peek at. If no id is set, then
+ * peek at the next ready job.
+ * @return object Job object if it exists, null otherwise.
+ */
+ public function peek($id = null) {
+ if (is_null($id)) {
+ //return $this->_stream->send($this->_wrap('peek-ready'));
+ }
+ }
+
+ /**
+ * Prepares commands and data to be sent to the beanstalkd service by first
+ * terminating them with \r\n EOL markers.
+ *
+ * @param string $data Data to be EOL terminated.
+ * @return EOL terminated string
+ */
+ protected function _wrap($data) {
+ return $data . "\r\n";
+ }
+}
+?>
\ No newline at end of file