Initial patch attached.
I think I might neaten this up further before next release, perhaps by
creating a new IaFileChange notifier, or something.
But this at least should work and get you started.
--
Paul Evans
=== modified file 'lib/IO/Async/FileStream.pm'
--- lib/IO/Async/FileStream.pm 2012-05-20 21:00:26 +0000
+++ lib/IO/Async/FileStream.pm 2012-05-23 17:34:24 +0000
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2011 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2011-2012 -- leonerd@leonerd.org.uk
package IO::Async::FileStream;
@@ -131,6 +131,13 @@
=over 8
+=item filename => STRING
+
+Optional. If supplied, watches the named file rather than the filehandle given
+in C<read_handle>. The file will be opened by the constructor, and then
+watched for renames. If the file is renamed, the rest of the old file is read
+and it is closed; the same filename is then reopened and tracked similarly.
+
=item interval => NUM
Optional. The interval in seconds to poll the filehandle using C<stat(2)>
@@ -154,6 +161,12 @@
$self->{timer}->configure( $_ => delete $params{$_} ) if exists $params{$_};
}
+ if( exists $params{filename} ) {
+ my $filename = delete $params{filename};
+ $params{read_handle} = $self->_reopen_file( $filename );
+ $self->{filename} = $filename;
+ }
+
croak "Cannot have a write_handle in a ".ref($self) if defined $params{write_handle};
$self->SUPER::configure( %params );
@@ -189,11 +202,25 @@
croak "Cannot _watch_write in " . ref($self) if $want;
}
+sub _reopen_file # Opens $path read-only and returns the new filehandle
+{
+ my $self = shift;
+ my ( $path ) = @_;
+
+ open my $fh, "<", $path or croak "Cannot open $path for reading - $!"; # croaks if the open fails
+
+ undef $self->{last_size}; # forget the size recorded for the previous handle
+ undef $self->{last_pos}; # forget the read position recorded for the previous handle
+
+ return $fh;
+}
+
sub _do_initial
{
my $self = shift;
- my $size = (stat $self->read_handle)[7];
+ # During reopen for rename, start from the beginning
+ my $size = $self->{renamed} ? 0 : (stat $self->read_handle)[7];
$self->{last_size} = $size;
@@ -205,7 +232,22 @@
{
my $self = shift;
- my $size = (stat $self->read_handle)[7];
+ my @fstats = stat $self->read_handle;
+
+ if( my $filename = $self->{filename} ) {
+ # Detect rename
+ my @namestats = stat $filename;
+
+ # [0] = dev, [1] = ino
+ if( !@namestats or $namestats[0] != $fstats[0] or $namestats[1] != $fstats[1] ) {
+ $self->{renamed} = 1;
+ $self->debug_printf( "read tail of old file" );
+ $self->read_more;
+ return;
+ }
+ }
+
+ my $size = $fstats[7];
if( $size < $self->{last_size} ) {
$self->maybe_invoke_event( on_truncated => );
@@ -234,6 +276,14 @@
if( $self->{last_pos} < $self->{last_size} ) {
$self->loop->later( sub { $self->read_more } );
}
+ elsif( $self->{renamed} ) {
+ $self->debug_printf( "reopening for rename" );
+ $self->read_handle->close;
+
+ my $fh = $self->_reopen_file( $self->{filename} );
+ $self->configure( read_handle => $fh );
+ undef $self->{renamed};
+ }
}
sub write
@@ -364,11 +414,6 @@
=item *
-Support opening a file by name instead of taking an already-open filehandle.
-With that comes the option to track it for renames too.
-
-=item *
-
Move the actual file update watching code into C<IO::Async::Loop>, possibly as
a new watch/unwatch method pair C<watch_file>.
=== modified file 't/27filestream.t'
--- t/27filestream.t 2011-07-26 00:19:12 +0000
+++ t/27filestream.t 2012-05-23 17:34:24 +0000
@@ -4,7 +4,7 @@
use IO::Async::Test;
-use Test::More tests => 20;
+use Test::More tests => 27;
use Test::Fatal;
use Test::Refcount;
@@ -28,7 +28,7 @@
$wr->autoflush( 1 );
- return ( $rd, $wr );
+ return ( $rd, $wr, $filename );
}
{
@@ -221,6 +221,55 @@
$loop->remove( $filestream );
}
+# Follow by name, including across a rename of the watched file
+{
+ my ( undef, $wr, $filename ) = mkhandles;
+
+ my @lines;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 0.1 * AUT,
+ filename => $filename,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ push @lines, $1 while $$buffref =~ s/^(.*\n)//; # collect each complete line
+ return 0;
+ },
+ );
+
+ ok( defined $filestream, '$filestream defined for filename' );
+ isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );
+
+ is_oneref( $filestream, 'reading $filestream has refcount 1 initially' );
+
+ $loop->add( $filestream );
+
+ is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' );
+
+ $wr->syswrite( "message\n" );
+ wait_for { scalar @lines };
+
+ is_deeply( \@lines, [ "message\n" ], '@lines after wait' );
+ shift @lines; # start the rename checks from an empty @lines
+
+ $wr->syswrite( "last line of old file\n" );
+ close $wr;
+ rename( $filename, "$filename.old" ) or die "Cannot rename $filename - $!";
+ END { -f $filename and unlink $filename }
+ END { -f "$filename.old" and unlink "$filename.old" }
+ open $wr, ">", $filename or die "Cannot reopen $filename for writing - $!"; # new file under the old name
+ $wr->syswrite( "first line of new file\n" );
+
+ wait_for { scalar @lines };
+ is( $lines[0], "last line of old file\n", '@lines sees last line of old file' );
+ wait_for { scalar @lines >= 2 };
+ is( $lines[1], "first line of new file\n", '@lines sees first line of new file' );
+
+ $loop->remove( $filestream );
+}
+
# Subclass
my @sub_lines;