Skip Menu |

This queue is for tickets about the POE-Component-Server-SimpleHTTP CPAN distribution.

Report information
The Basics
Id: 24234
Status: resolved
Worked: 30 min
Priority: 0/
Queue: POE-Component-Server-SimpleHTTP

People
Owner: Nobody in particular
Requestors: eriam [...] cpan.org
Cc:
AdminCc:

Bug Information
Severity: Wishlist
Broken in: (no value)
Fixed in: (no value)



Subject: Add streaming feature
Hello, I managed to get streaming working with SimpleHTTP, and it seems to work. Could you please include it in the main branch if the code meets your quality requirements? I have attached the patches and wish that you apply them. Thank you, Eriam
Subject: PoCo-Server-SimpleHTTP-Response.patch
Index: SimpleHTTP/Response.pm =================================================================== RCS file: c:\versions/Server/SimpleHTTP/Response.pm,v retrieving revision 1.1 diff -u -r1.1 Response.pm --- SimpleHTTP/Response.pm 4 Nov 2006 05:49:03 -0000 1.1 +++ SimpleHTTP/Response.pm 4 Nov 2006 07:14:27 -0000 @@ -54,6 +54,17 @@ return shift->{'CONNECTION'}; } +sub stream { + my $self = shift; + + $self->{'STREAM'} = shift; + +} +sub is_streaming { + my $self = shift; + + $self->{'IS_STREAMING'} = 1; +} # End of module 1;
Subject: PoCo-Server-SimpleHTTP.patch
Index: SimpleHTTP.pm =================================================================== RCS file: c:\versions/Server/SimpleHTTP.pm,v retrieving revision 1.1 diff -u -r1.1 SimpleHTTP.pm --- SimpleHTTP.pm 4 Nov 2006 05:49:03 -0000 1.1 +++ SimpleHTTP.pm 5 Nov 2006 13:36:36 -0000 @@ -15,6 +15,7 @@ use POE::Wheel::ReadWrite; use POE::Driver::SysRW; use POE::Filter::HTTPD; +use POE::Filter::Stream; # Other miscellaneous modules we need use Carp qw( croak ); @@ -184,10 +185,13 @@ POE::Session->create( # Our subroutines 'inline_states' => { - # Maintenance events + # Maintenance events '_start' => \&StartServer, '_stop' => \&FindRequestLeaks, '_child' => sub {}, + + # Register states + 'REGISTER' => \&Register, # HANDLER stuff 'GETHANDLERS' => \&GetHandlers, @@ -637,6 +641,7 @@ # Add this response to the wheel $_[HEAP]->{'REQUESTS'}->{ $id }->[2] = $response; + $_[HEAP]->{'REQUESTS'}->{ $id }->[3] = $request; # Find which handler will handle this one foreach my $handler ( @{ $_[HEAP]->{'HANDLERS'} } ) { @@ -668,9 +673,16 @@ if ( DEBUG ) { warn "Got Flush event for wheel ID ( $id )"; } - + + if ($_[HEAP]->{'REQUESTS'}->{ $id }->[1] == 2 ) { + # Do the stream ! + if ( DEBUG ) { + warn "Streaming in progress ...!"; + } + return; + } # Check if we are shutting down - if ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] ) { + elsif ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] == 1 ) { # Shutdown read/write on the wheel $_[HEAP]->{'REQUESTS'}->{ $id }->[0]->shutdown_input(); $_[HEAP]->{'REQUESTS'}->{ $id }->[0]->shutdown_output(); @@ -679,7 +691,8 @@ # Tracked down by Paul Visscher delete $_[HEAP]->{'REQUESTS'}->{ $id }->[0]; delete $_[HEAP]->{'REQUESTS'}->{ $id }; - } else { + } + else { # Ignore this, eh? if ( DEBUG ) { warn "Got Flush event for socket ( $id ) when we did not send anything!"; @@ -725,7 +738,7 @@ # Output to the client! sub Request_Output { # ARG0 = HTTP::Response object - my $response = $_[ ARG0 ]; + my ($kernel, $response) = @_[KERNEL, ARG0 ]; # Check if we got it if ( ! 
defined $response or ! UNIVERSAL::isa( $response, 'HTTP::Response' ) ) { @@ -740,6 +753,16 @@ # Get the wheel ID my $id = $response->_WHEEL; + if (defined $response->{'STREAM'}) { + # Keep track if we plan to stream ... + if ( $_[HEAP]->{'RESPONSES'}->{ $id } ) { + $response = $_[HEAP]->{'RESPONSES'}->{ $id }; + } + else { + $_[HEAP]->{'RESPONSES'}->{ $id } = $response; + } + } + # Check if the wheel exists ( sometimes it gets closed by the client, but the application doesn't know that... ) if ( ! exists $_[HEAP]->{'REQUESTS'}->{ $id } ) { # Debug stuff @@ -752,7 +775,7 @@ } # Check if we have already sent the response - if ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] ) { + if ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] == 1) { # Tried to send twice! die 'Tried to send a response to the same connection twice!'; } @@ -782,17 +805,42 @@ $response->header( 'Content-Type', 'text/html' ); } - # Send it out! - $_[HEAP]->{'REQUESTS'}->{ $id }->[0]->put( $response ); - - # Mark this socket done - $_[HEAP]->{'REQUESTS'}->{ $id }->[1] = 1; - - # Debug stuff - if ( DEBUG ) { - warn "Completed with Wheel ID $id"; - } - + unless (defined $response->{STREAM}) { + + # Send it out! + $_[HEAP]->{'REQUESTS'}->{ $id }->[0]->put( $response ); + + # Mark this socket done + $_[HEAP]->{'REQUESTS'}->{ $id }->[1] = 1; + + # Debug stuff + if ( DEBUG ) { + warn "Completed with Wheel ID $id"; + } + } + else { + + # Loops through current streams + foreach (keys %{ $_[HEAP]->{'RESPONSES'} }) { + + + # Sets the correct POE::Filter + unless (defined $response->{'IS_STREAMING'}) { + # Mark this socket done + $_[HEAP]->{'REQUESTS'}->{ $id }->[1] = 2; + + # + $_[HEAP]->{'REQUESTS'}->{ $_ }->[0]->set_output_filter(POE::Filter::Stream->new() ) ; + $response->is_streaming(); + } + + if ( DEBUG ) { + print "Sending stream via ".$response->{STREAM}." 
to $_ with id $id \n" ; + } + + $kernel->yield($response->{STREAM}, $_[HEAP]->{'REQUESTS'}->{ $_ }->[0],$_[HEAP]->{'REQUESTS'}->{ $id }->[3], $_[HEAP]->{'RESPONSES'}->{$_}, $_ ); + } + } # Success! return 1; } @@ -835,9 +883,26 @@ # Delete it! delete $_[HEAP]->{'REQUESTS'}->{ $id }->[0]; delete $_[HEAP]->{'REQUESTS'}->{ $id }; + delete $_[HEAP]->{'RESPONSES'}->{ $id }; + + if ( DEBUG ) { + warn 'Delete references to the connection done.'; + } # All done! return 1; +} + + +# Registers a POE inline state (primarly for streaming) +sub Register { + my ( $session , $state, $code_ref) = @_[SESSION, ARG0 .. ARG1]; + + if ( DEBUG ) { + warn 'Registering state in POE session'; + } + + return $session->register_state( $state, $code_ref ); } # End of module
Patches applied and new version released. Many thanks.
Many thanks to BinGOs for applying that patch.