Skip Menu |

This queue is for tickets about the IO-Async CPAN distribution.

Report information
The Basics
Id: 66520
Status: resolved
Priority: 0/
Queue: IO-Async

People
Owner: Nobody in particular
Requestors: leonerd-cpan [...] leonerd.org.uk
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: (no value)
Fixed in: 0.40



Subject: File Tailing watcher
Some ability to tail the end of a file, a la File::Tail. Suggest: make it look like a read-only ::Stream, e.g.: open my $fh, "/var/log/daemon.log" or die "Cannot - $!"; $loop->add( IO::Async::FileStream->new( handle => $fh, on_read => sub { ... } ) ); This should be easy to implement initially with stat() calls driven by a Timer::Periodic; it can later use the $loop->watch_file API to possibly get better performance (e.g. inotify on Linux) -- Paul Evans
The attached patch might be a good initial implementation -- Paul Evans
Subject: rt66520.patch
=== added file 'lib/IO/Async/FileStream.pm' --- lib/IO/Async/FileStream.pm 1970-01-01 00:00:00 +0000 +++ lib/IO/Async/FileStream.pm 2011-03-10 17:23:15 +0000 @@ -0,0 +1,212 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2011 -- leonerd@leonerd.org.uk + +package IO::Async::FileStream; + +use strict; +use warnings; + +our $VERSION = '0.39'; + +use base qw( IO::Async::Stream ); + +use IO::Async::Timer::Periodic; + +use Carp; +use Fcntl qw( SEEK_SET SEEK_CUR ); + +=head1 NAME + +C<IO::Async::FileStream> - read the tail of a file + +=head1 SYNOPSIS + + use IO::Async::FileStream; + + use IO::Async::Loop; + my $loop = IO::Async::Loop->new(); + + open my $logh, "<", "var/logs/daemon.log" or die "Cannot open logfile - $!"; + + my $filestream = IO::Async::FileStream->new( + read_handle => $logh, + + on_read => sub { + my ( $self, $buffref ) = @_; + + if( $$buffref =~ s/^(.*\n)// ) { + print "Received a line $1"; + + return 1; + } + }, + ); + + $loop->add( $filestream ); + + $loop->loop_forever; + +=head1 DESCRIPTION + +This subclass of L<IO::Async::Stream> allows reading the end of a regular file +which is being appended to by some other process. It invokes the C<on_read> +event when more data has been added to the file. + +This class provides an API identical to C<IO::Async::Stream> when given a +C<read_handle>; it should be treated similarly. In particular, it can be given +an C<on_read> handler, or subclassed to provide an C<on_read> method, or even +used as the C<transport> for an C<IO::Async::Protocol::Stream> object. + +It will not support writing. + +=cut + +=head1 EVENTS + +The following events are invoked, either using subclass methods or CODE +references in parameters. + +Because this is a subclass of L<IO::Async::Stream> in read-only mode, all the +events supported by C<Stream> relating to the read handle are supported here. 
+This is not a full list; see also the documentation relating to +C<IO::Async::Stream>. + +=head2 $ret = on_read \$buffer, $eof + +Invoked when more data is available in the internal receiving buffer. + +Note that C<$eof> only indicates that all the data currently available in the +file has now been read; in contrast to a regular C<IO::Async::Stream>, this +object will not stop watching after this condition. Instead, it will continue +watching the file for updates. + +=head2 on_truncated + +Invoked when the file size shrinks. If this happens, it is presumed that the +file content has been replaced. Reading will then commence from the start of +the file. + +=cut + +sub _init +{ + my $self = shift; + my ( $params ) = @_; + + $self->SUPER::_init( $params ); + + $self->add_child( $self->{timer} = IO::Async::Timer::Periodic->new( + interval => 2, + on_tick => $self->_capture_weakself( 'on_tick' ), + ) ); + + $params->{close_on_read_eof} = 0; +} + +=head1 PARAMETERS + +The following named parameters may be passed to C<new> or C<configure>, in +addition to the parameters relating to reading supported by +C<IO::Async::Stream>. + +=over 8 + +=item interval => NUM + +Optional. The interval in seconds to poll the filehandle using C<stat(2)> +looking for size changes. A default of 2 seconds will be applied if not +defined. 
+ +=back + +=cut + +sub configure +{ + my $self = shift; + my %params = @_; + + foreach (qw( on_truncated )) { + $self->{$_} = delete $params{$_} if exists $params{$_}; + } + + foreach (qw( interval )) { + $self->{timer}->configure( $_ => delete $params{$_} ) if exists $params{$_}; + } + + croak "Cannot have a write_handle in a ".ref($self) if defined $params{write_handle}; + + $self->SUPER::configure( %params ); + + if( defined $self->read_handle ) { + $self->{timer}->start if !$self->{timer}->is_running; + $self->on_file_acquire; + } +} + +# Don't register myself with the Loop for IO purposes; Timer will handle this +sub _add_to_loop { } +sub _remove_from_loop { } + +sub on_file_acquire +{ + my $self = shift; + + $self->{last_pos} = 0; + $self->{last_size} = 0; + + $self->read_more; +} + +sub on_tick +{ + my $self = shift; + + my $size = (stat $self->read_handle)[7]; + + return if $size == $self->{last_size}; + + if( $size < $self->{last_size} ) { + $self->maybe_invoke_event( on_truncated => ); + $self->{last_pos} = 0; + } + + $self->{last_size} = $size; + + $self->{timer}->stop; + + $self->read_more; +} + +sub read_more +{ + my $self = shift; + + sysseek( $self->read_handle, $self->{last_pos}, SEEK_SET ); + + $self->on_read_ready; + + $self->{last_pos} = sysseek( $self->read_handle, 0, SEEK_CUR ); # == systell + + if( $self->{last_pos} < $self->{last_size} ) { + $self->get_loop->later( sub { $self->read_more } ); + } + else { + $self->{timer}->start; + } +} + +sub write +{ + carp "Cannot ->write from a ".ref($_[0]); +} + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; === modified file 'lib/IO/Async/Stream.pm' --- lib/IO/Async/Stream.pm 2011-02-18 00:04:10 +0000 +++ lib/IO/Async/Stream.pm 2011-03-10 16:31:20 +0000 @@ -176,6 +176,8 @@ $self->{read_len} = $READLEN; $self->{write_len} = $WRITELEN; + + $self->{close_on_read_eof} = 1; } =head1 PARAMETERS @@ -244,6 +246,13 @@ C<autoflush> is enabled, this option only affects deferred writing 
if the initial attempt failed due to buffer space. +=item close_on_read_eof => BOOL + +Optional. Usually true, but if set to a false value then the stream will not +be C<close>d when an EOF condition occurs on read. This is normally not useful +as at that point the underlying stream filehandle is no longer useable, but it +may be useful for reading regular files, or interacting with TTY devices. + =back If a read handle is given, it is required that either an C<on_read> callback @@ -269,7 +278,8 @@ my %params = @_; for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error - on_write_error autoflush read_len read_all write_len write_all )) { + on_write_error autoflush read_len read_all write_len write_all + close_on_read_eof )) { $self->{$_} = delete $params{$_} if exists $params{$_}; } @@ -572,7 +582,7 @@ if( $eof ) { $self->maybe_invoke_event( on_read_eof => ); - $self->close_now; + $self->close_now if $self->{close_on_read_eof}; return; } === modified file 't/21stream-1read.t' --- t/21stream-1read.t 2011-01-30 16:53:07 +0000 +++ t/21stream-1read.t 2011-03-10 16:33:29 +0000 @@ -4,7 +4,7 @@ use IO::Async::Test; -use Test::More tests => 48; +use Test::More tests => 54; use Test::Fatal; use Test::Refcount; @@ -283,6 +283,41 @@ is( $partial, "Incomplete", 'EOF stream retains partial input' ); ok( !defined $stream->get_loop, 'EOF stream no longer member of Loop' ); + ok( !defined $stream->read_handle, 'Stream no longer has a read_handle' ); +} + +# Disabled close_on_read_eof +{ + my ( $rd, $wr ) = mkhandles; + + my $eof = 0; + my $partial; + + my $stream = IO::Async::Stream->new( read_handle => $rd, + on_read => sub { + my ( undef, $buffref, $eof ) = @_; + $partial = $$buffref if $eof; + return 0; + }, + on_read_eof => sub { $eof++ }, + close_on_read_eof => 0, + ); + + $loop->add( $stream ); + + $wr->syswrite( "Incomplete" ); + + $wr->close; + + is( $eof, 0, 'EOF indication before wait' ); + + wait_for { $eof }; + + is( $eof, 1, 'EOF indication after 
wait' ); + is( $partial, "Incomplete", 'EOF stream retains partial input' ); + + ok( defined $stream->get_loop, 'EOF stream still member of Loop' ); + ok( defined $stream->read_handle, 'Stream still has a read_handle' ); } # Close === added file 't/27filestream.t' --- t/27filestream.t 1970-01-01 00:00:00 +0000 +++ t/27filestream.t 2011-03-10 17:23:10 +0000 @@ -0,0 +1,157 @@ +#!/usr/bin/perl -w + +use strict; + +use IO::Async::Test; + +use Test::More tests => 14; +use Test::Fatal; +use Test::Refcount; + +use Fcntl qw( SEEK_SET ); +use File::Temp qw( tempfile ); + +use IO::Async::Loop; + +use IO::Async::FileStream; + +use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1; + +my $loop = IO::Async::Loop->new(); + +testing_loop( $loop ); + +sub mkhandles +{ + my ( $rd, $filename ) = tempfile( "tmpfile.XXXXXX", UNLINK => 1 ); + open my $wr, ">", $filename or die "Cannot reopen file for writing - $!"; + + $wr->autoflush( 1 ); + + return ( $rd, $wr ); +} + +{ + my ( $rd, $wr ) = mkhandles; + + my @lines; + + my $filestream = IO::Async::FileStream->new( + interval => 1 * AUT, + read_handle => $rd, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + return 0 unless( $$buffref =~ s/^(.*\n)// ); + + push @lines, $1; + return 1; + }, + ); + + ok( defined $filestream, '$filestream defined' ); + isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' ); + + is_oneref( $filestream, 'reading $filestream has refcount 1 initially' ); + + $loop->add( $filestream ); + + is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' ); + + $wr->syswrite( "message\n" ); + + is_deeply( \@lines, [], '@lines before wait' ); + + wait_for { scalar @lines }; + + is_deeply( \@lines, [ "message\n" ], '@lines after wait' ); + + $loop->remove( $filestream ); +} + +# Truncation +{ + my ( $rd, $wr ) = mkhandles; + + my @lines; + my $truncated; + + my $filestream = IO::Async::FileStream->new( + interval => 1 * AUT, + read_handle => 
$rd, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + return 0 unless( $$buffref =~ s/^(.*\n)// ); + + push @lines, $1; + return 1; + }, + on_truncated => sub { $truncated++ }, + ); + + $loop->add( $filestream ); + + $wr->syswrite( "Some original lines\nin the file\n" ); + + wait_for { scalar @lines }; + + $wr->truncate( 0 ); + sysseek( $wr, 0, SEEK_SET ); + $wr->syswrite( "And another\n" ); + + wait_for { @lines == 3 }; + + is( $truncated, 1, 'File content truncation detected' ); + is_deeply( \@lines, + [ "Some original lines\n", "in the file\n", "And another\n" ], + 'All three lines read' ); + + $loop->remove( $filestream ); +} + +# Subclass +my @sub_lines; + +{ + my ( $rd, $wr ) = mkhandles; + + my $filestream = TestStream->new( + read_handle => $rd, + ); + + ok( defined $filestream, 'subclass $filestream defined' ); + isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' ); + + is_oneref( $filestream, 'subclass $filestream has refcount 1 initially' ); + + $loop->add( $filestream ); + + is_refcount( $filestream, 2, 'subclass $filestream has refcount 2 after adding to Loop' ); + + $wr->syswrite( "message\n" ); + + is_deeply( \@sub_lines, [], '@sub_lines before wait' ); + + wait_for { scalar @sub_lines }; + + is_deeply( \@sub_lines, [ "message\n" ], '@sub_lines after wait' ); + + $loop->remove( $filestream ); +} + +package TestStream; +use base qw( IO::Async::FileStream ); + +sub on_read +{ + my $self = shift; + my ( $buffref ) = @_; + + return 0 unless $$buffref =~ s/^(.*\n)//; + + push @sub_lines, $1; + return 1; +}