The attached patch might be a good initial implementation.
--
Paul Evans
=== added file 'lib/IO/Async/FileStream.pm'
--- lib/IO/Async/FileStream.pm 1970-01-01 00:00:00 +0000
+++ lib/IO/Async/FileStream.pm 2011-03-10 17:23:15 +0000
@@ -0,0 +1,212 @@
+# You may distribute under the terms of either the GNU General Public License
+# or the Artistic License (the same terms as Perl itself)
+#
+# (C) Paul Evans, 2011 -- leonerd@leonerd.org.uk
+
+package IO::Async::FileStream;
+
+use strict;
+use warnings;
+
+our $VERSION = '0.39';
+
+use base qw( IO::Async::Stream );
+
+use IO::Async::Timer::Periodic;
+
+use Carp;
+use Fcntl qw( SEEK_SET SEEK_CUR );
+
+=head1 NAME
+
+C<IO::Async::FileStream> - read the tail of a file
+
+=head1 SYNOPSIS
+
+ use IO::Async::FileStream;
+
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new();
+
+ open my $logh, "<", "var/logs/daemon.log" or die "Cannot open logfile - $!";
+
+ my $filestream = IO::Async::FileStream->new(
+ read_handle => $logh,
+
+ on_read => sub {
+ my ( $self, $buffref ) = @_;
+
+ if( $$buffref =~ s/^(.*\n)// ) {
+ print "Received a line $1";
+
+ return 1;
+ }
+ },
+ );
+
+ $loop->add( $filestream );
+
+ $loop->loop_forever;
+
+=head1 DESCRIPTION
+
+This subclass of L<IO::Async::Stream> allows reading the end of a regular file
+which is being appended to by some other process. It invokes the C<on_read>
+event when more data has been added to the file.
+
+This class provides an API identical to C<IO::Async::Stream> when given a
+C<read_handle>; it should be treated similarly. In particular, it can be given
+an C<on_read> handler, or subclassed to provide an C<on_read> method, or even
+used as the C<transport> for an C<IO::Async::Protocol::Stream> object.
+
+It will not support writing.
+
+=cut
+
+=head1 EVENTS
+
+The following events are invoked, either using subclass methods or CODE
+references in parameters.
+
+Because this is a subclass of L<IO::Async::Stream> in read-only mode, all the
+events supported by C<Stream> relating to the read handle are supported here.
+This is not a full list; see also the documentation relating to
+C<IO::Async::Stream>.
+
+=head2 $ret = on_read \$buffer, $eof
+
+Invoked when more data is available in the internal receiving buffer.
+
+Note that C<$eof> only indicates that all the data currently available in the
+file has now been read; in contrast to a regular C<IO::Async::Stream>, this
+object will not stop watching after this condition. Instead, it will continue
+watching the file for updates.
+
+=head2 on_truncated
+
+Invoked when the file size shrinks. If this happens, it is presumed that the
+file content has been replaced. Reading will then commence from the start of
+the file.
+
+=cut
+
+# Construction hook. Lets the parent class initialise itself, then attaches
+# the periodic timer that polls the file and forces close_on_read_eof off.
+sub _init
+{
+   my ( $self, $params ) = @_;
+
+   $self->SUPER::_init( $params );
+
+   # The timer ticks every 2 seconds until reconfigured; each tick is routed
+   # to this object's on_tick method via _capture_weakself.
+   my $timer = IO::Async::Timer::Periodic->new(
+      interval => 2,
+      on_tick => $self->_capture_weakself( 'on_tick' ),
+   );
+   $self->{timer} = $timer;
+   $self->add_child( $timer );
+
+   # Stored in the caller's params hash rather than on $self.
+   # NOTE(review): presumably this is picked up by the configure call that
+   # follows construction - confirm against IO::Async::Notifier.
+   $params->{close_on_read_eof} = 0;
+}
+
+=head1 PARAMETERS
+
+The following named parameters may be passed to C<new> or C<configure>, in
+addition to the parameters relating to reading supported by
+C<IO::Async::Stream>.
+
+=over 8
+
+=item interval => NUM
+
+Optional. The interval in seconds to poll the filehandle using C<stat(2)>
+looking for size changes. A default of 2 seconds will be applied if not
+defined.
+
+=back
+
+=cut
+
+# Applies named parameters. on_truncated is stored on this object and interval
+# is forwarded to the polling timer; everything else is passed to the parent
+# class's configure. Croaks if a defined write_handle is supplied.
+#
+# File tracking is (re)started only when this call itself supplies a handle.
+# Restarting on every call would mean that a call which merely changed the
+# interval reset the read position to zero and delivered the whole file to
+# on_read a second time.
+sub configure
+{
+   my $self = shift;
+   my %params = @_;
+
+   foreach (qw( on_truncated )) {
+      $self->{$_} = delete $params{$_} if exists $params{$_};
+   }
+
+   foreach (qw( interval )) {
+      $self->{timer}->configure( $_ => delete $params{$_} ) if exists $params{$_};
+   }
+
+   croak "Cannot have a write_handle in a ".ref($self) if defined $params{write_handle};
+
+   # NOTE(review): 'handle' is understood to be the combined read/write
+   # parameter accepted by IO::Async::Handle - confirm for this version.
+   my $handle_given = exists $params{read_handle} || exists $params{handle};
+
+   $self->SUPER::configure( %params );
+
+   if( $handle_given and defined $self->read_handle ) {
+      $self->{timer}->start if !$self->{timer}->is_running;
+      $self->on_file_acquire;
+   }
+}
+
+# Don't register myself with the Loop for IO purposes; Timer will handle this.
+# Both hooks are therefore overridden with deliberately empty bodies.
+sub _add_to_loop { }
+sub _remove_from_loop { }
+
+# Starts tracking a newly supplied file: zeroes the recorded read position and
+# the last-seen size, then performs a first read.
+sub on_file_acquire
+{
+   my ( $self ) = @_;
+
+   @{$self}{qw( last_pos last_size )} = ( 0, 0 );
+
+   $self->read_more;
+}
+
+# Timer callback. Polls the size of the file behind the read handle using
+# stat. An unchanged size is ignored. A smaller size invokes on_truncated and
+# rewinds the read position to zero. On any change the new size is recorded,
+# the timer is stopped and read_more is called.
+#
+# The tick is skipped when there is no read handle or when stat fails on it.
+# An undefined size would otherwise be compared as zero, raising
+# "uninitialized value" warnings and being misreported as a truncation.
+sub on_tick
+{
+   my $self = shift;
+
+   my $fh = $self->read_handle;
+   return unless defined $fh;
+
+   # Element 7 of the stat list is the file size in bytes
+   my $size = (stat $fh)[7];
+   return unless defined $size;
+
+   return if $size == $self->{last_size};
+
+   if( $size < $self->{last_size} ) {
+      $self->maybe_invoke_event( on_truncated => );
+      $self->{last_pos} = 0;
+   }
+
+   $self->{last_size} = $size;
+
+   $self->{timer}->stop;
+
+   $self->read_more;
+}
+
+# Reads the next chunk of the file. Seeks the handle to the saved position,
+# lets on_read_ready perform the read, then saves where the handle ended up.
+# While the saved position is still short of the recorded size another call is
+# queued on the Loop; otherwise the polling timer is started again.
+sub read_more
+{
+   my ( $self ) = @_;
+
+   sysseek( $self->read_handle, $self->{last_pos}, SEEK_SET );
+
+   $self->on_read_ready;
+
+   # Seeking by zero from the current position just reports it (== systell)
+   my $pos = sysseek( $self->read_handle, 0, SEEK_CUR );
+   $self->{last_pos} = $pos;
+
+   if( $pos < $self->{last_size} ) {
+      $self->get_loop->later( sub { $self->read_more } );
+   }
+   else {
+      $self->{timer}->start;
+   }
+}
+
+# Writing is not supported by this class: issues a warning via carp and
+# writes nothing.
+sub write
+{
+   my ( $self ) = @_;
+
+   carp "Cannot ->write from a ".ref( $self );
+}
+
+=head1 AUTHOR
+
+Paul Evans <leonerd@leonerd.org.uk>
+
+=cut
+
+0x55AA;
=== modified file 'lib/IO/Async/Stream.pm'
--- lib/IO/Async/Stream.pm 2011-02-18 00:04:10 +0000
+++ lib/IO/Async/Stream.pm 2011-03-10 16:31:20 +0000
@@ -176,6 +176,8 @@
$self->{read_len} = $READLEN;
$self->{write_len} = $WRITELEN;
+
+ $self->{close_on_read_eof} = 1;
}
=head1 PARAMETERS
@@ -244,6 +246,13 @@
C<autoflush> is enabled, this option only affects deferred writing if the
initial attempt failed due to buffer space.
+=item close_on_read_eof => BOOL
+
+Optional. Usually true, but if set to a false value then the stream will not
+be C<close>d when an EOF condition occurs on read. This is normally not useful
+as at that point the underlying stream filehandle is no longer useable, but it
+may be useful for reading regular files, or interacting with TTY devices.
+
=back
If a read handle is given, it is required that either an C<on_read> callback
@@ -269,7 +278,8 @@
my %params = @_;
for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
- on_write_error autoflush read_len read_all write_len write_all )) {
+ on_write_error autoflush read_len read_all write_len write_all
+ close_on_read_eof )) {
$self->{$_} = delete $params{$_} if exists $params{$_};
}
@@ -572,7 +582,7 @@
if( $eof ) {
$self->maybe_invoke_event( on_read_eof => );
- $self->close_now;
+ $self->close_now if $self->{close_on_read_eof};
return;
}
=== modified file 't/21stream-1read.t'
--- t/21stream-1read.t 2011-01-30 16:53:07 +0000
+++ t/21stream-1read.t 2011-03-10 16:33:29 +0000
@@ -4,7 +4,7 @@
use IO::Async::Test;
-use Test::More tests => 48;
+use Test::More tests => 54;
use Test::Fatal;
use Test::Refcount;
@@ -283,6 +283,41 @@
is( $partial, "Incomplete", 'EOF stream retains partial input' );
ok( !defined $stream->get_loop, 'EOF stream no longer member of Loop' );
+ ok( !defined $stream->read_handle, 'Stream no longer has a read_handle' );
+}
+
+# Disabled close_on_read_eof
+# With close_on_read_eof => 0 the stream must still report EOF through
+# on_read_eof, but keep both its Loop membership and its read handle.
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ # Counts on_read_eof invocations
+ my $eof = 0;
+ my $partial;
+
+ my $stream = IO::Async::Stream->new( read_handle => $rd,
+ on_read => sub {
+ # This $eof is the callback's own argument and shadows the counter above
+ my ( undef, $buffref, $eof ) = @_;
+ $partial = $$buffref if $eof;
+ return 0;
+ },
+ on_read_eof => sub { $eof++ },
+ close_on_read_eof => 0,
+ );
+
+ $loop->add( $stream );
+
+ $wr->syswrite( "Incomplete" );
+
+ # Closing the writing end is what produces EOF on the reading end
+ $wr->close;
+
+ is( $eof, 0, 'EOF indication before wait' );
+
+ wait_for { $eof };
+
+ is( $eof, 1, 'EOF indication after wait' );
+ is( $partial, "Incomplete", 'EOF stream retains partial input' );
+
+ ok( defined $stream->get_loop, 'EOF stream still member of Loop' );
+ ok( defined $stream->read_handle, 'Stream still has a read_handle' );
}
# Close
=== added file 't/27filestream.t'
--- t/27filestream.t 1970-01-01 00:00:00 +0000
+++ t/27filestream.t 2011-03-10 17:23:10 +0000
@@ -0,0 +1,157 @@
+#!/usr/bin/perl -w
+
+use strict;
+
+use IO::Async::Test;
+
+use Test::More tests => 14;
+use Test::Fatal;
+use Test::Refcount;
+
+use Fcntl qw( SEEK_SET );
+use File::Temp qw( tempfile );
+
+use IO::Async::Loop;
+
+use IO::Async::FileStream;
+
+use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1;
+
+my $loop = IO::Async::Loop->new();
+
+testing_loop( $loop );
+
+# Creates a temporary file (flagged UNLINK so File::Temp removes it) and
+# returns two handles on it: the one tempfile() opened, which the tests read
+# from, and a second, autoflushing handle opened for writing.
+sub mkhandles
+{
+   my ( $reader, $path ) = tempfile( "tmpfile.XXXXXX", UNLINK => 1 );
+
+   open my $writer, ">", $path or die "Cannot reopen file for writing - $!";
+   $writer->autoflush( 1 );
+
+   return ( $reader, $writer );
+}
+
+# Reading appended data through an on_read callback
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ my @lines;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 1 * AUT,
+ read_handle => $rd,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ # Take one complete line off the front of the buffer per call
+ return 0 unless( $$buffref =~ s/^(.*\n)// );
+
+ push @lines, $1;
+ return 1;
+ },
+ );
+
+ ok( defined $filestream, '$filestream defined' );
+ isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );
+
+ is_oneref( $filestream, 'reading $filestream has refcount 1 initially' );
+
+ $loop->add( $filestream );
+
+ is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' );
+
+ $wr->syswrite( "message\n" );
+
+ # Nothing may be delivered until the Loop has been run
+ is_deeply( \@lines, [], '@lines before wait' );
+
+ wait_for { scalar @lines };
+
+ is_deeply( \@lines, [ "message\n" ], '@lines after wait' );
+
+ $loop->remove( $filestream );
+}
+
+# Truncation
+# Shrinking the file must invoke on_truncated once, after which the new
+# content is read as well.
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ my @lines;
+ my $truncated;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 1 * AUT,
+ read_handle => $rd,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ # Take one complete line off the front of the buffer per call
+ return 0 unless( $$buffref =~ s/^(.*\n)// );
+
+ push @lines, $1;
+ return 1;
+ },
+ on_truncated => sub { $truncated++ },
+ );
+
+ $loop->add( $filestream );
+
+ $wr->syswrite( "Some original lines\nin the file\n" );
+
+ wait_for { scalar @lines };
+
+ # Empty the file, rewind the writer to the start and write shorter content
+ $wr->truncate( 0 );
+ sysseek( $wr, 0, SEEK_SET );
+ $wr->syswrite( "And another\n" );
+
+ wait_for { @lines == 3 };
+
+ is( $truncated, 1, 'File content truncation detected' );
+ is_deeply( \@lines,
+ [ "Some original lines\n", "in the file\n", "And another\n" ],
+ 'All three lines read' );
+
+ $loop->remove( $filestream );
+}
+
+# Subclass
+# Same checks as the callback-based test, but on_read is supplied as a method
+# of the TestStream subclass, which pushes lines onto @sub_lines.
+my @sub_lines;
+
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ my $filestream = TestStream->new(
+ read_handle => $rd,
+ );
+
+ ok( defined $filestream, 'subclass $filestream defined' );
+ isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );
+
+ is_oneref( $filestream, 'subclass $filestream has refcount 1 initially' );
+
+ $loop->add( $filestream );
+
+ is_refcount( $filestream, 2, 'subclass $filestream has refcount 2 after adding to Loop' );
+
+ $wr->syswrite( "message\n" );
+
+ # Nothing may be delivered until the Loop has been run
+ is_deeply( \@sub_lines, [], '@sub_lines before wait' );
+
+ wait_for { scalar @sub_lines };
+
+ is_deeply( \@sub_lines, [ "message\n" ], '@sub_lines after wait' );
+
+ $loop->remove( $filestream );
+}
+
+package TestStream;
+use base qw( IO::Async::FileStream );
+
+# on_read supplied as a method: removes one complete line from the front of
+# the buffer, records it in @sub_lines and returns 1; returns 0 when the
+# buffer holds no complete line.
+sub on_read
+{
+   my ( $self, $buffref ) = @_;
+
+   if( $$buffref =~ s/^(.*\n)// ) {
+      push @sub_lines, $1;
+      return 1;
+   }
+
+   return 0;
+}