package MangoX::Queue;

use Mojo::Base 'Mojo::EventEmitter';

use Carp 'croak';
use Mojo::Log;
use Mango::BSON ':bson';
use MangoX::Queue::Delay;
use DateTime::Tiny;

our $VERSION = '0.08';

# A logger
has 'log' => sub { Mojo::Log->new->level('error') };

# The Mango::Collection representing the queue
has 'collection';
has 'capped';
has 'stats';

# A MangoX::Queue::Delay
has 'delay' => sub { MangoX::Queue::Delay->new };

# How long to wait before assuming a job has failed
has 'timeout' => sub { $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60 };

# How many times to retry a job before giving up
has 'retries' => sub { $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5 };

# Store Mojo::IOLoop->timer IDs
has 'consumers' => sub { {} };

# Store plugins
has 'plugins' => sub { {} };

sub new {
   my $self = shift->SUPER::new(@_);

   croak qq{No Mango::Collection provided to constructor} unless ref($self->collection) eq 'Mango::Collection';

   $self->stats($self->collection->stats);
   $self->capped($self->stats->{capped});

   $self->{pending_status} = $self->capped ? 1 : 'Pending';
   $self->{processing_status} = $self->capped ? 2 : 'Processing';
   $self->{failed_status} = $self->capped ? 3 : 'Failed';

   return $self;
}

sub plugin {
   my ($self, $name, $options) = @_;

   croak qq{Plugin $name already loaded} if exists $self->plugins->{$name};

   {
       no strict 'refs';
       unless($name->can('new')) {
           eval "require $name" or croak qq{Failed to load plugin $name: $@};
       }
   }

   eval {
       $self->plugins->{$name} = $name->new(%$options);
       return 1;
   } or croak qq{Error calling constructor for plugin $name: $@};

   eval {
       $self->plugins->{$name}->register($self);
       return 1;
   } or croak qq{Error calling register for plugin $name: $@};

   return $self->plugins->{$name};
}

sub get_options {
   my ($self) = @_;

   return {
       query => {
           '$or' => [{
               status => {
                   '$in' => ref($self->{pending_status}) eq 'ARRAY' ? $self->{pending_status} : [ $self->{pending_status} ],
               },
               '$or' => [ { processing => 0 }, { processing => undef } ],
           },{
               status => $self->{processing_status},
               processing => {
                   '$lt' => time - $self->timeout,
               }
           }],
           attempt => {
               '$lte' => $self->retries + 1,
           },
       },
       sort => bson_doc( # Sort by priority, then in order of creation
           'priority' => 1,
           'created' => -1,
       ),
       update => {
           '$set' => {
               processing => time,
               status => $self->{processing_status},
           },
           '$inc' => {
               attempt => 1,
           }
       }
   };
}

sub enqueue {
   my ($self, @args) = @_;

   # args maybe
   # - 'job_name'
   # - foo => bar, 'job_name'
   # - 'job_name', $callback
   # - foo => bar, 'job_name', $callback

   my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
   my $job = pop @args;
   my %args;
   %args = (@args) if scalar @args;

   my $db_job = {
       priority => $args{priority} // 1,
       created => $args{created} // DateTime::Tiny->now,
       data => $job,
       status => $args{status} // $self->{pending_status},
       attempt => 1,
       processing => 0,
   };

   if($callback) {
       return $self->collection->insert($db_job => sub {
           my ($collection, $error, $oid) = @_;
           if($error) {
               $self->emit_safe(error => qq{Error inserting job into collection: $error}, $db_job, $error);
               $callback->($db_job, $error);
               return;
           }
           $db_job->{_id} = $oid;
           $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
           eval {
               $callback->($db_job, undef);
               return 1;
           } or $self->emit_safe(error => qq{Error in callback: $@}, $db_job, $@);
       });
   } else {
       eval {
           $db_job->{_id} = $self->collection->insert($db_job);
           return 1;
       } or croak qq{Error inserting job into collection: $@};
       $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
       return $db_job;
   }
}

sub watch {
   my ($self, $id_or_job, $status, $callback) = @_;

   my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

   $status //= 'Complete';

   # args
   # - watch $queue $id, 'Status' => $callback

   if($callback) {
       # Non-blocking
       $self->log->debug("Waiting for $id on status $status in non-blocking mode");
       return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
   } else {
       # Blocking
       $self->log->debug("Waiting for $id on status $status in blocking mode");
       return $self->_watch_blocking($id, $status);
   }
}

sub _watch_blocking {
   my ($self, $id, $status) = @_;

   while(1) {
       my $doc = $self->collection->find_one({'_id' => $id});
       $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

       if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && grep { $_ =~ $doc->{status} } @$status))) {
           return 1;
       } else {
           $self->delay->wait;
       }
   }
}

sub _watch_nonblocking {
   my ($self, $id, $status, $callback) = @_;

   $self->collection->find_one({'_id' => $id} => sub {
       my ($cursor, $err, $doc) = @_;
       $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

       if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && grep { $_ =~ $doc->{status} } @$status))) {
           $self->log->debug("Status is $status");
           $self->delay->reset;
           $callback->($doc, undef);
       } else {
           $self->log->debug("Job not found or status doesn't match");
           $self->delay->wait(sub {
               return unless Mojo::IOLoop->is_running;
               Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
           });
           return undef;
       }
   });
}

sub requeue {
   my ($self, $job, $callback) = @_;

   $job->{status} = ref($self->{pending_status}) eq 'ARRAY' ? $self->{pending_status}->[0] : $self->{pending_status};
   return $self->update($job, $callback);
}

sub dequeue {
   my ($self, $id_or_job, $callback) = @_;

   # TODO option to not remove on dequeue?

   my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

   if($callback) {
       $self->collection->remove({'_id' => $id} => sub {
           my ($collection, $error, $doc) = @_;

           if($error) {
               $self->emit_safe(error => qq(Error removing job from collection: $error), $id_or_job, $error) if $self->has_subscribers('error');
               $callback->($id_or_job, $error);
               return;
           }

           $callback->($id_or_job, undef);
           $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
       });
   } else {
       $self->collection->remove({'_id' => $id});
       $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
   }
}

sub get {
   my ($self, $id_or_job, $callback) = @_;

   my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

   if($callback) {
       return $self->collection->find_one({'_id' => $id} => sub {
           my ($collection, $error, $doc) = @_;

           if($error) {
               $self->emit_safe(error => qq(Error retrieving job: $error), $id_or_job, $error) if $self->has_subscribers('error');
           }

           $callback->($doc, $error);
       });
   } else {
       return $self->collection->find_one({'_id' => $id});
   }
}

sub update {
   my ($self, $job, $callback) = @_;

   if($callback) {
       return $self->collection->update({'_id' => $job->{_id}}, $job => sub {
           my ($collection, $error, $doc) = @_;
           if($error) {
               $self->emit_safe(error => qq(Error updating job: $error), $job, $error) if $self->has_subscribers('error');
           }
           $callback->($doc, $error);
       });
   } else {
       return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1}) or croak qq{Error updating collection: $@};
   }
}

sub fetch {
   my ($self, @args) = @_;

   # fetch $queue status => 'Complete', sub { my $job = shift; }

   my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
   my %args;
   %args = (@args) if scalar @args;

   $self->log->debug("In fetch");

   if($callback) {
       $self->log->debug("Fetching in non-blocking mode");
       my $consumer_id = (scalar keys %{$self->consumers}) + 1;
       $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) });
       return $consumer_id;
   } else {
       $self->log->debug("Fetching in blocking mode");
       return $self->_consume_blocking(\%args, 1);
   }
}

sub consume {
   my ($self, @args) = @_;

   # consume $queue status => 'Failed', sub { my $job = shift; }

   my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
   my %args;
   %args = (@args) if scalar @args;

   $self->log->debug("In consume");

   if($callback) {
       $self->log->debug("consuming in non-blocking mode");
       my $consumer_id = (scalar keys %{$self->consumers}) + 1;
       $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) });
       $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
       return $consumer_id;
   } else {
       $self->log->debug("consuming in blocking mode");
       return $self->_consume_blocking(\%args, 0);
   }
}

sub release {
   my ($self, $consumer_id) = @_;

   $self->log->debug("Releasing consumer $consumer_id with timer ID: " . $self->consumers->{$consumer_id});

   Mojo::IOLoop->remove($self->consumers->{$consumer_id});
   delete $self->consumers->{$consumer_id};

   return 1;
}

sub _consume_blocking {
   my ($self, $args, $fetch) = @_;

   while(1) {
       my $opts = $self->get_options;
       $opts->{query} = $args if scalar keys %$args;

       my $doc = $self->collection->find_and_modify($opts);
       $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

       if($doc && $doc->{attempt} > $self->retries) {
           $doc->{status} = $self->{failed_status};
           $self->update($doc);
           $doc = undef;
           $self->log->debug("Job exceeded retries, status set to failed and job abandoned");
       }

       if($doc) {
           $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
           return $doc;
       } else {
           last if $fetch;
           $self->delay->wait;
       }
   }
}

sub _consume_nonblocking {
   my ($self, $args, $consumer_id, $callback, $fetch) = @_;

   my $opts = $self->get_options;
   $opts->{query} = $args if scalar keys %$args;

   $self->collection->find_and_modify($opts => sub {
       my ($cursor, $err, $doc) = @_;
       $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

       if($err) {
           $self->log->error($err);
           $self->emit_safe(error => $err);
       }

       if($doc && $doc->{attempt} > $self->retries) {
           $doc->{status} = $self->{failed_status};
           $self->update($doc);
           $doc = undef;
           $self->log->debug("Job exceeded retries, status set to failed and job abandoned");
       }

       if($doc) {
           $self->delay->reset;
           $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
           eval {
               $callback->($doc);
               return 1;
           } or $self->emit_safe(error => "Error in callback: $@");
           return unless Mojo::IOLoop->is_running;
           return if $fetch;
           return unless exists $self->consumers->{$consumer_id};
           #$self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
           $self->_consume_nonblocking($args, $consumer_id, $callback, 0);
           $self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
       } else {
           return unless Mojo::IOLoop->is_running;
           return if $fetch;
           $self->delay->wait(sub {
               return unless exists $self->consumers->{$consumer_id};
               #$self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
               $self->_consume_nonblocking($args, $consumer_id, $callback, 0);
               $self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
           });
           return undef;
       }
   });
}

1;

=encoding utf8

=head1 NAME

MangoX::Queue - A MongoDB queue implementation using Mango

=head1 DESCRIPTION

L<MangoX::Queue> is a MongoDB backed queue implementation using L<Mango> to support
blocking and non-blocking queues.

L<MangoX::Queue> makes no attempt to handle the L<Mango> connection, database or
collection - pass in a collection to the constructor and L<MangoX::Queue> will
use it. The collection can be plain, capped or sharded.

For an introduction to L<MangoX::Queue>, see L<MangoX::Queue::Tutorial>.

=head1 SYNOPSIS

=head2 Non-blocking

Non-blocking mode requires a running L<Mojo::IOLoop>.

   my $queue = MangoX::Queue->new(collection => $mango_collection);

   # To add a job
   enqueue $queue 'test' => sub { my $id = shift; };

   # To set options
   enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test' => sub { my $id = shift; };

   # To watch for a specific job status
   watch $queue $id, 'Complete' => sub {
       # Job status is 'Complete'
   };

   # To fetch a job
   fetch $queue sub {
       my ($job) = @_;
       # ...
   };

   # To get a job by id
   get $queue $id => sub { my $job = shift; };

   # To requeue a job
   requeue $queue $job => sub { my $id = shift; };

   # To dequeue a job
   dequeue $queue $id => sub { };

   # To consume a queue
   my $consumer = consume $queue sub {
       my ($job) = @_;
       # ...
   };

   # To stop consuming a queue
   release $queue $consumer;

   # To listen for errors
   on $queue error => sub { my ($queue, $error) = @_; };

=head2 Blocking

   my $queue = MangoX::Queue->new(collection => $mango_collection);

   # To add a job
   my $id = enqueue $queue 'test';

   # To set options
   my $id = enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test';

   # To watch for a specific job status
   watch $queue $id;

   # To fetch a job
   my $job = fetch $queue;

   # To get a job by id
   my $job = get $queue $id;

   # To requeue a job
   my $id = requeue $queue $job;

   # To dequeue a job
   dequeue $queue $id;

   # To consume a queue
   while(my $job = consume $queue) {
       # ...
   }

=head2 Other

   my $queue = MangoX::Queue->new(collection => $mango_collection);

   # To listen for events
   on $queue enqueued => sub ( my ($queue, $job) = @_; };
   on $queue dequeued => sub ( my ($queue, $job) = @_; };
   on $queue consumed => sub { my ($queue, $job) = @_; };

   # To register a plugin
   plugin $queue 'MangoX::Queue::Plugin::Statsd';

=head1 ATTRIBUTES

L<MangoX::Queue> implements the following attributes.

=head2 collection

   my $collection = $queue->collection;
   $queue->collection($mango->db('foo')->collection('bar'));

   my $queue = MangoX::Queue->new(collection => $collection);

The L<Mango::Collection> representing the MongoDB queue collection.

=head2 delay

   my $delay = $queue->delay;
   $queue->delay(MangoX::Queue::Delay->new);

The L<MangoX::Queue::Delay> responsible for dynamically controlling the
delay between queue queries.

=head2 plugins

   my $plugins = $queue->plugins;

Returns a hash containing the plugins registered with this queue.

=head2 retries

   my $retries = $queue->retries;
   $queue->retries(5);

The number of times a job will be picked up from the queue before it is
marked as failed.

=head2 timeout

   my $timeout = $queue->timeout;
   $queue->timeout(10);

The time (in seconds) a job is allowed to stay in Retrieved state before
it is released back into Pending state. Defaults to 60 seconds.

=head1 EVENTS

L<MangoX::Queue> inherits from L<Mojo::EventEmitter> and emits the following events.

Events are emitted only for actions on the current queue object, not the entire queue.

=head2 consumed

   on $queue consumed => sub {
       my ($queue, $job) = @_;
       # ...
   };

Emitted when an item is consumed (either via consume or fetch)

=head2 dequeued

   on $queue dequeued => sub {
       my ($queue, $job) = @_;
       # ...
   };

Emitted when an item is dequeued

=head2 enqueued

   on $queue enqueued => sub {
       my ($queue, $job) = @_;
       # ...
   };

Emitted when an item is enqueued

=head1 METHODS

L<MangoX::Queue> implements the following methods.

=head2 consume

   # In blocking mode
   while(my $job = consume $queue) {
       # ...
   }

   # In non-blocking mode
   consume $queue sub {
       my ($job) = @_;
       # ...
   };

Waits for jobs to arrive on the queue, sleeping between queue checks using L<MangoX::Queue::Delay> or L<Mojo::IOLoop>.

Currently sets the status to 'Retrieved' before returning the job.

=head2 dequeue

   my $job = fetch $queue;
   dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

=head2 enqueue

   my $id = enqueue $queue 'job name';
   my $id = enqueue $queue [ 'some', 'data' ];
   my $id = enqueue $queue +{ foo => 'bar' };

Add an item to the queue in blocking mode. The default priority is 1 and status is 'Pending'.

You can set queue options including priority, created and status.

   my $id = enqueue $queue,
       priority => 1,
       created => time,
       status => 'Pending',
       +{
           foo => 'bar'
       };

For non-blocking mode, pass in a coderef as the final argument.

   my $id = enqueue $queue 'job_name' => sub {
       # ...
   };

   my $id = enqueue $queue priority => 1, +{
       foo => 'bar',
   } => sub {
       # ...
   };

Sets the status to 'Pending' by default.

=head2 fetch

   # In blocking mode
   my $job = fetch $queue;

   # In non-blocking mode
   fetch $queue sub {
       my ($job) = @_;
       # ...
   };

Fetch a single job from the queue, returning undef if no jobs are available.

Currently sets job status to 'Retrieved'.

=head2 get

   # In non-blocking mode
   get $queue $id => sub {
       my ($job) = @_;
       # ...
   };

   # In blocking mode
   my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

You can also pass in a job instead of an ID.

   $job = get $queue $job;

=head2 get_options

   my $options = $queue->get_options;

Returns the L<Mango::Collection> options hash used by find_and_modify to
identify and update available queue items.

=head2 release

   my $consumer = consume $queue sub {
       # ...
   };
   release $queue $consumer;

Releases a non-blocking consumer from watching a queue.

=head2 requeue

   my $job = fetch $queue;
   requeue $queue $job;

Requeues a job. Sets the job status to 'Pending'.

=head2 update

   my $job = fetch $queue;
   $job->{status} = 'Failed';
   update $queue $job;

Updates a job in the queue.

=head2 watch

Wait for a job to enter a certain status.

   # In blocking mode
   my $id = enqueue $queue 'test';
   watch $queue $id, 'Complete'; # blocks until job is complete

   # In non-blocking mode
   my $id = enqueue $queue 'test';
   watch $queue $id, 'Complete' => sub {
       # ...
   };

=head1 ERRORS

Errors are reported by MangoX::Queue using callbacks and L<Mojo::EventEmitter>

To listen for all errors on a queue, subscribe to the 'error' event:

   $queue->on(error => sub {
       my ($queue, $job, $error) = @_;
       # ...
   });

To check for errors against an individual update, enqueue or dequeue call,
you can check for an error argument to the callback sub:

   enqueue $queue +$job => sub {
       my ($job, $error) = @_;

       if($error) {
           # ...
       }
   }

=head1 SEE ALSO

L<MangoX::Queue::Tutorial>, L<Mojolicious>, L<Mango>

=cut