POE::Component::DBIAgent manages coprocesses that perform DBI requests without blocking the main program. It was written by Rob Bloodgood and is available on CPAN.

This sample program exercises POE::Component::DBIAgent in fairly simple ways.

Sample output:

  - Insert done. -
  - Insert done. -
  Query row: name=name; value=this is its value
  Query row: name=other; value=this is some other value
  - Query done. (2 records) -
  - Update done. -
  Query row: name=other; value=this is some other value
  Query row: name=name; value=this is the new value
  - Query done. (2 records) -
  - Delete done. -
  Query row: name=other; value=this is some other value
  - Query done. (1 record) -
  - Delete done. -
  - Query done. (0 records) -

And the program itself:

#!/usr/bin/perl

use warnings;
use strict;

# Connection settings for the test database: its name, plus the user
# name and password used to log in.  The database needs a table named
# "test" with two fields: name char(5), value char(50).

use constant {
  DB_NAME => "test",
  DB_USER => "",
  DB_PASS => "",
};

use POE;
use POE::Component::DBIAgent;

# Create the session that will do transactions.  The table below maps
# each event the session handles onto the function that services it.

my %event_handlers = (
  _start      => \&start_session,
  query_done  => \&handle_query_response,
  update_done => \&handle_update_response,
  insert_done => \&handle_insert_response,
  delete_done => \&handle_delete_response,
  final_done  => \&handle_final_response,
);

POE::Session->create(inline_states => \%event_handlers);

# Run the event loop, then exit cleanly once run() returns.

$poe_kernel->run();
exit 0;

# Handle the session's _start event.  This builds the DBI agent
# component, keeps it in the session's heap, and then hands it every
# transaction up front.
#
# The transactions must run in the order in which query() is called,
# so Count is 1.  Atomic queries can benefit from a higher concurrency
# Count.
#
# DSN holds the parameters that would be given to DBI->connect(); see
# your DBD::Foo man page for details.  Queries is a hash that maps a
# short query name onto its SQL statement.

sub start_session {
  my $heap = $_[HEAP];

  my $helper = POE::Component::DBIAgent->new(
    DSN     => ['dbi:Pg:dbname=' . DB_NAME, DB_USER, DB_PASS],
    Count   => 1,
    Queries => {
      query  => "select * from test",
      update => "update test set value = ? where name = ?",
      insert => "insert into test values (?, ?)",
      delete => "delete from test where name = ?",
    },
  );
  $heap->{dbi_helper} = $helper;

  # DBIAgent sends its responses back to a session we name; here that
  # is this very session.

  my $session_id = $_[SESSION]->ID;

  # The transactions, in the order they are to be queued.  Each entry
  # is (query_name, response_event, query_parameters...).  The very
  # last SELECT reports to "final_done" rather than "query_done".

  my @transactions = (
    [insert => "insert_done", "name",  "this is its value"],
    [insert => "insert_done", "other", "this is some other value"],
    [query  => "query_done"],
    [update => "update_done", "this is the new value", "name"],
    [query  => "query_done"],
    [delete => "delete_done", "name"],
    [query  => "query_done"],
    [delete => "delete_done", "other"],
    [query  => "final_done"],
  );

  # query() takes (query_name, response_session, response_event,
  # query_parameters).

  foreach my $transaction (@transactions) {
    my ($query_name, $response_event, @parameters) = @$transaction;
    $helper->query($query_name, $session_id, $response_event, @parameters);
  }
}

# This is a plain Perl function, not a POE event handler.  It displays
# one item from a query's response stream and keeps a running count of
# the rows seen so far.
#
# The single argument is either a row (a reference to an array whose
# first two elements are the name and value fields) or the string
# 'EOF'.  On 'EOF' a summary line with the row count is printed and
# the count is reset for the next query.

my $query_record_count = 0;

sub display_query_record {
  my $query_record = shift;

  if ($query_record eq 'EOF') {
    my $plural = $query_record_count == 1 ? "" : "s";

    # print, not printf: the text is already fully interpolated, and
    # printf would treat any "%" in it as a format directive.
    print "- Query done. ($query_record_count record$plural) -\n";
    $query_record_count = 0;
  }
  else {
    my ($name, $value) = @$query_record;

    # Treat missing (undef, e.g. SQL NULL) fields as empty so they
    # print without "uninitialized value" warnings, then strip
    # trailing whitespace (presumably the padding of char() columns).
    for my $field ($name, $value) {
      $field = "" unless defined $field;
      $field =~ s/\s+$//;
    }

    print "Query row: name=$name; value=$value\n";
    $query_record_count++;
  }
}

# Results of a SELECT query (the "query" query) are displayed in one
# of two ways.  This handler is the usual one: it passes the response
# it received straight to display_query_record().

sub handle_query_response {
  display_query_record($_[ARG0]);
}

# The "final" way displays the response just like the usual handler,
# but once the end-of-results marker has been shown it also sends this
# session a SIGINT.  The SIGINT stops the program when we're done.

sub handle_final_response {
  my $query_record = $_[ARG0];

  display_query_record($query_record);

  if ($query_record eq 'EOF') {
    $_[KERNEL]->signal($_[SESSION], "INT");
  }
}

# Report that an UPDATE query has completed.

sub handle_update_response {
  my $notice = "- Update done. -\n";
  print $notice;
}

# Report that an INSERT query has completed.

sub handle_insert_response {
  my $notice = "- Insert done. -\n";
  print $notice;
}

# Report that a DELETE query has completed.

sub handle_delete_response {
  my $notice = "- Delete done. -\n";
  print $notice;
}