Subject: | Stream module lacks basic required functionality to prevent socket closing |
As per http://guide.couchdb.org/draft/notifications.html
The "Continuous" feed can be used to monitor active changes from a
CouchDB database. Unfortunately, the URL being crafted for this purpose
within AnyEvent::CouchDB::Stream does not satisfy the necessary
requirements to ensure continuous connection.
References to this issue exist in the following locations:
https://issues.apache.org/jira/browse/COUCHDB-580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel#issue-tabs
https://github.com/CPAN-API/metacpan-web/issues/281
It's documented here:
http://wiki.apache.org/couchdb/HTTP%5Fdatabase%5FAPI#Changes
That the changes API has a default timeout of 60 seconds if no new data
is implemented.
Please note that the relevant code to resolve this issue has been
attached, as well as sample code which displays the noted issue after a
60 second timeframe.
[benbot@benbot ~]$ perl -v
This is perl 5, version 14, subversion 1 (v5.14.1) built for i686-linux
[benbot@benbot ~]$ uname -a
Linux benbot.houston.hostgator.com 2.6.40.4-5.fc15.i686 #1 SMP Tue Aug
30 14:54:41 UTC 2011 i686 i686 i386 GNU/Linux
Subject: | Stream.pm.diff |
--- a/Stream.pm 2011-10-11 12:01:37.208112734 -0500
+++ b/Stream.pm 2011-10-11 12:05:05.952926161 -0500
@@ -8,7 +8,7 @@
use JSON;
use Try::Tiny;
-our $VERSION = '0.01';
+our $VERSION = '0.02';
sub new {
my $class = shift;
@@ -19,6 +19,7 @@
my $filter = delete $args{filter};
my $since = delete $args{since} || 1;
my $on_change = delete $args{on_change};
+ my $heartbeat = delete $args{heartbeat} || 5000;
my $on_error = delete $args{on_error} || sub { die @_ };
my $on_eof = delete $args{on_eof} || sub { };
my $on_keepalive = delete $args{on_keepalive} || sub { };
@@ -27,7 +28,7 @@
my $uri = URI->new($server);
$uri->path( $db. '/_changes' );
- $uri->query_form( filter => $filter, feed => "continuous", since => $since );
+ $uri->query_form( filter => $filter, feed => "continuous", since => $since, heartbeat => $heartbeat );
my $self = bless {}, $class;
@@ -160,6 +161,10 @@
A code ref to execute on eof
+=item B<heartbeat>
+
+The interval in milliseconds between newlines sent from the server to ensure that an open connection is still being maintained
+
=back
=head1 AUTHOR
Subject: | couchdb-stream.pl |
#!perl
use AnyEvent;
use AnyEvent::CouchDB::Stream;
my ($changes_watcher,$changes_listener, $changes_callback);
my %listener_args = (
url => 'http://localhost:5984/',
database => 'jobs',
since => 0,
on_change => sub {
my $change = shift;
$changes_watcher->send and warn "Connection Closed" if $change->{last_seq};
$listener_args->{since} = $change->{seq} if $change->{seq} > $listener_args->{since};
},
on_error => sub {warn "Error due to closed pipe." and $changes_watcher->send;},
timeout => 1
);
sub loop {
$changes_listener = undef;
$changes_watcher = undef;
$changes_listener = AnyEvent::CouchDB::Stream->new(%listener_args);
$changes_watcher = AnyEvent::condvar;
$changes_callback = $changes_watcher->cb(sub{loop();});
}
loop();
$changes_watcher->recv;