Commit: eb19804cbf0c764ff6bbd3fd9655aa91affc5646

Author: Joël Perras | Date: 2009-12-26 14:23:49 -0500
Basic skeleton of abstract Queue, and the start of a concrete Beanstalk implementation.
diff --git a/extensions/commands/Queue.php b/extensions/commands/Queue.php new file mode 100644 index 0000000..40a2cec --- /dev/null +++ b/extensions/commands/Queue.php @@ -0,0 +1,8 @@ +<?php + +namespace li3_queue\extensions\commands; + +class Queue extends \lithium\console\Command { +} + +?> \ No newline at end of file diff --git a/extensions/data/Job.php b/extensions/data/Job.php new file mode 100644 index 0000000..7ea934e --- /dev/null +++ b/extensions/data/Job.php @@ -0,0 +1,54 @@ +<?php + +namespace li3_queue\extensions\data; + +use \lithium\core\Object; + +class Job extends \lithium\core\Object { + + /** + * Unique identifier for the current job + * + * @var mixed Unique identifier (string, integer, UUID, etc.) + */ + public $id = null; + + /** + * Holds the current status of the job + * + * @var string A status type, e.g. 'ready' or 'reserved'. + */ + public $status = null; + + /** + * Job data + * + * @var mixed Data necessary for job processing + */ + public $data = null; + + /** + * Additional properties/information that may be required for the + * processing of this job by the requesting entity. + * + * @var array Optional additional properties + */ + public $properties = array(); + + /** + * Constructor for a Job + * + * Allows setting of job properties on instantiation + */ + public function __construct($id, $data, $properties = array()) { + $this->id = $id; + $this->data = $data; + + if (!empty($properties)) { + foreach ($properties as $type => &$property) { + $this->{$type} = $property; + } + } + } +} +?> \ No newline at end of file diff --git a/extensions/data/Queue.php b/extensions/data/Queue.php new file mode 100644 index 0000000..9bbc884 --- /dev/null +++ b/extensions/data/Queue.php @@ -0,0 +1,57 @@ +<?php + +namespace li3_queue\extensions\data; + +use \lithium\core\Object; + +abstract class Queue extends \lithium\core\Object { + + /** + * Queue Constructor + * + * Sets parameters required by the queue adapter being used. + * + * @param array $config Configuration parameters. + * @return void + */ + public function __construct($config = array()) { + $defaults = array(); + parent::__construct($config + $defaults); + } + + /** + * Connect to the queue service + * + * @param array $parameters Connection-specific queue parameters. + * @return boolean True on successful connection, false otherwise. + */ + abstract public function connect($host, $port, $options = array()); + + /** + * Adds a job to the queue + * + * @param object $job Job to be added + * @param integer $priority Priority of the job. + * @return boolean True on successful enqueue, false otherwise. + */ + abstract public function enqueue($Job, $priority = null); + + /** + * Remove a job from the queue + * + * @param mixed $id Job id to be removed. + * @return Boolean true if dequeue was successful, false otherwise. + */ + abstract public function dequeue($id); + + /** + * Returns the job identified by $id, but without removing it from + * the queue + * + * @param mixed $id Id of the job to peek at. If no id is set, then + * peek at the next ready job. + * @return object Job object if it exists, null otherwise. + */ + abstract public function peek($id = null); +} +?> \ No newline at end of file diff --git a/extensions/data/queue/adapters/Beanstalk.php b/extensions/data/queue/adapters/Beanstalk.php new file mode 100644 index 0000000..895462d --- /dev/null +++ b/extensions/data/queue/adapters/Beanstalk.php @@ -0,0 +1,167 @@ +<?php + +namespace li3_queue\extensions\data\queue\adapters; + +use \lithium\util\socket\Stream; + +class Beanstalk extends \li3_queue\extensions\data\Queue { + + /** + * The stream socket connection + * + * @var object + */ + protected $_stream = null; + + /** + * Default delay time for jobs + * + * An integer number of seconds to wait before putting the job in + * the ready queue. The job will be in the "delayed" state during this time. + * + * @var integer Delay time in seconds. Defaults to zero. + */ + public $delay = 0; + + /** + * Default priority for jobs + * + * an integer < 2**32. Jobs with smaller priority values will be + * scheduled before jobs with larger priorities. The most urgent priority is 0; + * the least urgent priority is 4294967295. + * + * @var integer Priority level. Defaults to 65536. + * @see http://github.com/kr/beanstalkd/blob/v1.3/doc/protocol.txt?raw=true + */ + public $priority = 65536; + + /** + * Default time to run for jobs + * + * an integer number of seconds to allow a worker to run this job. + * This time is counted from the moment a worker reserves this job. + *If the worker does not delete, release, or bury the job within + * <ttr> seconds, the job will time out and the server will release the job. + * The minimum ttr is 1. If the client sends 0, the server will silently + * increase the ttr to 1. + * + * @var integer Number of seconds to allow the job to complete. + * Defaults to 120 seconds (2 minutes). + */ + public $runtime = 5; + + const USING = 'USING'; + + const RESERVED = 'RESERVED'; + + const RELEASED = 'RELEASED'; + + const BURIED = 'BURIED'; + + const DELETED = 'DELETED'; + + const NOT_FOUND = 'NOT_FOUND'; + + const INSERTED = 'INSERTED'; + + const TIMED_OUT = 'TIMED_OUT'; + + const JOB_TOO_BIG = 'JOB_TOO_BIG'; + + /** + * Connect to the queue and store the resource handle + * + * @param string $host FQDN or IP of host running beanstalkd + * @param integer $port Port on which beanstalkd is listening + * @param array $options Beanstalk specific connection options + * @todo Lazy-open stream connection + */ + public function connect($host, $port, $options = array()) { + $options += array( + 'persistent' => true, + 'host' => $host, + 'port' => $port + ); + $stream = new Stream($options); + $resource = $stream->open(); + + if (!is_resource($resource)) { + return false; + } + $this->_stream = $resource; + return true; + } + + /** + * Close the stream connection + * + * @return boolean True on successful close, false if an error occurs, and + * null if no stream connection exists. + */ + public function close() { + if (!is_resource($this->_stream)) { + return false; + } + return $this->_stream->close(); + } + + /** + * Adds a job to the queue + * + * @param object $Job Job to be added + * @param array $options Optional parameters for + * @return boolean True on successful enqueue, false otherwise + */ + public function enqueue($Job, $options = array()) { + $options += array( + 'delay' => $this->delay, + 'priority' => $this->priority, + 'ttr' => $this->runtime + ); + } + + /** + * Remove a job from the queue + * + * @param mixed $id Job id to be removed. + * @return Boolean true if dequeue was successful, false otherwise. + */ + public function dequeue($id) { + + } + + /** + * Obtain the stream resource for the active connection + * + * @return object Stream resource if it exists, null otherwise + */ + public function resource() { + return $_stream; + } + + /** + * Returns the job identified by $id, but without removing it from + * the queue + * + * @param mixed $id Id of the job to peek at. If no id is set, then + * peek at the next ready job. + * @return object Job object if it exists, null otherwise. + */ + public function peek($id = null) { + if (is_null($id)) { + //return $this->_stream->send($this->_wrap('peek-ready')); + } + } + + /** + * Prepares commands and data to be sent to the beanstalkd service by first + * terminating them with \r\n EOL markers. + * + * @param string $data Data to be EOL terminated. + * @return EOL terminated string + */ + protected function _wrap($data) { + return $data . "\r\n"; + } +} +?> \ No newline at end of file