Subject: | Fix Loop kept alive until last query in query cache times out |
Paul,
This issue shows up mostly under POE, but it is a conceptual problem regardless of which event loop is used.
Unused queries are kept alive for 60 seconds by a timer future. Until the query is claimed again or that future completes or is cancelled, the references to the loop and to $self held inside it keep everything alive in one big reference cycle.
POE suffers more from this, because any timers in a session keep the session alive.
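This patch implements a pacemaker pattern: a short repeating timer that keeps checking whether the query cache is the only thing still keeping the client (and its loop) alive, and drops the cached query as soon as it is.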
I'm starting to get a strong desire to have a full mock Cassandra server for this stuff, but that's one hell of a lot of work.
HTH. Grab me on IRC with any questions.
G
Subject: | Net-Async-CassandraCQL-0.11-query_cache.patch |
--- /home/staff/gbjk/.cpanm/work/1422274813.12622/Net-Async-CassandraCQL-0.11_1/blib/lib/Net/Async/CassandraCQL.pm 2015-01-26 12:20:38.932434870 +0000
+++ local/lib/perl5/Net/Async/CassandraCQL.pm 2015-01-28 08:30:40.930295267 +0000
@@ -15,9 +15,10 @@
use Carp;
+use Devel::Refcount qw/ refcount /;
use Future::Utils qw( fmap_void try_repeat_until_success );
use List::Util qw( shuffle );
-use Scalar::Util qw( weaken );
+use Scalar::Util qw( weaken isweak );
use Socket qw( inet_ntop getnameinfo AF_INET AF_INET6 NI_NUMERICHOST NIx_NOSERV );
use Protocol::CassandraCQL qw( CONSISTENCY_ONE );
@@ -29,6 +30,11 @@
# Time after which down nodes will be retried
use constant NODE_RETRY_TIME => 60;
+# Pacemaker tuning, both values in seconds:
+#   QUERY_TTL                - how long an unused cached query is kept
+#                              before it is dropped from the cache
+#   QUERY_PACEMAKER_INTERVAL - gap between pacemaker ticks; each tick checks
+#                              whether the cache is the only thing still
+#                              keeping this object (and its loop) alive
+use constant {
+    QUERY_TTL                => 60,
+    QUERY_PACEMAKER_INTERVAL => 0.5,
+};
+
=head1 NAME
C<Net::Async::CassandraCQL> - use Cassandra databases with L<IO::Async> using CQL
@@ -173,7 +179,7 @@
shift->_on_status_change( @_ );
});
- $self->{queries_by_cql} = {}; # {$cql} => [$query, $expire_timer_f]
+ $self->{queries_by_cql} = {}; # {$cql} => {query => $query, pacemaker => $expire_timer_f, ttl => Int]
# Can be in two states:
# $query is_weak; timer undef => normal user use
# $query non-weak; timer exists => due to expire soon
@@ -499,7 +505,7 @@
return $conn_f unless keys %$queries_by_cql;
( fmap_void {
- my $query = shift->[0] or return Future->new->done;
+ my $query = shift->{query} or return Future->new->done;
$conn->prepare( $query->cql, $self );
} foreach => [ values %$queries_by_cql ] )
->then( sub { $conn_f } );
@@ -855,11 +861,11 @@
my $queries_by_cql = $self->{queries_by_cql};
if( my $q = $queries_by_cql->{$cql} ) {
- my $query = $q->[0];
- if( $q->[1] ) {
- $q->[1]->cancel;
- undef $q->[1];
- weaken( $q->[0] );
+ my $query = $q->{query};
+ if( $q->{pacemaker} ) {
+ $q->{pacemaker}->cancel;
+ undef $q->{pacemaker};
+ weaken( $q->{query} );
}
return Future->new->done( $query );
}
@@ -877,13 +883,46 @@
$self->debug_printf( "PREPARE => [%s]", unpack "H*", $query->id );
- my $q = $queries_by_cql->{$cql} = [ $query, undef ];
- weaken( $q->[0] );
+ my $q = $queries_by_cql->{$cql} = { query => $query };
+ weaken( $q->{query} );
Future->new->done( $query );
});
}
+# Pacemaker tick for a cached query that is pending expiry.
+#
+# Each tick burns QUERY_PACEMAKER_INTERVAL seconds off the entry's {ttl}
+# and drops the cache entry as soon as any of these holds:
+#   1) there is no loop any more, so nothing further can be scheduled;
+#   2) the entry's TTL has run out;
+#   3) the cache is artificially keeping this object (and its loop) alive.
+# Otherwise another tick is scheduled and stored in {pacemaker}, where
+# re-preparing the same CQL can find and cancel it.
+#
+# Arguments: $cql - the CQL string keying the entry in {queries_by_cql}.
+sub _check_query_pacemaker
+{
+    my ( $self, $cql ) = @_;
+
+    # Hold $self weakly so the closure scheduled below does not by itself
+    # keep this object (and therefore the loop) alive.
+    weaken $self;
+
+    # Nothing to do if we, or this cache entry, have already gone away.
+    return unless $self;
+    my $q = $self->{queries_by_cql}{$cql} or return;
+    my $query = $q->{query};
+
+    $q->{ttl} -= QUERY_PACEMAKER_INTERVAL;
+
+    # For case 3: the loop holds one reference to us in its notifiers and
+    # the cached query holds another; the weakened $self does not count.
+    # NOTE(review): this magic number is fragile - any other strong internal
+    # reference (e.g. a closure capturing $self strongly) shifts it.
+    my $expire = !$self->loop
+        || $q->{ttl} <= 0
+        || refcount( $self ) == 2;
+
+    if( $expire ) {
+        # Remove the {cassandra} element from the query so it doesn't
+        # re-register itself for expiry when it is DESTROYed again
+        undef $query->{cassandra} if $query;
+        delete $self->{queries_by_cql}{$cql};
+        return;
+    }
+
+    # Still wanted elsewhere and within its TTL: check again shortly. The
+    # closure only sees the weakened $self, so it must cope with it being
+    # undef by the time the timer fires instead of dying inside the loop.
+    $q->{pacemaker} = $self->loop
+        ->delay_future( after => QUERY_PACEMAKER_INTERVAL )
+        ->on_done( sub { $self and $self->_check_query_pacemaker( $cql ) } );
+    return;
+}
+
sub _expire_query
{
my $self = shift;
@@ -892,15 +931,14 @@
my $queries_by_cql = $self->{queries_by_cql};
my $q = $queries_by_cql->{$cql} or return;
- my $query = $q->[0]; undef $q->[0]; $q->[0] = $query; # unweaken
+ # Unweaken the query itself, so it'll survive destruction
+ my $query = $q->{query};
+ undef $q->{query};
+ $q->{query} = $query;
+ $q->{ttl} = QUERY_TTL;
- $q->[1] = $self->loop->delay_future( after => 60 )
- ->on_done( sub {
- # Remove the {cassandra} element from the query so it doesn't
- # re-register itself for expiry when it is DESTROYed again
- undef $q->[0]{cassandra};
- delete $queries_by_cql->{$cql};
- });
+ $q->{pacemaker} //= $self->loop->delay_future( after => QUERY_PACEMAKER_INTERVAL )
+ ->on_done( sub { $self->_check_query_pacemaker( $cql ) } );
}
=head2 $cass->execute( $query, $data, $consistency, %other_args ) ==> ( $type, $result )