[](
https://travis-ci.org/avast/RabbitMQ-Consumer-Batcher)
# NAME
RabbitMQ::Consumer::Batcher - batch consumer of RMQ messages
# SYNOPSIS
use AnyEvent;
use AnyEvent::RabbitMQ::PubSub;
use AnyEvent::RabbitMQ::PubSub::Consumer;
use RabbitMQ::Consumer::Batcher;
my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
);
my $exchange = {
exchange => 'my_test_exchange',
type => 'topic',
durable => 0,
auto_delete => 1,
};
my $queue = {
queue => 'my_test_queue';
auto_delete => 1,
};
my $routing_key = 'my_rk';
my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
channel => $channel,
exchange => $exchange,
queue => $queue,
routing_key => $routing_key,
);
$consumer->init(); #declares channel, queue and binding
my $batcher = RabbitMQ::Consumer::Batcher->new(
batch_size => $consumer->prefetch_count,
on_add => sub {
my ($batcher, $msg) = @_;
my $decode_payload = decode_payload($msg->{header}, $msg->{body}->payload());
return $decode_payload;
},
on_add_catch => sub {
my ($batcher, $msg, $exception) = @_;
if ($exception->$_isa('failure') && $exception->{payload}{stats_key}) {
$stats->increment($exception->{payload}{stats_key});
}
if ($exception->$_isa('failure') && $exception->{payload}{reject}) {
$batcher->reject($msg);
$log->error("consume failed - reject: $exception\n".$msg->{body}->payload());
}
else {
$batcher->reject_and_republish($msg);
$log->error("consume failed - republish: $exception");
}
},
on_batch_complete => sub {
my ($batcher, $batch) = @_;
path(...)->spew(join "\t", map { $_->value() } @$batch);
},
on_batch_complete_catch => sub {
my ($batcher, $batch, $exception) = @_;
$log->error("save messages to file failed: $exception");
}
);
my $cv = AnyEvent->condvar();
$consumer->consume($cv, $batcher->consume_code())->then(sub {
say 'Consumer was started...';
});
# DESCRIPTION
If you need batch of messages from RabbitMQ - this module is for you.
This module work well with [AnyEvent::RabbitMQ::PubSub::Consumer](
https://metacpan.org/pod/AnyEvent::RabbitMQ::PubSub::Consumer)
Idea of this module is - in _on\_add_ phase is message validate and if is corrupted, can be reject.
In _on\_batch\_complete_ phase we manipulated with message which we don't miss.
If is some problem in this phase, messages are republished..
# METHODS
## new(%attributes)
### attributes
#### batch\_size
Max batch size (trigger for `on_batch_complete`)
`batch_size` must be `prefetch_count` or bigger!
this is required attribute
#### on\_add
this callback are called after consume one single message. Is usefully for decoding for example.
return value of callback are used as value in batch item ([RabbitMQ::Consumer::Batcher::Item](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item))
default behaviour is payload of message is used as item in batch
return sub {
my($batcher, $msg) = @_;
return $msg->{body}->payload()
}
parameters which are give to callback:
- `$batcher`
self instance of [RabbitMQ::Consumer::Batcher](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher)
- `$msg`
consumed message ["on\_consume" in AnyEvent::RabbitMQ::Channel](
https://metacpan.org/pod/AnyEvent::RabbitMQ::Channel#on_consume)
#### on\_add\_catch
this callback are called if `on_add` callback throws
default behaviour do reject message
return sub {
my ($batcher, $msg, $exception) = @_;
$batcher->reject($msg);
}
parameters which are give to callback:
- `$batcher`
self instance of [RabbitMQ::Consumer::Batcher](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher)
- `$msg`
consumed message ["on\_consume" in AnyEvent::RabbitMQ::Channel](
https://metacpan.org/pod/AnyEvent::RabbitMQ::Channel#on_consume)
- `$exception`
exception string
#### on\_batch\_complete
this callback is triggered if batch is complete (count of items is `batch_size`)
this is required attribute
parameters which are give to callback:
- `$batcher`
self instance of [RabbitMQ::Consumer::Batcher](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher)
- `$batch`
batch is _ArrayRef_ of [RabbitMQ::Consumer::Batcher::Item](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item)
example `on_batch_complete` _CodeRef_ (item _value_ are _string_s)
return sub {
my($batcher, $batch) = @_;
print join "\n", map { $_->value() } @$batch;
$batcher->ack($batch);
}
#### on\_batch\_complete\_catch
this callback are called if `on_batch_complete` callback throws
after this callback is batch _reject\_and\_republish_
If you need change _reject\_and\_republish_ of batch to (for example) _reject_, you can do:
return sub {
my ($batcher, $batch, $exception) = @_;
$batcher->reject($batch);
#batch_clean must be called,
#because reject_and_republish after this exception handler will be called to...
$batcher->batch_clean();
}
parameters which are give to callback:
- `$batcher`
self instance of [RabbitMQ::Consumer::Batcher](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher)
- `$batch`
_ArrayRef_ of [RabbitMQ::Consumer::Batcher::Item](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item)s
- `$exception`
exception string
## consume\_code()
return `sub{}` for handling messages in `consume` method of [AnyEvent::RabbitMQ::PubSub::Consumer](
https://metacpan.org/pod/AnyEvent::RabbitMQ::PubSub::Consumer)
$consumer->consume($cv, $batcher->consume_code());
## ack(@items)
ack all `@items` (instances of [RabbitMQ::Consumer::Batcher::Item](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item) or [RabbitMQ::Consumer::Batcher::Message](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Message))
## reject(@items)
reject all `@items` (instances of [RabbitMQ::Consumer::Batcher::Item](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item) or [RabbitMQ::Consumer::Batcher::Message](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Message))
## reject\_and\_republish(@items)
reject and republish all `@items` (instances of [RabbitMQ::Consumer::Batcher::Item](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Item) or [RabbitMQ::Consumer::Batcher::Message](
https://metacpan.org/pod/RabbitMQ::Consumer::Batcher::Message))
# contributing
for dependency use [cpanfile](
https://metacpan.org/pod/cpanfile)...
for resolve dependency use [Carton](
https://metacpan.org/pod/Carton) (or [Carmel](
https://metacpan.org/pod/Carmel) - is more experimental)
carton install
for run test use `minil test`
carton exec minil test
if you don't have perl environment, is best way use docker
docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton install
docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton exec minil test
## warning
docker run default as root, all files which will be make in docker will be have root rights
one solution is change rights in docker
docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended bash -c "carton install; chmod -R 0777 ."
or after docker command (but you must have root rights)
# LICENSE
Copyright (C) Avast Software.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
# AUTHOR
Jan Seidl <
[email protected]>