NAME
Net::Async::Redis - talk to Redis servers via IO::Async
SYNOPSIS
use Net::Async::Redis;
use Future::AsyncAwait;
use IO::Async::Loop;
my $loop = IO::Async::Loop->new;
$loop->add(my $redis = Net::Async::Redis->new);
(async sub {
await $redis->connect;
my $value = await $redis->get('some_key');
$value ||= await $redis->set(some_key => 'some_value');
print "Value: $value";
})->()->get;
# You can also use ->then chaining, see Future for more details
$redis->connect->then(sub {
$redis->get('some_key')
})->then(sub {
my $value = shift;
return Future->done($value) if $value;
$redis->set(some_key => 'some_value')
})->on_done(sub {
print "Value: " . shift;
})->get;
# ... or with Future::AsyncAwait (recommended), inside an async sub
await $redis->connect;
my $value = await $redis->get('some_key');
$value ||= await $redis->set(some_key => 'some_value');
print "Value: $value";
DESCRIPTION
Provides client access for dealing with Redis servers.
See Net::Async::Redis::Commands for the full list of commands. This
list is autogenerated from the official documentation at
https://redis.io/commands
This is intended to be a near-complete low-level client module for
asynchronous Redis support. See Net::Async::Redis::Server for a
(limited) Perl server implementation. It is an unofficial Perl port and
not endorsed by the Redis server maintainers in any way.
Supported features
Current features include:
* all commands <https://redis.io/commands> as of 6.0.7 (August 2020),
see https://redis.io/commands for the methods and parameters
* pub/sub support <https://redis.io/topics/pubsub>, see "METHODS -
Subscriptions"
* pipelining <https://redis.io/topics/pipelining>, see
"pipeline_depth"
* transactions <https://redis.io/topics/transactions>, see "METHODS -
Transactions"
* streams <https://redis.io/topics/streams-intro> and consumer
groups, via "XADD" in Net::Async::Redis::Commands and related methods
* client-side caching <https://redis.io/topics/client-side-caching>,
see "METHODS - Clientside caching"
Connecting
As with any other IO::Async::Notifier-based module, you'll need to add
this to an IO::Async::Loop:
my $loop = IO::Async::Loop->new;
$loop->add(
my $redis = Net::Async::Redis->new
);
then connect to the server:
$redis->connect
->then(sub {
# You could achieve a similar result by passing client_name in
# constructor or ->connect parameters
$redis->client_setname("example client")
})->get;
Key-value handling
One of the most common Redis scenarios is as a key/value store. The
"get" and "set" methods are typically used here:
$redis->set(some_key => 'some value')
->then(sub {
$redis->get('some_key')
})->on_done(sub {
my ($value) = @_;
print "Read back value [$value]\n";
})->retain;
See the next section for more information on what these methods are
actually returning.
Requests and responses
Requests are implemented as methods on the Net::Async::Redis object.
These typically return a Future which will resolve once ready:
my $future = $redis->incr("xyz")
->on_done(sub {
print "result of increment was " . shift . "\n"
});
For synchronous code, call ->get on that Future:
print "Database has " . $redis->dbsize->get . " total keys\n";
This means you can end up with ->get being called on the result of
->get. Note that these are two very different methods:
$redis
->get('some key') # this is being called on $redis, and is issuing a GET request
->get # this is called on the returned Future, and blocks until the value is ready
Typical async code would not be expected to use the "get" in Future
method extensively, often calling it in only one place at the top level
of the code.
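As a sketch of the "one ->get at the top level" pattern described above,
several requests can be combined into a single Future first. The helper
name fetch_values is hypothetical and not part of this module. It assumes
$redis is an already-connected Net::Async::Redis instance, and relies on
"needs_all" in Future to collect the results in order:

```perl
use strict;
use warnings;
use Future;

# Hypothetical helper: issue one GET request per key and combine the results.
# $redis is assumed to be a connected Net::Async::Redis instance.
sub fetch_values {
    my ($redis, @keys) = @_;
    # Each ->get here is the Redis GET request and returns a Future.
    # needs_all resolves once every request has completed, yielding the
    # values in the same order as the keys.
    return Future->needs_all(map { $redis->get($_) } @keys);
}
```

Top-level code could then block once, for example with
fetch_values($redis, 'first_key', 'second_key')->get, and receive the
values as a list.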
Error handling
Since Future is used for deferred results, failure is indicated by a
failing Future with failure category of redis.
The "catch" in Future feature may be useful for handling these:
$redis->lpush(key => $value)
->catch(
redis => sub { warn "probably an incorrect type, cannot push value"; Future->done }
)->get;
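To illustrate how the category matching behaves without needing a server,
the following sketch uses a hand-made failing Future as a stand-in for a
failed request. The failure message and the details value are made up for
illustration. Only the redis category comes from the description above:

```perl
use strict;
use warnings;
use Future;

# Stand-in for a failed request: a Future failing with the 'redis' category.
# The message and the 'lpush' detail are illustrative values only.
my $failed = Future->fail('WRONGTYPE example failure', redis => 'lpush');

my $result = $failed->catch(
    # Only failures in the 'redis' category reach this handler; it receives
    # the failure message, the category name and any further details.
    redis => sub {
        my ($message, $category, @details) = @_;
        return Future->done("recovered from: $message");
    },
)->get;

print "$result\n";
```

A failure in any other category would not match the handler and would
propagate to the caller of ->get as an exception.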
Note that this module uses Future::AsyncAwait internally.
CONSTANTS
OPENTRACING_ENABLED
Defaults to false. This can be controlled by the USE_OPENTRACING
environment variable, which sets the default opentracing mode for all
Net::Async::Redis instances. You can enable or disable it for a
specific instance via "configure":
$redis->configure(opentracing => 1);
When enabled, this will create a span for every Redis request. See
OpenTracing::Any for details.
METHODS
NOTE: For a full list of the Redis methods supported by this module,
please see Net::Async::Redis::Commands.
METHODS - Subscriptions
See https://redis.io/topics/pubsub for more details on this topic.
There are also more details on the internal implementation in Redis here:
https://making.pusher.com/redis-pubsub-under-the-hood/.
psubscribe
Subscribes to a pattern.
Example:
# Subscribe to 'info::*' channels, i.e. any channel
# that starts with the info:: prefix, and print each message
# with a timestamp.
$redis_connection->psubscribe('info::*')
->then(sub {
my $sub = shift;
$sub->events
->map('payload')
->each(sub {
print localtime . ' ' . $_ . "\n";
})->retain
})->get;
# this will block until the subscribe is confirmed. Note that you can't publish on
# a connection that's handling subscriptions due to Redis protocol restrictions.
$other_redis_connection->publish('info::example', 'a message here')->get;
Returns a Future which resolves to a Net::Async::Redis::Subscription
instance.
subscribe
Subscribes to one or more channels.
Returns a Future which resolves to a Net::Async::Redis::Subscription
instance.
Example:
# Subscribe to 'notifications' channel,
# print the first 5 messages, then unsubscribe
$redis->subscribe('notifications')
->then(sub {
my $sub = shift;
$sub->events
->map('payload')
->take(5)
->say
->completed
})->then(sub {
$redis->unsubscribe('notifications')
})->get;
METHODS - Transactions
multi
Executes the given code in a Redis MULTI transaction.
This will cause each of the requests to be queued on the server, then
applied in a single atomic transaction.
Note that the commands will resolve only after the transaction is
committed: for example, when the "set" command is issued, Redis will
return QUEUED. This information is not used as the result - we only
pass through the immediate response if there was an error. The Future
representing the response will be marked as done once the EXEC command
is applied and we have the results back.
Example:
$redis->multi(sub {
my $tx = shift;
$tx->incr('some::key')->on_done(sub { print "Final value for incremented key was " . shift . "\n"; });
$tx->set('other::key' => 'test data')
})->then(sub {
my ($success, $failure) = @_;
return Future->fail("Had $failure failures, expecting everything to succeed") if $failure;
print "$success succeeded\n";
return Future->done;
})->retain;
METHODS - Clientside caching
Enable clientside caching by passing a true value for
client_side_caching_enabled in "configure" or "new". This is currently
experimental, and only operates on "get" in Net::Async::Redis::Commands
requests.
See https://redis.io/topics/client-side-caching for more details on
this feature.
METHODS - Generic
keys
watch_keyspace
A convenience wrapper around the keyspace notifications API.
Provides the necessary setup to establish a PSUBSCRIBE subscription on
the __keyspace@*__ namespace, setting the configuration required for
this to start emitting events, and then calls $code with each event.
Note that this will switch the connection into pubsub mode, so it will
no longer be available for any other activity.
Resolves to a Ryu::Source instance.
endpoint
The string describing the remote endpoint.
local_endpoint
A string describing the local endpoint, usually host:port.
connect
Connects to the Redis server.
Will use the "configure"d parameters if available, but as a convenience
can be passed additional parameters which will then be applied as if
you had called "configure" with those beforehand. This also means that
they will be preserved for subsequent "connect" calls.
connected
Establishes a connection if needed, otherwise returns an
immediately-available Future instance.
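A minimal sketch of how this might be used to guard a request: chain the
request from "connected" so that it is only issued once a connection
exists. The helper name get_when_connected is hypothetical, and $redis is
assumed to be a Net::Async::Redis instance that has been added to a loop
and configured with a reachable server:

```perl
use strict;
use warnings;
use Future;

# Hypothetical helper, not part of Net::Async::Redis.
sub get_when_connected {
    my ($redis, $key) = @_;
    # ->connected returns a Future; the GET request is only issued
    # after that Future has resolved.
    return $redis->connected->then(sub {
        $redis->get($key);
    });
}
```

The returned Future resolves to the value for the key, so callers can
chain from it or call ->get on it at the top level.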
on_message
Called for each incoming message.
Passes off the work to "handle_pubsub_message" or the next queue item,
depending on whether we're dealing with subscriptions at the moment.
stream
Represents the IO::Async::Stream instance for the active Redis
connection.
pipeline_depth
Number of requests awaiting responses before we start queuing. This
defaults to an arbitrary value of 100 requests.
Note that this does not apply when in transaction (MULTI) mode.
See https://redis.io/topics/pipelining for more details on this
concept.
opentracing
Indicates whether OpenTracing::Any support is enabled.
METHODS - Deprecated
These are still supported, but no longer recommended.
METHODS - Internal
notify_close
Called when the socket is closed.
command_label
Generate a label for the given command list.
stream_read_len
Defines the buffer size when reading from a Redis connection.
Defaults to 1MB. Reduce this if you're dealing with a lot of
connections and want to minimise memory usage. Alternatively, if you're
reading large amounts of data and spend too much time in needless
epoll_wait calls, try a larger value.
stream_write_len
The buffer size when writing to Redis connections, in bytes. Defaults
to 1MB.
See "stream_read_len".
configure
Applies configuration parameters - currently supports:
* host
* port
* auth
* database
* pipeline_depth
* stream_read_len
* stream_write_len
* on_disconnect
* client_name
* opentracing
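As a sketch of how these parameters might be supplied, the example below
passes some of them to the constructor and adjusts others afterwards via
"configure". The host, port and buffer sizes are assumed values for a
local setup, and no connection is made here:

```perl
use strict;
use warnings;
use Net::Async::Redis;

# Constructor arguments are applied in the same way as ->configure.
# Host, port and client name are assumptions for a local test server.
my $redis = Net::Async::Redis->new(
    host           => '127.0.0.1',
    port           => 6379,
    client_name    => 'example client',
    pipeline_depth => 50,
);

# Settings can also be changed later on an existing instance.
$redis->configure(
    stream_read_len  => 64 * 1024,
    stream_write_len => 64 * 1024,
);

printf "pipeline depth %d, read buffer %d bytes\n",
    $redis->pipeline_depth, $redis->stream_read_len;
```

The instance would still need to be added to an IO::Async::Loop before
calling "connect".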
SEE ALSO
Some other Redis implementations on CPAN:
* Mojo::Redis2 - nonblocking, using the Mojolicious framework,
actively maintained
* MojoX::Redis - changelog mentions that this was obsoleted by
Mojo::Redis, although there have been new versions released since
then
* RedisDB - another synchronous (blocking) implementation, handles
pub/sub and autoreconnect
* Cache::Redis - wrapper around RedisDB
* Redis::Fast - wraps hiredis, faster than Redis
* Redis::Jet - also XS-based, docs mention very early development
stage but appears to support pipelining and can handle newer commands
via ->command.
* Redis - synchronous (blocking) implementation, handles pub/sub and
autoreconnect
AUTHOR
Tom Molesworth <[email protected]>
CONTRIBUTORS
With thanks to the following for contributing patches, bug reports,
tests and feedback:
* [email protected]
* [email protected]
* @eyadof
* Nael Alolwani
LICENSE
Copyright Tom Molesworth and others 2015-2020. Licensed under the same
terms as Perl itself.