#!/usr/bin/perl use warnings; use strict; use constant DEBUG => 1; # Enable much runtime information. use constant MAX_PROCESSES => 10; # Total server process count. use constant SERVER_PORT => 8888; # Server port. use constant TESTING_CHURN => 1; # Randomly test process respawning. use POE; # Base features. use POE::Filter::HTTPD; # For serving HTTP content. use POE::Wheel::ReadWrite; # For socket I/O. use POE::Wheel::SocketFactory; # For serving socket connections. # We are the parent process. Start the server, and then run it. # Exit when it's done. POE::Session->create( inline_states => { _start => \&listener_start, _stop => \&listener_stop, do_fork => \&listener_do_fork, got_error => \&listener_got_error, got_sig_int => \&listener_got_sig_int, got_sig_child => \&listener_got_sig_child, got_connection => \&listener_got_connection, _child => sub { undef }, }, heap => {max_processes => MAX_PROCESSES}, ); POE::Kernel->run(); exit; ### Listener callbacks. # The server has started. Set up the listener socket and begin # tracking child processes. Fork the initial child processes. sub listener_start { my ($kernel, $heap) = @_[KERNEL, HEAP]; $heap->{server} = POE::Wheel::SocketFactory->new( BindPort => SERVER_PORT, SuccessEvent => "got_connection", FailureEvent => "got_error", Reuse => "yes", ); $kernel->sig(INT => "got_sig_int"); $heap->{children} = {}; $heap->{is_a_child} = 0; warn "Server $$ has begun listening on port ", SERVER_PORT, "\n"; $kernel->yield("do_fork"); } # Log the fact that the main server session has stopped. sub listener_stop { my $heap = $_[HEAP]; DEBUG and warn "Server $$ stopped.\n"; } # Shut down the server if it encounters an error. sub listener_got_error { my ($heap, $syscall, $errno, $error) = @_[HEAP, ARG0 .. ARG2]; DEBUG and warn( "Server $$ got $syscall error $errno: $error\n", "Server $$ is shutting down.\n", ); delete $heap->{server}; } # The parent may fork() more processes as needed. Because events are # copied during fork(), we must make sure this is only done in the # parent. Otherwise, hilarity in the form of forkbombing may ensue. sub listener_do_fork { my ($kernel, $heap) = @_[KERNEL, HEAP]; return if $heap->{is_a_child}; while (scalar(keys %{$heap->{children}}) < $heap->{max_processes}) { my $pid = fork(); # Fork failed. Try again shortly. unless (defined($pid)) { DEBUG and warn( "Server $$ fork failed: $!\n", "Server $$ will retry fork shortly.\n", ); $kernel->delay(do_fork => 1); return; } # Parent. Add the child process to its list. if ($pid) { DEBUG and warn "Server $$ forked child $pid\n"; $heap->{children}->{$pid} = 1; $kernel->sig_child($pid, "got_sig_child"); next; } # Child. Clear the child process list. $kernel->has_forked(); $heap->{is_a_child} = 1; $heap->{children} = {}; return; } } # Gracefully stop a server on SIGINT. sig_handled() prevents the # signal from gracelessly forcing the server to stop. sub listener_got_sig_int { DEBUG and warn "Server $$ received SIGINT.\n"; delete $_[HEAP]->{server}; $_[KERNEL]->sig_handled(); } # Reap child processes, and fork new ones if the server is still # listening for connections. Ignore child processes that we've # already stopped tracking. sub listener_got_sig_child { my ($kernel, $heap, $child_pid) = @_[KERNEL, HEAP, ARG1]; return unless delete $heap->{children}->{$child_pid}; DEBUG and warn "Server $$ reaped child $child_pid.\n"; $kernel->yield("do_fork") if exists $_[HEAP]->{server}; } # Handle a new connection. # Create a new session to interact with the client. sub listener_got_connection { my ($heap, $socket, $peer_addr, $peer_port) = @_[HEAP, ARG0, ARG1, ARG2]; DEBUG and warn "Server $$ received a connection.\n"; POE::Session->create( inline_states => { _start => \&interactor_start, _stop => \&interactor_stop, got_request => \&interactor_got_request, got_flush => \&interactor_flushed_request, got_error => \&interactor_got_error, _parent => sub { 0 }, }, heap => { socket => $socket, peer_addr => $peer_addr, peer_port => $peer_port, }, ); # Gracefully exit if testing process churn. delete $heap->{server} if (TESTING_CHURN and $heap->{is_a_child} and (rand() < 0.25)); } ### Interactor callbacks. # The interactor has started for a client socket. Begin reading # and writing on the socket. sub interactor_start { my $heap = $_[HEAP]; $heap->{client} = POE::Wheel::ReadWrite->new( Handle => $heap->{socket}, Filter => POE::Filter::HTTPD->new(), InputEvent => "got_request", ErrorEvent => "got_error", FlushedEvent => "got_flush", ); DEBUG and warn "Client handler $$/", $_[SESSION]->ID, " started.\n"; } # Log the fact that the interactor has stopped. sub interactor_stop { DEBUG and warn "Client handler $$/", $_[SESSION]->ID, " stopped.\n"; } # The interactor has received a request. If it's an HTTP::Response # object, then some error has occurred. Send the response back to the # client, and return immediately. Otherwise, parse and process the # request, generating an HTTP::Response object and sending it back. sub interactor_got_request { my ($heap, $request) = @_[HEAP, ARG0]; DEBUG and warn("Client handler $$/", $_[SESSION]->ID, " is handling a request.\n"); if ($request->isa("HTTP::Response")) { $heap->{client}->put($request); return; } my $response = HTTP::Response->new(200); $response->content_type("text/html"); $response->content( "<" . "html>" . # Avoid confusing the wiki. "<head><title>POE Pre-Forking Server Example</title></head>" . "<body><table border=1>" . join("", map { "<tr><td>$_</td><td>" . $request->header($_) . "</td></tr>" } $request->header_field_names()) . "<tr><td>PID</td><td>$$</td></tr>" . "<tr><td>time</td><td>" . scalar(gmtime) . " GMT</td></tr>" . "</table></body>" . "</html>" ); $heap->{client}->put($response); } # Stop interacting if a socket error has occurred. sub interactor_got_error { my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2]; DEBUG and warn( "Client handler $$/", $_[SESSION]->ID, " got $operation error $errnum: $errstr\n", "Client handler $$/", $_[SESSION]->ID, " is shutting down.\n" ); delete $heap->{client}; } # Shut down the connection after a response has been fully flushed to # the socket. No, we don't support keep-alive. sub interactor_flushed_request { my $heap = $_[HEAP]; DEBUG and warn( "Client handler $$/", $_[SESSION]->ID, " flushed its response.\n", "Client handler $$/", $_[SESSION]->ID, " is shutting down.\n" ); delete $heap->{client}; } __END__ This program could be refactored into two classes, one representing the listener and another representing interactors. Each new connection would instantiate an interactor.
Forking Web Server FAQ
Q: I can't seem to figure out how requests get bound to child proccesses. Am I correct to believe that the parent and all children are listening for connection events?
A: Yes. In UNIX and similar operating systems, forked processes have duplicate copies of sockets, including the listening one. When an incoming connection arrives, the OS kernel passes it to one of the listeners.