#!/usr/bin/perl
# This program forks children to handle a number of slow tasks.  It
# uses POE::Filter::Reference so the child tasks can send back
# arbitrary Perl data.  The constant MAX_CONCURRENT_TASKS limits the
# number of forked processes that can run at any given time.
use warnings;
use strict;
use POE qw(Wheel::Run Filter::Reference);
# Upper bound on the number of child processes running at once.
sub MAX_CONCURRENT_TASKS () { 3 }

# Queue of task names still waiting to be started; start_tasks shifts
# entries off the front as slots become available.
my @tasks = (
  "one", "two",   "three", "four", "five",
  "six", "seven", "eight", "nine", "ten",
);

# Create the session that manages all of the children.  Each key is an
# event name and each value is the sub that handles it.  The _start and
# next_task events deliberately share start_tasks as their handler.
my %handler_for = (
  _start      => \&start_tasks,
  next_task   => \&start_tasks,
  task_result => \&handle_task_result,
  task_debug  => \&handle_task_debug,
  task_done   => \&handle_task_done,
  sig_child   => \&sig_child,
);

POE::Session->create(inline_states => \%handler_for);

# Handler for both _start and next_task.  Keeps launching queued tasks
# until either MAX_CONCURRENT_TASKS wheels are active or the @tasks
# queue is empty.
#
# Each POE::Wheel::Run object is stored in $heap->{task} under its wheel
# ID, because every wheel event carries that ID and the handlers use it
# to find the wheel again.  The child's PID is registered with
# sig_child() so the sig_child event fires when that process exits.
#
# Program is given a code reference; the small closure exists only so
# the task name can be passed through to do_stuff.
sub start_tasks {
  my $kernel = $_[KERNEL];
  my $heap   = $_[HEAP];

  until (keys(%{ $heap->{task} }) >= MAX_CONCURRENT_TASKS) {
    my $name = shift @tasks;
    defined $name or last;    # queue exhausted

    print "Starting task for $name...\n";

    my $wheel = POE::Wheel::Run->new(
      Program      => sub { do_stuff($name) },
      StdoutFilter => POE::Filter::Reference->new(),
      StdoutEvent  => "task_result",
      StderrEvent  => "task_debug",
      CloseEvent   => "task_done",
    );

    $heap->{task}{ $wheel->ID } = $wheel;
    $kernel->sig_child($wheel->PID, "sig_child");
  }
}

# Worker body.  This is NOT a POE event handler: it is an ordinary sub
# that start_tasks arranges to run inside a forked child.
#
# Takes the task name as its only argument.  The result is serialized
# with POE::Filter::Reference and written to STDOUT, which lets the
# child hand back arbitrary Perl data.  Filters can be used standalone
# like this, but put() both accepts and returns a list reference.
sub do_stuff {
  my ($task) = @_;

  binmode(STDOUT);    # Required for this to work on MSWin32
  my $serializer = POE::Filter::Reference->new();

  # Simulate a long, blocking task.
  sleep(rand 5);

  # A bogus result; it travels back to the parent by reference through
  # the filter.
  my $result = {
    task   => $task,
    status => "seems ok to me",
  };

  # put() wants a list reference of items and returns a list reference
  # of serialized chunks, hence the extra brackets and the @{} below.
  my $chunks = $serializer->put([$result]);
  print @{$chunks};
}

# Handler for task_result: one decoded record from a child's STDOUT.
# Because the wheel's stdout filter is POE::Filter::Reference, ARG0 is
# the same structure the child built; in this sample that is a hash
# reference with "task" and "status" keys.
sub handle_task_result {
  my ($task, $status) = @{ $_[ARG0] }{qw(task status)};
  print "Result for $task: $status\n";
}

# Handler for task_debug: echoes whatever a child wrote to STDERR.
# Handy while debugging, since the child's warnings and errors were not
# being displayed otherwise.
sub handle_task_debug {
  my $stderr_text = $_[ARG0];
  print "Debug: $stderr_text\n";
}

# Handler for task_done (the wheel's CloseEvent).  ARG0 is the ID of the
# finished wheel: drop it from $heap->{task}, then post next_task so a
# queued task can take the freed slot.
sub handle_task_done {
  my $wheel_id = $_[ARG0];

  delete $_[HEAP]{task}{$wheel_id};
  $_[KERNEL]->yield("next_task");
}

# Handler for the sig_child event, delivered as each of our children
# exits (registered per PID by start_tasks via $kernel->sig_child()).
#
# This handler is intentionally a no-op.  Wheels are stored under
# $heap->{task} keyed by wheel ID and are released in handle_task_done,
# so nothing in the heap is keyed by PID and there is nothing to clean
# up here.  The previous version deleted $heap->{$pid}, a key that is
# never set, and unpacked several values it never used.
#
# For debugging, the child's PID is available as $_[ARG1] and its exit
# value as $_[ARG2], e.g.:
#   warn "$$: Child $_[ARG1] exited";
sub sig_child {
  return;
}

# Hand control to the POE event loop and run until there are no more
# tasks, then exit successfully.
POE::Kernel->run();
exit(0);