package MangoX::Queue;

use Mojo::Base 'Mojo::EventEmitter';

use Carp 'croak';
use DateTime;
use DateTime::Duration;
use Mojo::Log;
use Mango::BSON ':bson';
use MangoX::Queue::Delay;

no warnings 'experimental::smartmatch';

our $VERSION = '0.03';

# A logger
has 'log' => sub { Mojo::Log->new->level('error') };

# The Mango::Collection representing the queue
has 'collection';

# A MangoX::Queue::Delay
has 'delay' => sub { MangoX::Queue::Delay->new };

# How long to wait before assuming a job has failed
has 'timeout' => sub { $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60 };

# How many times to retry a job before giving up
has 'retries' => sub { $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5 };

# Store Mojo::IOLoop->timer IDs
has 'consumers' => sub { {} };

# Store plugins
has 'plugins' => sub { {} };

sub new {
       my $self = shift->SUPER::new(@_);

       croak qq{No Mango::Collection provided to constructor} unless ref($self->collection) eq 'Mango::Collection';

       return $self;
}

sub plugin {
       my ($self, $name, $options) = @_;

       croak qq{Plugin $name already loaded} if exists $self->plugins->{$name};

       {
               no strict 'refs';
               unless($name->can('new')) {
                       eval "require $name" or croak qq{Failed to load plugin $name: $@};
               }
       }

       $self->plugins->{$name} = $name->new(%$options);

       $self->plugins->{$name}->register($self);

       return $self->plugins->{$name};
}

sub get_options {
       my ($self) = @_;

       return {
               query => {
                       '$or' => [{
                               status => {
                                       '$in' => [ 'Pending' ]
                               }
                       },{
                               status => {
                                       '$in' => [ 'Retrieved' ]
                               },
                               retrieved => {
                                       '$lt' => DateTime->now->subtract_duration(DateTime::Duration->new(seconds => $self->timeout))
                               }
                       }]
               },
               update => {
                       '$set' => {
                               status => 'Retrieved',
                               retrieved => DateTime->now,
                       },
                       '$inc' => {
                               attempt => 1,
                       }
               },
               sort => bson_doc( # Sort by priority, then in order of creation
                       'priority' => 1,
                       'created' => -1,
               ),
               new => 0, # Get the original object (so we can see status etc)
       };
}

sub enqueue {
       my ($self, @args) = @_;

       # args maybe
       # - 'job_name'
       # - foo => bar, 'job_name'
       # - 'job_name', $callback
       # - foo => bar, 'job_name', $callback

       my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
       my $job = pop @args;
       my %args;
       %args = (@args) if scalar @args;

       my $db_job = {
               priority => $args{priority} // 1,
               created => $args{created} // DateTime->now,
               data => $job,
               status => $args{status} // 'Pending',
               attempt => 1,
       };

       if($callback) {
               return $self->collection->insert($db_job => sub {
                       my ($collection, $error, $oid) = @_;
                       $db_job->{_id} = $oid;
                       $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
                       $callback->($db_job);
               });
       } else {
               my $id = $self->collection->insert($db_job);
               $db_job->{_id} = $id;
               $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
               return $db_job;
       }
}

sub watch {
       my ($self, $id_or_job, $status, $callback) = @_;

       my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

       $status //= 'Complete';

       # args
       # - watch $queue $id, 'Status' => $callback

       if($callback) {
               # Non-blocking
               $self->log->debug("Waiting for $id on status $status in non-blocking mode");
               return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
       } else {
               # Blocking
               $self->log->debug("Waiting for $id on status $status in blocking mode");
               return $self->_watch_blocking($id, $status);
       }
}

sub _watch_blocking {
       my ($self, $id, $status) = @_;

       while(1) {
               my $doc = $self->collection->find_one({'_id' => $id});
               $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

               if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && $doc->{status} ~~ @$status))) {
                       return 1;
               } else {
                       $self->delay->wait;
               }
       }
}

sub _watch_nonblocking {
       my ($self, $id, $status, $callback) = @_;

       $self->collection->find_one({'_id' => $id} => sub {
               my ($cursor, $err, $doc) = @_;
               $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

               if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && $doc->{status} ~~ @$status))) {
                       $self->log->debug("Status is $status");
                       $self->delay->reset;
                       $callback->($doc);
               } else {
                       $self->log->debug("Job not found or status doesn't match");
                       $self->delay->wait(sub {
                               return unless Mojo::IOLoop->is_running;
                               Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
                       });
                       return undef;
               }
       });
}

sub requeue {
       my ($self, $job, $callback) = @_;

       $job->{status} = 'Pending';
       return $self->update($job, $callback);
}

sub dequeue {
       my ($self, $id_or_job, $callback) = @_;

       # TODO option to not remove on dequeue?

       my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

       if($callback) {
               $self->collection->remove({'_id' => $id} => sub {
                       $callback->();
                       $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
               });
       } else {
               $self->collection->remove({'_id' => $id});
               $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
       }
}

sub get {
       my ($self, $id_or_job, $callback) = @_;

       my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

       if($callback) {
               return $self->collection->find_one({'_id' => $id} => sub {
                       my ($collection, $error, $doc) = @_;
                       $callback->($doc);
               });
       } else {
               return $self->collection->find_one({'_id' => $id});
       }
}

sub update {
       my ($self, $job, $callback) = @_;

       if($callback) {
               return $self->collection->find_one({'_id' => $job->{_id}} => sub {
                       my ($collection, $error, $doc) = @_;
                       $callback->($doc);
               });
       } else {
               return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1});
       }
}

sub fetch {
       my ($self, @args) = @_;

       # fetch $queue status => 'Complete', sub { my $job = shift; }

       my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
       my %args;
       %args = (@args) if scalar @args;

       $self->log->debug("In fetch");

       if($callback) {
               $self->log->debug("Fetching in non-blocking mode");
               my $consumer_id = (scalar keys %{$self->consumers}) + 1;
               $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) });
               return $consumer_id;
       } else {
               $self->log->debug("Fetching in blocking mode");
               return $self->_consume_blocking(\%args, 1);
       }
}

sub consume {
       my ($self, @args) = @_;

       # consume $queue status => 'Failed', sub { my $job = shift; }

       my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
       my %args;
       %args = (@args) if scalar @args;

       $self->log->debug("In consume");

       if($callback) {
               $self->log->debug("consuming in non-blocking mode");
               my $consumer_id = (scalar keys %{$self->consumers}) + 1;
               $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) });
               $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
               return $consumer_id;
       } else {
               $self->log->debug("consuming in blocking mode");
               return $self->_consume_blocking(\%args, 0);
       }
}

sub release {
       my ($self, $consumer_id) = @_;

       $self->log->debug("Releasing consumer $consumer_id with timer ID: " . $self->consumers->{$consumer_id});

       Mojo::IOLoop->remove($self->consumers->{$consumer_id});
       delete $self->consumers->{$consumer_id};

       return;
}

sub _consume_blocking {
       my ($self, $args, $fetch) = @_;

       while(1) {
               my $opts = $self->get_options;
               $opts->{query} = $args if scalar keys %$args;

               my $doc = $self->collection->find_and_modify($opts);
               $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

               if($doc) {
                       $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
                       return $doc;
               } else {
                       last if $fetch;
                       $self->delay->wait;
               }
       }
}

sub _consume_nonblocking {
       my ($self, $args, $consumer_id, $callback, $fetch) = @_;

       my $opts = $self->get_options;
       $opts->{query} = $args if scalar keys %$args;

       $self->collection->find_and_modify($opts => sub {
               my ($cursor, $err, $doc) = @_;
               $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

               if($doc) {
                       $self->delay->reset;
                       $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
                       $callback->($doc);
                       return unless Mojo::IOLoop->is_running;
                       return if $fetch;
                       return unless exists $self->consumers->{$consumer_id};
                       $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
                       $self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
               } else {
                       return unless Mojo::IOLoop->is_running;
                       return if $fetch;
                       $self->delay->wait(sub {
                               return unless exists $self->consumers->{$consumer_id};
                               $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
                               $self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
                       });
                       return undef;
               }
       });
}

1;

=encoding utf8

=head1 NAME

MangoX::Queue - A MongoDB queue implementation using Mango

=head1 DESCRIPTION

L<MangoX::Queue> is a MongoDB backed queue implementation using L<Mango> to support
blocking and non-blocking queues.

L<MangoX::Queue> makes no attempt to handle the L<Mango> connection, database or
collection - pass in a collection to the constructor and L<MangoX::Queue> will
use it. The collection can be plain, capped or sharded.

=head1 SYNOPSIS

       use Mango;
       use MangoX::Queue;

       my $mango = Mango->new("mongodb://localhost:27017");
       my $collection = $mango->db('my_db')->collection('my_queue');

       my $queue = MangoX::Queue->new(collection => $collection);

       # To add a job
       my $id = enqueue $queue 'test'; # Blocking
       enqueue $queue 'test' => sub { my $id = shift; }; # Non-blocking

       # To set options
       my $id = enqueue $queue priority => 1, created => DateTime->now, 'test'; # Blocking
       enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; }; # Non-blocking

       # To watch for a specific job status
       watch $queue $id; # Blocking
       watch $queue $id, 'Complete' => sub { # Non-blocking
               # Job status is 'Complete'
       };

       # To fetch a job
       my $job = fetch $queue; # Blocking
       fetch $queue sub { # Non-blocking
               my ($job) = @_;
               # ...
       };

       # To get a job by id
       my $job = get $queue $id; # Blocking
       get $queue $id => sub { my $job = shift; }; # Non-blocking

       # To requeue a job
       my $id = requeue $queue $job; # Blocking
       requeue $queue $job => sub { my $id = shift; }; # Non-blocking

       # To dequeue a job
       dequeue $queue $id; # Blocking
       dequeue $queue $id => sub { }; # Non-blocking

       # To consume a queue
       while(my $job = consume $queue) { # Blocking
               # ...
       }
       my $consumer = consume $queue sub { # Non-blocking
               my ($job) = @_;
               # ...
       };

       # To stop consuming a queue
       release $queue $consumer;

       # To listen for events
       on $queue enqueued => sub ( my ($queue, $job) = @_; };
       on $queue dequeued => sub ( my ($queue, $job) = @_; };
       on $queue consumed => sub { my ($queue, $job) = @_; };

       # To register a plugin
       plugin $queue 'MangoX::Queue::Plugin::Statsd';

=head1 ATTRIBUTES

L<MangoX::Queue> implements the following attributes.

=head2 collection

   my $collection = $queue->collection;
   $queue->collection($mango->db('foo')->collection('bar'));

   my $queue = MangoX::Queue->new(collection => $collection);

The L<Mango::Collection> representing the MongoDB queue collection.

=head2 delay

       my $delay = $queue->delay;
       $queue->delay(MangoX::Queue::Delay->new);

The L<MangoX::Queue::Delay> responsible for dynamically controlling the
delay between queue queries.

=head2 plugins

       my $plugins = $queue->plugins;

Returns a hash containing the plugins registered with this queue.

=head2 retries

       my $retries = $queue->retries;
       $queue->retries(5);

The number of times a job will be picked up from the queue before it is
marked as failed.

=head2 timeout

       my $timeout = $queue->timeout;
       $queue->timeout(10);

The time (in seconds) a job is allowed to stay in Retrieved state before
it is released back into Pending state. Defaults to 60 seconds.

=head1 EVENTS

L<MangoX::Queue> inherits from L<Mojo::EventEmitter> and emits the following events

=head2 consumed

       on $queue consumed => sub {
               my ($queue, $job) = @_;
               # ...
       };

Emitted when an item is consumed (either via consume or fetch)

=head2 dequeued

       on $queue dequeued => sub {
               my ($queue, $job) = @_;
               # ...
       };

Emitted when an item is dequeued

=head2 enqueued

       on $queue enqueued => sub {
               my ($queue, $job) = @_;
               # ...
       };

Emitted when an item is enqueued

=head1 METHODS

L<MangoX::Queue> implements the following methods.

=head2 consume

       # In blocking mode
       while(my $job = consume $queue) {
               # ...
       }
       while(my $job = $queue->consume) {
               # ...
       }

       # In non-blocking mode
       consume $queue sub {
               my ($job) = @_;
               # ...
       };
       $queue->consume(sub {
               my ($job) = @_;
               # ...
       });

Waits for jobs to arrive on the queue, sleeping between queue checks using L<MangoX::Queue::Delay> or L<Mojo::IOLoop>.

Currently sets the status to 'Retrieved' before returning the job.

=head2 dequeue

       my $job = fetch $queue;
       dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

=head2 enqueue

       enqueue $queue 'job name';
       enqueue $queue [ 'some', 'data' ];
       enqueue $queue +{ foo => 'bar' };

       $queue->enqueue('job name');
       $queue->enqueue([ 'some', 'data' ]);
       $queue->enqueue({ foo => 'bar' });

Add an item to the queue.

Currently uses priority 1 with a job status of 'Pending'.

=head2 fetch

       # In blocking mode
       my $job = fetch $queue;
       my $job = $queue->fetch;

       # In non-blocking mode
       fetch $queue sub {
               my ($job) = @_;
               # ...
       };
       $queue->fetch(sub {
               my ($job) = @_;
               # ...
       });

Fetch a single job from the queue, returning undef if no jobs are available.

Currently sets job status to 'Retrieved'.

=head2 get

       my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

=head2 get_options

       my $options = $queue->get_options;

Returns the L<Mango::Collection> options hash used by find_and_modify to
identify and update available queue items.

Wait for a job to enter a certain status.

=head2 release

       my $consumer = consume $queue sub {
               # ...
       };
       release $queue $consumer;

Releases a non-blocking consumer from watching a queue.

=head2 requeue

       my $job = fetch $queue;
       requeue $queue $job;

Requeues a job. Sets the job status to 'Pending'.

=head2 update

       my $job = fetch $queue;
       $job->{status} = 'Failed';
       update $queue $job;

Updates a job in the queue.

=head2 watch

       # In blocking mode
       my $id = enqueue $queue 'test';
       watch $queue $id, 'Complete'; # blocks until job is complete

       # In non-blocking mode
       my $id = enqueue $queue 'test';
       watch $queue $id, 'Complete' => sub {
               # ...
       };

=head1 SEE ALSO

L<Mojolicious>, L<Mango>

=cut