Subject: Fix pending requests getting responses from other queries
Paul,
Final issue in my series of "patches without tests" ... promise!
This issue is critical, because it causes queries to receive the wrong response.
With reference to on_read in Connection.pm, consider:
    $cassandra->query( "INSERT ..." )->then( sub {
        $cassandra->query_rows( "SELECT ..." );
    })->then( ... );
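This goes wrong when the pending queue is in use (for instance, when the INSERT itself had to be queued because all streams were busy), because $self->{streams}[$streamid] is set to undef *before* $f->done is called.
$f->done runs the ->then callback straight away, which starts a new query_rows; that finds the freed slot and populates $self->{streams}[$streamid] with the SELECT request.
The pending branch is then reached, and it sends the next pending request on the same $streamid, overwriting the SELECT's entry.
Now when we next read from Cassandra, responses are delivered to the wrong futures: a SELECT query's future can be handed an INSERT query's response.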
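To make the ordering concrete, here is a toy model in core Perl of the streams table and pending queue. It is a hypothetical sketch: new_conn, _send_on, send_req, on_response and run_scenario are made-up names, not the module's API, and there are no real Futures or sockets. Only one stream is usable, so the second request must queue. The $fixed flag switches between freeing the slot before the callbacks run (the old ordering) and after them (the patched ordering).

```perl
use strict;
use warnings;

# Toy model of Connection.pm's streams table and pending queue.
# Hypothetical names throughout; no real Futures or sockets.
sub new_conn {
    my ( $max ) = @_;
    return { streams => [], pending => [], max => $max, wire => [] };
}

# Mark a request as in flight on stream $id and record what went on the wire
sub _send_on {
    my ( $c, $id, $req ) = @_;
    $c->{streams}[$id] = $req;
    push @{ $c->{wire} }, [ $id, $req->{name} ];
}

# Use the first free stream id in 1..max, else queue the request
sub send_req {
    my ( $c, $name, $on_done ) = @_;
    my $req = { name => $name, on_done => $on_done };
    for my $id ( 1 .. $c->{max} ) {
        next if defined $c->{streams}[$id];
        return _send_on( $c, $id, $req );
    }
    push @{ $c->{pending} }, $req;
    return;
}

# A response arrives on stream $id; $fixed selects the patched ordering
sub on_response {
    my ( $c, $id, $fixed ) = @_;
    my $req = $c->{streams}[$id] or return;
    undef $c->{streams}[$id] unless $fixed;                 # old: free slot first
    $req->{on_done}->( $req->{name} ) if $req->{on_done};   # like $f->done
    undef $c->{streams}[$id] if $fixed;                     # patched: free it afterwards
    if ( my $next = shift @{ $c->{pending} } ) {
        _send_on( $c, $id, $next );                         # reuse this stream id
    }
    return;
}

sub run_scenario {
    my ( $fixed ) = @_;
    my $c = new_conn(1);    # a single stream, so the second request must queue
    send_req( $c, "INSERT1", sub { send_req( $c, "SELECT1" ) } );
    send_req( $c, "INSERT2" );       # no free stream: goes on {pending}
    on_response( $c, 1, $fixed );    # INSERT1's response comes back
    my @wire = @{ $c->{wire} };
    return {
        sent_after => [ map { "$_->[1] on stream $_->[0]" } @wire[ 1 .. $#wire ] ],
        pending    => [ map { $_->{name} } @{ $c->{pending} } ],
    };
}

for my $fixed ( 0, 1 ) {
    my $r = run_scenario($fixed);
    printf "%s ordering: sent [%s], pending [%s]\n",
        $fixed ? "patched" : "old",
        join( ", ", @{ $r->{sent_after} } ),
        join( ", ", @{ $r->{pending} } );
}
```

In this model, the old ordering sends both SELECT1 and INSERT2 on stream 1, so whichever response arrives next is handed to INSERT2's callback. With the patched ordering, SELECT1 simply waits its turn in the pending queue.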
Fortunately this was caught, because query_rows expects a $type of "rows", and an INSERT returns a void result, leaving $type undefined.
However, that failure only showed up as an undef value in the eq comparison, because the failed Future was created but never returned, so the callback went on to return Future->new->done( $result ).
I've added the missing return, plus an explicit defined $type check, to make it clear which situation we're in.
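The missing return is easy to reproduce without Future at all. In this hypothetical sketch, fail_f and done_f are plain-hash stand-ins for Future->new->fail and Future->new->done, and rows_cb_old and rows_cb_new mirror the old and patched bodies of the query_rows callback.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for Future->new->fail / Future->new->done
sub fail_f { return { failed => $_[0] } }
sub done_f { return { done   => $_[0] } }

# Old body: the failure is constructed and then discarded
sub rows_cb_old {
    my ( $type, $result ) = @_;
    no warnings 'uninitialized';    # silence "uninitialized value in string eq" for this demo
    $type eq "rows" or fail_f("Expected 'rows' result");
    return done_f($result);         # always reached
}

# Patched body: each check returns its failure
sub rows_cb_new {
    my ( $type, $result ) = @_;
    defined $type   or return fail_f("Expected type from query");
    $type eq "rows" or return fail_f("Expected 'rows' result");
    return done_f($result);
}

my $old = rows_cb_old(undef);    # e.g. an INSERT's void result
my $new = rows_cb_new(undef);
print( exists $old->{done} ? "old: done (failure lost)\n" : "old: failed\n" );
print "new: failed: $new->{failed}\n";
```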
So this patch fixes that. It also replaces the hard-coded 127 with a MAX_STREAMS constant, noting that the limit is due to rise in the near future with native protocol version 3, where the stream ID becomes 2 bytes, allowing 32k concurrent streams.
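The MAX_STREAMS arithmetic can be checked with pack and unpack. This sketch assumes the frame layouts from the native protocol specs (the stream ID is a signed byte in v1/v2 and a signed 2-byte short in v3); max_stream_id is a hypothetical helper, and the pack templates are illustrative rather than necessarily what Protocol::CassandraCQL's build_frame uses.

```perl
use strict;
use warnings;

# Hypothetical helper: largest positive id in a signed field of $bytes bytes
sub max_stream_id { my ($bytes) = @_; return 2**( 8 * $bytes - 1 ) - 1 }

my $max_v2 = max_stream_id(1);    # v1/v2: stream id is one signed byte
my $max_v3 = max_stream_id(2);    # v3: stream id is a signed 2-byte short

my $b1 = pack "c",  $max_v2;      # 1 byte
my $b2 = pack "s>", $max_v3;      # 2 bytes, big-endian (network order)

# Byte 0x80 read back as a signed byte is -128, i.e. a server-reserved negative id
my $wrapped = unpack "c", pack( "C", 128 );

printf "v2: max %d in %d byte(s); v3: max %d in %d byte(s); 0x80 as signed = %d\n",
    $max_v2, length($b1), $max_v3, length($b2), $wrapped;
```

This is why 127, not 255, is the ceiling while the stream ID is a single byte.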
I hope it's self-evident from the patch and the comments.
Again, I'd really like to have a test covering this, but I struggled to find a way to mock it within the test suite.
Since this issue is so important, I thought I'd send it over first.
Regards
Gareth
Subject: Net-Async-CassandraCQL-0.11-pending.patch
diff -Naurb ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL/Connection.pm ./lib/Net/Async/CassandraCQL/Connection.pm
--- ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL/Connection.pm 2015-01-24 09:43:27.919671085 +0000
+++ ./lib/Net/Async/CassandraCQL/Connection.pm 2015-01-25 13:48:03.386798084 +0000
@@ -21,6 +22,10 @@
use constant HAVE_SNAPPY => eval { require Compress::Snappy };
use constant HAVE_LZ4 => eval { require Compress::LZ4 };
+# Max streams is 127, because the streamid is packed into a single signed byte in native protocol versions 1 and 2.
+# Version 3 of the protocol widens it to 2 bytes, so this can then be raised.
+use constant MAX_STREAMS => 127;
+
use Protocol::CassandraCQL qw(
:opcodes :results :consistencies FLAG_COMPRESS
build_frame parse_frame
@@ -218,7 +223,6 @@
my $frame = Protocol::CassandraCQL::Frame->new( $body );
if( my $f = $self->{streams}[$streamid] ) {
- undef $self->{streams}[$streamid];
if( $opcode == OPCODE_ERROR ) {
my ( $err, $message ) = parse_error_frame( $version, $frame );
@@ -231,6 +235,10 @@
$f->done( $opcode, $frame, $version );
}
+ # Undefined only after running $f->done, so that requests started by $f's callbacks can't take this streamid ahead of pending requests
+ # NB: Moving this above $f->done would mean the pending code below could no longer assume this streamid is still free
+ undef $self->{streams}[$streamid];
+
if( my $next = shift @{ $self->{pending} } ) {
my ( $opcode, $frame, $f ) = @$next;
$self->_send( $opcode, $streamid, $frame, $f );
@@ -325,7 +333,7 @@
}
if( !defined $id ) {
- if( $#$streams == 127 ) {
+ if( $#$streams == MAX_STREAMS ) {
push @{ $self->{pending} }, [ $opcode, $frame, $f ];
return $f;
}
diff -Naurb ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL.pm ./lib/Net/Async/CassandraCQL.pm
--- ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL.pm 2015-01-24 09:43:27.919671085 +0000
+++ ./lib/Net/Async/CassandraCQL.pm 2015-01-25 13:29:00.190691508 +0000
@@ -823,7 +828,8 @@
$self->query( @_ )->then( sub {
my ( $type, $result ) = @_;
- $type eq "rows" or Future->new->fail( "Expected 'rows' result" );
+ defined $type or return Future->new->fail( "Expected type from query" );
+ $type eq "rows" or return Future->new->fail( "Expected 'rows' result" );
Future->new->done( $result );
});
}