Skip Menu |

This queue is for tickets about the Net-Async-CassandraCQL CPAN distribution.

Report information
The Basics
Id: 101728
Status: resolved
Priority: 0/
Queue: Net-Async-CassandraCQL

People
Owner: Nobody in particular
Requestors: gbjk [...] thermeon.com
Cc:
AdminCc:

Bug Information
Severity: Critical
Broken in: (no value)
Fixed in: (no value)



Subject: Fix pending requests getting responses from other queries
Paul, Final issue in my series of "patches without tests" ... promise! This issue is very critical, because it causes queries to get the wrong response. With reference to on_read in Connection.pm, Consider: $cassandra->query("INSERT...")->then(sub { $cassandra->query_rows("SELECT...") })->then(...) If this happens when the insert goes into pending queue then because the $streamid is set to undef, *before* $f->done is called. The future then runs a new query_rows, which populates streams->[$streamid] with that SELECT request. The pending condition is then hit, which writes over the $streamid again. Now when we next read from cassandra, we tell the SELECT query futures about the INSERT query response. Fortunately this was caught by the fact that query_rows expects $type, and INSERT returns void. However that failure only showed up in an undef comparison check, because the fail future wasn't actually returned. I've added another explicit check there, just to make sure it's clear which situation we're in. So this patch fixes that, as well as noting that the MAX_STREAMS is due to increase in the near future with native protocol version 3, where it becomes 2 bytes and 32k concurrency. I hope it's self-evident from the patch and the comments. Again, I'd really like to have a test to cover this, but I was struggling with the test suite to find a way to mock it. Since this issue is so important, I thought I'd get it over first. Regards Gareth
Subject: Net-Async-CassandraCQL-0.11-pending.patch
diff -Naurb ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL/Connection.pm ./lib/Net/Async/CassandraCQL/Connection.pm --- ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL/Connection.pm 2015-01-24 09:43:27.919671085 +0000 +++ ./lib/Net/Async/CassandraCQL/Connection.pm 2015-01-25 13:48:03.386798084 +0000 @@ -21,6 +22,10 @@ use constant HAVE_SNAPPY => eval { require Compress::Snappy }; use constant HAVE_LZ4 => eval { require Compress::LZ4 }; +# Max streams 127, because of streamid packed in the native protocol as C. +# Version 3 of the protocol changes that, and this can be raised +use constant MAX_STREAMS => 127; + use Protocol::CassandraCQL qw( :opcodes :results :consistencies FLAG_COMPRESS build_frame parse_frame @@ -218,7 +223,6 @@ my $frame = Protocol::CassandraCQL::Frame->new( $body ); if( my $f = $self->{streams}[$streamid] ) { - undef $self->{streams}[$streamid]; if( $opcode == OPCODE_ERROR ) { my ( $err, $message ) = parse_error_frame( $version, $frame ); @@ -231,6 +235,10 @@ $f->done( $opcode, $frame, $version ); } + # Undefined after running $f->done, so that $f doesn't get to jump the queue ahead of pending requests + # NB: Moving this above $f->done would mean protecting pending against assuming this streamid is still free + undef $self->{streams}[$streamid]; + if( my $next = shift @{ $self->{pending} } ) { my ( $opcode, $frame, $f ) = @$next; $self->_send( $opcode, $streamid, $frame, $f ); @@ -325,7 +333,7 @@ } if( !defined $id ) { - if( $#$streams == 127 ) { + if( $#$streams == MAX_STREAMS ) { push @{ $self->{pending} }, [ $opcode, $frame, $f ]; return $f; } diff -Naurb ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL.pm ./lib/Net/Async/CassandraCQL.pm --- ../Net-Async-CassandraCQL-0.11.orig/lib/Net/Async/CassandraCQL.pm 2015-01-24 09:43:27.919671085 +0000 +++ ./lib/Net/Async/CassandraCQL.pm 2015-01-25 13:29:00.190691508 +0000 @@ -823,7 +828,8 @@ $self->query( @_ )->then( sub { my ( $type, $result ) = @_; - $type eq "rows" or Future->new->fail( "Expected 'rows' result" ); + defined $type or return Future->new->fail( "Expected type from query" ); + $type eq "rows" or return Future->new->fail( "Expected 'rows' result" ); Future->new->done( $result ); }); }
Applied and released in 0.12