Subject: | pipelining and error handling patch |
The attached patch adds wrappers for the hiredis functions
redisAppendCommand() and redisGetReply() in order to support pipelining.
It also changes error handling so exceptions are thrown when errors
occur. Currently, on error, the error string is returned instead of the
expected value. This was an issue for me because there wasn't a clean way to
detect when an error occurred. This change makes the module's behavior
deviate a bit from the underlying C library but that's already the case
since we are returning the value from the reply instead of a data
structure representing the reply.
I also modularized things a bit and tried to make spacing consistent.
Subject: | Redis-hires-pipeline.patch |
diff --git a/MANIFEST b/MANIFEST
index 9f199ed..3149b54 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -20,6 +20,8 @@ t/004_sets.t
t/005_zsets.t
t/006_hashes.t
t/007_transactions.t
+t/008_pipeline.t
+t/009_errors.t
hiredis.pm
typemap
META.yml Module meta-data (added by MakeMaker)
diff --git a/hiredis.pm b/hiredis.pm
index 456f6b4..759a61c 100644
--- a/hiredis.pm
+++ b/hiredis.pm
@@ -20,6 +20,12 @@ Redis::hiredis - interact with Redis using the hiredis client.
$redis->command('set foo bar');
my $val = $redis->command('get foo');
+ # to pipeline commands
+ $redis->append_command('set abc 123');
+ $redis->append_command('get abc');
+ my $set_status = $redis->get_reply(); # 'OK'
+ my $get_val = $redis->get_reply(); # 123
+
=head1 DESCRIPTION
C<Redis::hiredis> is a simple wrapper around Salvatore Sanfilippo's
@@ -52,6 +58,22 @@ the official redis cli
command will return a scalar value which will either be an integer, string
or an array ref (if multiple values are returned).
+=item append_command( $command )
+
+For performance reasons, it's sometimes useful to pipeline commands. When
+pipelining, multiple commands are sent to the server at once and the results
+are read as they become available. hiredis supports this via append_command()
+and get_reply(). Commands passed to append_command() are buffered locally
+until the first call to get_reply() when all the commands are sent to the
+server at once. The results are then returned one at a time via calls to
+get_reply().
+
+See the hiredis documentation for a more detailed explanation.
+
+=item get_reply()
+
+See append_command().
+
=back
=head1 SEE ALSO
@@ -59,6 +81,9 @@ or an array ref (if multiple values are returned).
The Redis command reference can be found here:
F<http://redis.io/commands>
+A discussion of pipelining can be found here:
+F<http://redis.io/topics/pipelining>
+
Documentation on the hiredis client can be found here:
F<http://github.com/antirez/hiredis>
diff --git a/hiredis.xs b/hiredis.xs
index dd7afd3..6bbdf16 100644
--- a/hiredis.xs
+++ b/hiredis.xs
@@ -4,90 +4,130 @@
#include "ppport.h"
#include <string.h>
+#include <stdio.h>
#include "lib-hiredis.h"
#include "lib-net.h"
#include "lib-sds.h"
typedef struct redhi_obj {
- redisContext *context;
+ redisContext *context;
} redhi_obj;
typedef redhi_obj *Redis__hiredis;
-MODULE = Redis::hiredis PACKAGE = Redis::hiredis PREFIX = redis_hiredis_
+SV * _readReply (redisReply *reply);
+SV * _readMultiBulkReply (redisReply *reply);
+SV * _readBulkReply (redisReply *reply);
-SV *
-redis_hiredis_connect(self, hostname, port=6379)
- Redis::hiredis self
- char *hostname
- int port
- CODE:
- self->context = redisConnect(hostname, port);
- if ( self->context->err ) {
- RETVAL = newSVpvn(self->context->errstr, strlen(self->context->errstr));
+/*
+ * Convert a hiredis reply into a Perl SV: array replies are handed to
+ * _readMultiBulkReply(), every other reply type to _readBulkReply().
+ */
+SV * _readReply (redisReply *reply) {
+ if (reply->type == REDIS_REPLY_ARRAY) {
+ return _readMultiBulkReply(reply);
}
else {
- RETVAL = newSV(1);
+ return _readBulkReply(reply);
}
- OUTPUT:
- RETVAL
+}
-SV *
-redis_hiredis_command(self, cmd)
- Redis::hiredis self
- char *cmd
- PREINIT:
+/*
+ * Convert an array reply into a reference to a new Perl array holding one
+ * SV per element, each converted with _readBulkReply().
+ *
+ * Croaks (via _readBulkReply) if any element is an error reply.
+ */
+SV * _readMultiBulkReply (redisReply *reply) {
+ SV *sv;
AV *arr_reply;
- redisReply *reply;
int i;
- CODE:
- reply = redisCommand(self->context, cmd);
- if ( reply->type == REDIS_REPLY_ERROR
- || reply->type == REDIS_REPLY_STRING
- || reply->type == REDIS_REPLY_STATUS )
- {
- RETVAL = newSVpvn(reply->str,reply->len);
- }
- else if ( reply->type == REDIS_REPLY_ARRAY) {
- arr_reply = newAV();
- for ( i=0; i<reply->elements; i++ ) {
- if ( reply->element[i]->type == REDIS_REPLY_ERROR
- || reply->element[i]->type == REDIS_REPLY_STRING
- || reply->element[i]->type == REDIS_REPLY_STATUS )
- {
- av_push(arr_reply,
- newSVpvn(reply->element[i]->str, reply->element[i]->len)
- );
- }
- else if ( reply->element[i]->type == REDIS_REPLY_INTEGER ) {
- av_push(arr_reply, newSViv(reply->element[i]->integer));
- }
- }
- RETVAL = newRV_inc((SV*)arr_reply);
- }
- else if ( reply->type == REDIS_REPLY_INTEGER ) {
- RETVAL = newSViv(reply->integer);
- }
- else {
- // either REDIS_REPLY_NIL or something is awry
- RETVAL = newSV(0);
+ /* Mortalise the array: the extra count taken by newRV_inc() below is
+ * then the only one that survives, so the array is freed together with
+ * the returned reference, and also if an element makes us croak. */
+ arr_reply = (AV *) sv_2mortal((SV *) newAV());
+ for ( i=0; i < reply->elements; i++) {
+ av_push(arr_reply, _readBulkReply(reply->element[i]));
+ }
+ sv = newRV_inc((SV*)arr_reply);
+
+ return sv;
+}
+
+/*
+ * Convert a single (non-array) hiredis reply into a new Perl SV.
+ *
+ * Error replies croak with the server's message. String and status replies
+ * become a string SV, integer replies an integer SV, and every other type
+ * (nil included) an undefined SV.
+ */
+SV * _readBulkReply (redisReply *reply) {
+ SV *sv;
+
+ if ( reply->type == REDIS_REPLY_ERROR ) {
+ /* croak() takes a printf-style pattern: pass the server text as an
+ * argument so a '%' in it is never treated as a conversion. */
+ croak("%s", reply->str);
+ }
+ else if ( reply->type == REDIS_REPLY_STRING
+ || reply->type == REDIS_REPLY_STATUS ) {
+ sv = newSVpvn(reply->str,reply->len);
+ }
+ else if ( reply->type == REDIS_REPLY_INTEGER ) {
+ sv = newSViv(reply->integer);
+ }
+ else {
+ // either REDIS_REPLY_NIL or something is awry
+ sv = newSV(0);
+ }
+
+ return sv;
+}
+
+/*
+ * Croak with "Not connected." unless self holds a usable hiredis context.
+ *
+ * A context whose err flag is set is rejected too: hiredis documents that a
+ * context cannot be reused once an error has been recorded on it.
+ */
+void assertConnected (redhi_obj *self) {
+ if (self->context == NULL || self->context->err) {
+ croak("Not connected.");
+ }
+}
+
+MODULE = Redis::hiredis PACKAGE = Redis::hiredis PREFIX = redis_hiredis_
+
+void
+redis_hiredis_connect(self, hostname, port=6379)
+ Redis::hiredis self
+ char *hostname
+ int port
+ CODE:
+ /* Connect to hostname:port (port defaults to 6379). Returns nothing;
+ * croaks with the hiredis error text if the connection fails. */
+ if ( self->context != NULL ) {
+ /* Reconnecting: release the previous context instead of leaking it. */
+ redisFree(self->context);
+ self->context = NULL;
+ }
+ self->context = redisConnect(hostname, port);
+ if ( self->context == NULL ) {
+ croak("Can't allocate redis context");
+ }
+ if ( self->context->err ) {
+ /* Copy the message before freeing the context that owns it, and
+ * leave the object disconnected so later calls fail cleanly. */
+ SV *err = sv_2mortal(newSVpv(self->context->errstr, 0));
+ redisFree(self->context);
+ self->context = NULL;
+ croak("%s", SvPV_nolen(err));
- }
+
+SV *
+redis_hiredis_command(self, cmd)
+ Redis::hiredis self
+ char *cmd
+ PREINIT:
+ redisReply *reply;
+ CODE:
+ /* Send cmd and return its reply as an integer, string, array ref or
+ * undef. Croaks when not connected, when no reply could be obtained,
+ * or when the server answers with an error reply. */
+ assertConnected(self);
+ reply = redisCommand(self->context, cmd);
+ if ( reply == NULL ) {
+ /* No reply object at all: report the context's error text. */
+ croak("%s", self->context->errstr);
+ }
+ if ( reply->type == REDIS_REPLY_ERROR ) {
+ /* croak() does not return, so copy the message and free the reply
+ * first; otherwise the reply object would be leaked. */
+ SV *err = sv_2mortal(newSVpvn(reply->str, reply->len));
+ freeReplyObject(reply);
+ croak("%s", SvPV_nolen(err));
+ }
+ RETVAL = _readReply(reply);
+ freeReplyObject(reply);
+ OUTPUT:
+ RETVAL
+
+
+void
+redis_hiredis_append_command(self, cmd)
+ Redis::hiredis self
+ char *cmd
+ CODE:
+ /* Queue cmd on the connection for pipelining; the reply is fetched
+ * later with get_reply(). Croaks when not connected. */
+ assertConnected(self);
+ redisAppendCommand(self->context, cmd);
+
+
+SV *
+redis_hiredis_get_reply(self)
+ Redis::hiredis self
+ PREINIT:
+ redisReply *reply = NULL;
+ CODE:
+ /* Return the next pending pipelined reply as an integer, string,
+ * array ref or undef. Croaks when not connected, when the reply
+ * cannot be read, or when the server answered with an error reply. */
+ assertConnected(self);
+ if ( redisGetReply(self->context, (void **) &reply) != REDIS_OK
+ || reply == NULL ) {
+ croak("%s", self->context->errstr);
+ }
+ if ( reply->type == REDIS_REPLY_ERROR ) {
+ /* croak() does not return, so copy the message and free the reply
+ * first; otherwise the reply object would be leaked. */
+ SV *err = sv_2mortal(newSVpvn(reply->str, reply->len));
+ freeReplyObject(reply);
+ croak("%s", SvPV_nolen(err));
+ }
+ RETVAL = _readReply(reply);
freeReplyObject(reply);
- OUTPUT:
- RETVAL
+ OUTPUT:
+ RETVAL
Redis::hiredis
redis_hiredis_new(clazz)
- char *clazz
- CODE:
- RETVAL = calloc(1, sizeof(struct redhi_obj));
- OUTPUT:
- RETVAL
+ char *clazz
+ CODE:
+ /* Allocate a zero-filled redhi_obj; no connection is made here. */
+ RETVAL = calloc(1, sizeof(struct redhi_obj));
+ OUTPUT:
+ RETVAL
void
redis_hiredis_DESTROY(self)
- Redis::hiredis self
- CODE:
- if ( self->context != NULL )
- redisFree(self->context);
+ Redis::hiredis self
+ CODE:
+ /* Release the hiredis context, if any, and then the wrapper struct
+ * itself, which new() obtained from calloc(). */
+ if ( self->context != NULL )
+ redisFree(self->context);
+ free(self);
diff --git a/t/004_sets.t b/t/004_sets.t
index 2e04d8b..a06111a 100644
--- a/t/004_sets.t
+++ b/t/004_sets.t
@@ -28,7 +28,6 @@ SKIP: {
$r = $h->command('spop '.$prefix.'foo');
like($r, qr/(foo|bar)/, 'spop');
- $h->command(['del', $prefix.'foo']);
$r = $h->command('sadd '.$prefix.'foo foo');
$r = $h->command('sadd '.$prefix.'foo bar');
$r = $h->command('sadd '.$prefix.'foo baz');
diff --git a/t/008_pipeline.t b/t/008_pipeline.t
new file mode 100644
index 0000000..a1b4bcc
--- /dev/null
+++ b/t/008_pipeline.t
@@ -0,0 +1,50 @@
+use strict;
+use warnings;
+use Test::More;
+
+plan skip_all => q/$ENV{'REDISHOST'} isn't set/
+ if !defined $ENV{'REDISHOST'};
+
+{
+ use_ok 'Redis::hiredis';
+ my $h = Redis::hiredis->new();
+ isa_ok($h, 'Redis::hiredis');
+
+ my $host = $ENV{'REDISHOST'};
+ my $port = $ENV{'REDISPORT'} || 6379;
+
+ # connect() has no return value; it croaks if the connection fails.
+ my $c = $h->connect($host, $port);
+ is($c, undef, 'connect success');
+
+ my $prefix = "Redis-hiredis-$$-";
+
+ $h->command("set $prefix:foo0 bar0");
+ $h->command("set $prefix:foo1 bar1");
+ $h->command("set $prefix:foo2 bar2");
+ # Queue three GETs; replies must come back in the order queued.
+ $h->append_command("get $prefix:foo0");
+ $h->append_command("get $prefix:foo1");
+ $h->append_command("get $prefix:foo2");
+
+ my $r0 = $h->get_reply();
+ my $r1 = $h->get_reply();
+ my $r2 = $h->get_reply();
+
+ is $r0, 'bar0', 'pipeline reply 0';
+ is $r1, 'bar1', 'pipeline reply 1';
+ is $r2, 'bar2', 'pipeline reply 2';
+ # Mixed reply types: three integer replies, then one array reply.
+ $h->append_command("rpush $prefix:list aaa");
+ $h->append_command("rpush $prefix:list bbb");
+ $h->append_command("rpush $prefix:list ccc");
+ $h->append_command("lrange $prefix:list 0 3");
+ is $h->get_reply(), 1, 'rpush reply0';
+ is $h->get_reply(), 2, 'rpush reply1';
+ is $h->get_reply(), 3, 'rpush reply2';
+ is_deeply $h->get_reply(), [ qw(aaa bbb ccc) ], 'lrange reply';
+ # Delete the keys created above so the server is left clean.
+ $h->command("del $prefix:foo0 $prefix:foo1 $prefix:foo2 $prefix:list");
+}
+
+done_testing();
diff --git a/t/009_errors.t b/t/009_errors.t
new file mode 100644
index 0000000..fd13254
--- /dev/null
+++ b/t/009_errors.t
@@ -0,0 +1,67 @@
+use strict;
+use warnings;
+use Test::More;
+use Test::Exception;
+
+plan skip_all => q/$ENV{'REDISHOST'} isn't set/
+ if !defined $ENV{'REDISHOST'};
+
+{
+ use_ok 'Redis::hiredis';
+ my $h = Redis::hiredis->new();
+ isa_ok $h, 'Redis::hiredis';
+
+ my $host = $ENV{'REDISHOST'};
+ my $port = $ENV{'REDISPORT'} || 6379;
+
+ #
+ # bad connect: an unresolvable host must make connect() throw
+ #
+ throws_ok
+ sub { $h->connect('fake_host', $port) },
+ qr/Can't resolve: fake_host/,
+ 'connect failed correctly';
+
+ lives_ok
+ sub { $h->connect($host, $port) },
+ 'connect worked';
+
+ #
+ # bad command: a server error reply must be thrown, not returned
+ #
+ throws_ok
+ sub { $h->command( 'NO_SUCH_CMD' ) },
+ qr/ERR unknown command 'NO_SUCH_CMD'/,
+ 'command failed correctly';
+
+ #
+ # partially bad pipeline: queueing never throws; each get_reply() does
+ #
+ lives_ok
+ sub { $h->append_command('BAD_CMD0') },
+ 'append_command 0 worked';
+
+ lives_ok
+ sub { $h->append_command('PING') },
+ 'append_command 1 worked';
+
+ lives_ok
+ sub { $h->append_command('BAD_CMD2') },
+ 'append_command 2 worked';
+
+ throws_ok
+ sub { $h->get_reply() },
+ qr/ERR unknown command 'BAD_CMD0'/,
+ 'pipeline cmd 0 failed correctly';
+
+ lives_ok
+ sub { $h->get_reply() },
+ 'pipeline cmd 1 worked';
+
+ throws_ok
+ sub { $h->get_reply() },
+ qr/ERR unknown command 'BAD_CMD2'/,
+ 'pipeline cmd 2 failed correctly';
+}
+
+done_testing();