Subject: | Add streaming feature |
Hello,
I managed to get streaming with SimpleHTTP, and it seems to work. Could
you please include it in the main branch if the code suits your quality
requirements?
I have attached the patches and hope that you will apply them.
Thank you
Eriam
Subject: | PoCo-Server-SimpleHTTP-Response.patch |
Index: SimpleHTTP/Response.pm
===================================================================
RCS file: c:\versions/Server/SimpleHTTP/Response.pm,v
retrieving revision 1.1
diff -u -r1.1 Response.pm
--- SimpleHTTP/Response.pm 4 Nov 2006 05:49:03 -0000 1.1
+++ SimpleHTTP/Response.pm 4 Nov 2006 07:14:27 -0000
@@ -54,6 +54,17 @@
return shift->{'CONNECTION'};
}
+sub stream {
+ my $self = shift;
+
+ $self->{'STREAM'} = shift;
+
+}
+sub is_streaming {
+ my $self = shift;
+
+ $self->{'IS_STREAMING'} = 1;
+}
# End of module
1;
Subject: | PoCo-Server-SimpleHTTP.patch |
Index: SimpleHTTP.pm
===================================================================
RCS file: c:\versions/Server/SimpleHTTP.pm,v
retrieving revision 1.1
diff -u -r1.1 SimpleHTTP.pm
--- SimpleHTTP.pm 4 Nov 2006 05:49:03 -0000 1.1
+++ SimpleHTTP.pm 5 Nov 2006 13:36:36 -0000
@@ -15,6 +15,7 @@
use POE::Wheel::ReadWrite;
use POE::Driver::SysRW;
use POE::Filter::HTTPD;
+use POE::Filter::Stream;
# Other miscellaneous modules we need
use Carp qw( croak );
@@ -184,10 +185,13 @@
POE::Session->create(
# Our subroutines
'inline_states' => {
- # Maintenance events
+ # Maintenance events
'_start' => \&StartServer,
'_stop' => \&FindRequestLeaks,
'_child' => sub {},
+
+ # Register states
+ 'REGISTER' => \&Register,
# HANDLER stuff
'GETHANDLERS' => \&GetHandlers,
@@ -637,6 +641,7 @@
# Add this response to the wheel
$_[HEAP]->{'REQUESTS'}->{ $id }->[2] = $response;
+ $_[HEAP]->{'REQUESTS'}->{ $id }->[3] = $request;
# Find which handler will handle this one
foreach my $handler ( @{ $_[HEAP]->{'HANDLERS'} } ) {
@@ -668,9 +673,16 @@
if ( DEBUG ) {
warn "Got Flush event for wheel ID ( $id )";
}
-
+
+ if ($_[HEAP]->{'REQUESTS'}->{ $id }->[1] == 2 ) {
+ # Do the stream !
+ if ( DEBUG ) {
+ warn "Streaming in progress ...!";
+ }
+ return;
+ }
# Check if we are shutting down
- if ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] ) {
+ elsif ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] == 1 ) {
# Shutdown read/write on the wheel
$_[HEAP]->{'REQUESTS'}->{ $id }->[0]->shutdown_input();
$_[HEAP]->{'REQUESTS'}->{ $id }->[0]->shutdown_output();
@@ -679,7 +691,8 @@
# Tracked down by Paul Visscher
delete $_[HEAP]->{'REQUESTS'}->{ $id }->[0];
delete $_[HEAP]->{'REQUESTS'}->{ $id };
- } else {
+ }
+ else {
# Ignore this, eh?
if ( DEBUG ) {
warn "Got Flush event for socket ( $id ) when we did not send anything!";
@@ -725,7 +738,7 @@
# Output to the client!
sub Request_Output {
# ARG0 = HTTP::Response object
- my $response = $_[ ARG0 ];
+ my ($kernel, $response) = @_[KERNEL, ARG0 ];
# Check if we got it
if ( ! defined $response or ! UNIVERSAL::isa( $response, 'HTTP::Response' ) ) {
@@ -740,6 +753,16 @@
# Get the wheel ID
my $id = $response->_WHEEL;
+ if (defined $response->{'STREAM'}) {
+ # Keep track if we plan to stream ...
+ if ( $_[HEAP]->{'RESPONSES'}->{ $id } ) {
+ $response = $_[HEAP]->{'RESPONSES'}->{ $id };
+ }
+ else {
+ $_[HEAP]->{'RESPONSES'}->{ $id } = $response;
+ }
+ }
+
# Check if the wheel exists ( sometimes it gets closed by the client, but the application doesn't know that... )
if ( ! exists $_[HEAP]->{'REQUESTS'}->{ $id } ) {
# Debug stuff
@@ -752,7 +775,7 @@
}
# Check if we have already sent the response
- if ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] ) {
+ if ( $_[HEAP]->{'REQUESTS'}->{ $id }->[1] == 1) {
# Tried to send twice!
die 'Tried to send a response to the same connection twice!';
}
@@ -782,17 +805,42 @@
$response->header( 'Content-Type', 'text/html' );
}
- # Send it out!
- $_[HEAP]->{'REQUESTS'}->{ $id }->[0]->put( $response );
-
- # Mark this socket done
- $_[HEAP]->{'REQUESTS'}->{ $id }->[1] = 1;
-
- # Debug stuff
- if ( DEBUG ) {
- warn "Completed with Wheel ID $id";
- }
-
+ unless (defined $response->{STREAM}) {
+
+ # Send it out!
+ $_[HEAP]->{'REQUESTS'}->{ $id }->[0]->put( $response );
+
+ # Mark this socket done
+ $_[HEAP]->{'REQUESTS'}->{ $id }->[1] = 1;
+
+ # Debug stuff
+ if ( DEBUG ) {
+ warn "Completed with Wheel ID $id";
+ }
+ }
+ else {
+
+ # Loops through current streams
+ foreach (keys %{ $_[HEAP]->{'RESPONSES'} }) {
+
+
+ # Sets the correct POE::Filter
+ unless (defined $response->{'IS_STREAMING'}) {
+ # Mark this socket as streaming (state 2) rather than done (state 1)
+ $_[HEAP]->{'REQUESTS'}->{ $id }->[1] = 2;
+
+ #
+ $_[HEAP]->{'REQUESTS'}->{ $_ }->[0]->set_output_filter(POE::Filter::Stream->new() ) ;
+ $response->is_streaming();
+ }
+
+ if ( DEBUG ) {
+ print "Sending stream via ".$response->{STREAM}." to $_ with id $id \n" ;
+ }
+
+ $kernel->yield($response->{STREAM}, $_[HEAP]->{'REQUESTS'}->{ $_ }->[0],$_[HEAP]->{'REQUESTS'}->{ $id }->[3], $_[HEAP]->{'RESPONSES'}->{$_}, $_ );
+ }
+ }
# Success!
return 1;
}
@@ -835,9 +883,26 @@
# Delete it!
delete $_[HEAP]->{'REQUESTS'}->{ $id }->[0];
delete $_[HEAP]->{'REQUESTS'}->{ $id };
+ delete $_[HEAP]->{'RESPONSES'}->{ $id };
+
+ if ( DEBUG ) {
+ warn 'Delete references to the connection done.';
+ }
# All done!
return 1;
+}
+
+
+# Registers a POE inline state (primarily for streaming)
+sub Register {
+ my ( $session , $state, $code_ref) = @_[SESSION, ARG0 .. ARG1];
+
+ if ( DEBUG ) {
+ warn 'Registering state in POE session';
+ }
+
+ return $session->register_state( $state, $code_ref );
}
# End of module