if($callback) {
# Non-blocking
$self->log->debug("Waiting for $id on status $status in non-blocking mode");
return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
} else {
# Blocking
$self->log->debug("Waiting for $id on status $status in blocking mode");
return $self->_watch_blocking($id, $status);
}
}
sub _watch_blocking {
my ($self, $id, $status) = @_;
while(1) {
my $doc = $self->collection->find_one({'_id' => $id});
$self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));
MangoX::Queue - A MongoDB queue implementation using Mango
=head1 DESCRIPTION
L<MangoX::Queue> is a MongoDB backed queue implementation using L<Mango> to support
blocking and non-blocking queues.
L<MangoX::Queue> makes no attempt to handle the L<Mango> connection, database or
collection - pass in a collection to the constructor and L<MangoX::Queue> will
use it. The collection can be plain, capped or sharded.
=head1 SYNOPSIS
use Mango;
use MangoX::Queue;
my $mango = Mango->new("mongodb://localhost:27017");
my $collection = $mango->db('my_db')->collection('my_queue');
my $queue = MangoX::Queue->new(collection => $collection);
# To add a job
my $id = enqueue $queue 'test'; # Blocking
enqueue $queue 'test' => sub { my $id = shift; }; # Non-blocking
# To set options
my $id = enqueue $queue priority => 1, created => DateTime->now, 'test'; # Blocking
enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; }; # Non-blocking
# To watch for a specific job status
watch $queue $id; # Blocking
watch $queue $id, 'Complete' => sub { # Non-blocking
# Job status is 'Complete'
};
# To fetch a job
my $job = fetch $queue; # Blocking
fetch $queue sub { # Non-blocking
my ($job) = @_;
# ...
};
# To get a job by id
my $job = get $queue $id; # Blocking
get $queue $id => sub { my $job = shift; }; # Non-blocking
# To requeue a job
my $id = requeue $queue $job; # Blocking
requeue $queue $job => sub { my $id = shift; }; # Non-blocking
# To dequeue a job
dequeue $queue $id; # Blocking
dequeue $queue $id => sub { }; # Non-blocking
# To consume a queue
while(my $job = consume $queue) { # Blocking
# ...
}
my $consumer = consume $queue sub { # Non-blocking
my ($job) = @_;
# ...
};
# To stop consuming a queue
release $queue $consumer;
# To listen for events
on $queue enqueued => sub ( my ($queue, $job) = @_; };
on $queue dequeued => sub ( my ($queue, $job) = @_; };
on $queue consumed => sub { my ($queue, $job) = @_; };
# To register a plugin
plugin $queue 'MangoX::Queue::Plugin::Statsd';
=head1 ATTRIBUTES
L<MangoX::Queue> implements the following attributes.
=head2 collection
my $collection = $queue->collection;
$queue->collection($mango->db('foo')->collection('bar'));
my $queue = MangoX::Queue->new(collection => $collection);
The L<Mango::Collection> representing the MongoDB queue collection.
=head2 delay
my $delay = $queue->delay;
$queue->delay(MangoX::Queue::Delay->new);
The L<MangoX::Queue::Delay> responsible for dynamically controlling the
delay between queue queries.
=head2 plugins
my $plugins = $queue->plugins;
Returns a hash containing the plugins registered with this queue.
=head2 retries
my $retries = $queue->retries;
$queue->retries(5);
The number of times a job will be picked up from the queue before it is
marked as failed.
=head2 timeout
my $timeout = $queue->timeout;
$queue->timeout(10);
The time (in seconds) a job is allowed to stay in Retrieved state before
it is released back into Pending state. Defaults to 60 seconds.
=head1 EVENTS
L<MangoX::Queue> inherits from L<Mojo::EventEmitter> and emits the following events
=head2 consumed
on $queue consumed => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is consumed (either via consume or fetch)
=head2 dequeued
on $queue dequeued => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is dequeued
=head2 enqueued
on $queue enqueued => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is enqueued
=head1 METHODS
L<MangoX::Queue> implements the following methods.