if($callback) {
# Non-blocking
$self->log->debug("Waiting for $id on status $status in non-blocking mode");
return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
} else {
# Blocking
$self->log->debug("Waiting for $id on status $status in blocking mode");
return $self->_watch_blocking($id, $status);
}
}
sub _watch_blocking {
my ($self, $id, $status) = @_;
while(1) {
my $doc = $self->collection->find_one({'_id' => $id});
$self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));
MangoX::Queue - A MongoDB queue implementation using Mango
=head1 DESCRIPTION
L<MangoX::Queue> is a MongoDB-backed queue implementation using L<Mango> to support
blocking and non-blocking queues.
L<MangoX::Queue> makes no attempt to handle the L<Mango> connection, database or
collection - pass in a collection to the constructor and L<MangoX::Queue> will
use it. The collection can be plain, capped or sharded.
For an introduction to L<MangoX::Queue>, see L<MangoX::Queue::Tutorial>.
=head1 SYNOPSIS
=head2 Non-blocking
Non-blocking mode requires a running L<Mojo::IOLoop>.
my $queue = MangoX::Queue->new(collection => $mango_collection);
# To add a job
enqueue $queue 'test' => sub { my $id = shift; };
# To set options
enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test' => sub { my $id = shift; };
# To watch for a specific job status
watch $queue $id, 'Complete' => sub {
# Job status is 'Complete'
};
# To fetch a job
fetch $queue sub {
my ($job) = @_;
# ...
};
# To get a job by id
get $queue $id => sub { my $job = shift; };
# To requeue a job
requeue $queue $job => sub { my $id = shift; };
# To dequeue a job
dequeue $queue $id => sub { };
# To consume a queue
my $consumer = consume $queue sub {
my ($job) = @_;
# ...
};
# To stop consuming a queue
release $queue $consumer;
# To listen for errors
on $queue error => sub { my ($queue, $error) = @_; };
=head2 Blocking
my $queue = MangoX::Queue->new(collection => $mango_collection);
# To add a job
my $id = enqueue $queue 'test';
# To set options
my $id = enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test';
# To watch for a specific job status
watch $queue $id;
# To fetch a job
my $job = fetch $queue;
# To get a job by id
my $job = get $queue $id;
# To requeue a job
my $id = requeue $queue $job;
# To dequeue a job
dequeue $queue $id;
# To consume a queue
while(my $job = consume $queue) {
# ...
}
=head2 Other
my $queue = MangoX::Queue->new(collection => $mango_collection);
# To listen for events
on $queue enqueued => sub { my ($queue, $job) = @_; };
on $queue dequeued => sub { my ($queue, $job) = @_; };
on $queue consumed => sub { my ($queue, $job) = @_; };
# To register a plugin
plugin $queue 'MangoX::Queue::Plugin::Statsd';
=head1 ATTRIBUTES
L<MangoX::Queue> implements the following attributes.
=head2 collection
my $collection = $queue->collection;
$queue->collection($mango->db('foo')->collection('bar'));
my $queue = MangoX::Queue->new(collection => $collection);
The L<Mango::Collection> representing the MongoDB queue collection.
=head2 delay
my $delay = $queue->delay;
$queue->delay(MangoX::Queue::Delay->new);
The L<MangoX::Queue::Delay> responsible for dynamically controlling the
delay between queue queries.
=head2 plugins
my $plugins = $queue->plugins;
Returns a hash containing the plugins registered with this queue.
=head2 retries
my $retries = $queue->retries;
$queue->retries(5);
The number of times a job will be picked up from the queue before it is
marked as failed.
=head2 timeout
my $timeout = $queue->timeout;
$queue->timeout(10);
The time (in seconds) a job is allowed to stay in Retrieved state before
it is released back into Pending state. Defaults to 60 seconds.
=head1 EVENTS
L<MangoX::Queue> inherits from L<Mojo::EventEmitter> and emits the following events.
Events are emitted only for actions on the current queue object, not the entire queue.
=head2 consumed
on $queue consumed => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is consumed (either via C<consume> or C<fetch>).
=head2 dequeued
on $queue dequeued => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is dequeued.
=head2 enqueued
on $queue enqueued => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is enqueued.
=head1 METHODS
L<MangoX::Queue> implements the following methods.