Subject: | work-around for DBD::Pg not using cursors |
As you probably already know, DBD::Pg does not use PostgreSQL's cursors
to fetch data. Instead, it loads the entire result set into client memory
when the statement is executed, then iterates over it.
DBD::Pg's documentation shows how to explicitly use cursors.
The attached patch implements that usage inside
DBIx::Class::Storage::DBI::Pg.
NOTE: it *breaks* the threaded tests. I suspect those tests passed
before only because the shared statement handle was never actually used
from more than one test: after the first ->next, no further communication
with the database took place.
Subject: | DBIx-Class-Pg-cursors.patch |
diff --git a/lib/DBIx/Class/Storage/DBI/Cursor/Pg.pm b/lib/DBIx/Class/Storage/DBI/Cursor/Pg.pm
new file mode 100644
index 0000000..96acd77
--- /dev/null
+++ b/lib/DBIx/Class/Storage/DBI/Cursor/Pg.pm
@@ -0,0 +1,157 @@
+package DBIx::Class::Storage::DBI::Cursor::Pg;
+
+# Cursor class used by DBIx::Class::Storage::DBI::Pg to read query results
+# through a PostgreSQL server-side cursor, instead of letting DBD::Pg load
+# the whole result set into client memory.
+#
+# The storage turns each SELECT into "DECLARE <name> CURSOR ... FOR ...".
+# Executing that statement (held in the inherited 'sth' accessor) only
+# opens the cursor; rows are then pulled in batches by a separate
+# "fetch <n> from <name>" statement, held in 'sub_sth'.
+
+use strict;
+use warnings;
+
+use base qw/DBIx::Class::Storage::DBI::Cursor/;
+
+__PACKAGE__->mk_group_accessors('simple' =>
+    qw/sub_sth/
+);
+
+# Number of rows requested from the server per FETCH.  Must be a positive
+# integer; callers may local()ize it to tune the batch size.
+our $FETCH_SIZE = 1000;
+
+# Close the server-side cursor recorded for the current 'sth' (if any) and
+# forget both statement handles.  A no-op when no statement handle is held.
+# $storage defaults to the storage this cursor was created with.
+sub _close_cursor {
+    my ($self, $storage) = @_;
+    $storage ||= $self->{storage};
+
+    my $declare_sth = $self->sth;
+    $self->sub_sth(undef);
+    $self->sth(undef);
+    return unless $declare_sth;
+
+    $storage->_delete_cursor($declare_sth)
+        if defined $storage->_get_cursor_from_map($declare_sth);
+    return;
+}
+
+# Prepare and execute a FETCH of the next batch of rows from this object's
+# server-side cursor, leaving the executed handle in 'sub_sth'.
+sub _run_sub_sth {
+    my ($self, $storage) = @_;
+
+    my $cursor_name = $storage->_get_cursor_from_map($self->sth);
+    $storage->throw_exception('No server-side cursor recorded for this statement')
+        unless defined $cursor_name;
+
+    $self->sub_sth($storage->sth("fetch $FETCH_SIZE from $cursor_name"));
+    $self->sub_sth->execute;
+    return;
+}
+
+# Called right after a FETCH: if it returned no rows the cursor is
+# exhausted, so close it and mark this object as done.
+sub _check_cursor_end {
+    my ($self, $storage) = @_;
+
+    return if $self->sub_sth->rows != 0;
+
+    $self->_close_cursor($storage);
+    $self->{done} = 1;
+    return;
+}
+
+# Return the next row as a list of column values, or an empty list once the
+# cursor is exhausted.  The cursor is declared on the first call and rows
+# are fetched from the server one batch at a time.
+sub _dbh_next {
+    my ($storage, $dbh, $self) = @_;
+
+    $self->_check_dbh_gen;
+    return if $self->{done};
+
+    unless ($self->sth) {
+        # A new cursor is about to be declared, so any FETCH handle still
+        # held belongs to a cursor that has already been discarded.
+        $self->sub_sth(undef);
+        $self->sth(($storage->_select(@{ $self->{args} }))[1]);
+    }
+
+    unless ($self->sub_sth) {
+        $self->_run_sub_sth($storage);
+        $self->_check_cursor_end($storage);
+        return if $self->{done};
+    }
+
+    my @row = $self->sub_sth->fetchrow_array;
+    unless (@row) {
+        # Current batch used up: ask the server for the next one.
+        $self->_run_sub_sth($storage);
+        $self->_check_cursor_end($storage);
+        return if $self->{done};
+        @row = $self->sub_sth->fetchrow_array;
+    }
+
+    # Count every row handed out, whichever batch it came from.
+    $self->{pos}++ if @row;
+    return @row;
+}
+
+# Return all rows of the query, from the beginning, as a list of arrayrefs.
+# A partially consumed cursor is closed first; afterwards this object is
+# marked as done.
+sub _dbh_all {
+    my ($storage, $dbh, $self) = @_;
+
+    $self->_check_dbh_gen;
+
+    # Start over: close any cursor left open by an earlier partial read.
+    $self->_close_cursor($storage);
+    $self->sth(($storage->_select(@{ $self->{args} }))[1]);
+
+    my @rows;
+    while (1) {
+        $self->_run_sub_sth($storage);
+        my $batch = $self->sub_sth->fetchall_arrayref;
+        push @rows, @$batch;
+        # A batch shorter than requested means the cursor is exhausted,
+        # which saves the extra round trip for an empty FETCH.
+        last if @$batch < $FETCH_SIZE;
+    }
+    $self->_close_cursor($storage);
+    $self->{done} = 1;
+
+    return @rows;
+}
+
+# Close the server-side cursor and drop both statement handles before
+# delegating to the base class, so that the next call to next() declares a
+# fresh cursor.  Errors while closing are ignored; the handles are cleared
+# regardless.
+sub reset {
+    my ($self) = @_;
+
+    {
+        local $@;
+        eval { $self->_close_cursor; 1 };
+    }
+    return $self->SUPER::reset;
+}
+
+# Close the server-side cursor when this object goes away.  This is best
+# effort: errors cannot be usefully reported from a destructor.
+sub DESTROY {
+    my ($self) = @_;
+
+    {
+        local $@;
+        eval { $self->_close_cursor; 1 };
+    }
+    $self->SUPER::DESTROY;
+}
+
+1;
diff --git a/lib/DBIx/Class/Storage/DBI/Pg.pm b/lib/DBIx/Class/Storage/DBI/Pg.pm
index 4428b1f..bd560b7 100644
--- a/lib/DBIx/Class/Storage/DBI/Pg.pm
+++ b/lib/DBIx/Class/Storage/DBI/Pg.pm
@@ -8,14 +8,20 @@ use base qw/
/;
use mro 'c3';
+use Carp::Clan qw/^DBIx::Class/;
use DBD::Pg qw(:pg_types);
use Scope::Guard ();
use Context::Preserve ();
+use Scalar::Util 'refaddr';
+
+require DBIx::Class::Storage::DBI::Cursor::Pg;
# Ask for a DBD::Pg with array support
warn __PACKAGE__.": DBD::Pg 2.9.2 or greater is strongly recommended\n"
if ($DBD::Pg::VERSION < 2.009002); # pg uses (used?) version::qv()
+# Maps refaddr() of each DECLARE statement handle to the name of the
+# server-side cursor it opened; maintained by _add_cursor_to_map and
+# _remove_cursor_from_map.
+__PACKAGE__->mk_group_accessors('simple' => '_cursors_map');
+
sub _supports_insert_returning {
my $self = shift;
@@ -181,6 +187,67 @@ sub bind_attribute_by_data_type {
}
}
+# Iterate results with the cursor class that reads through server-side
+# cursors (see DBIx::Class::Storage::DBI::Cursor::Pg).
+__PACKAGE__->cursor_class(q{DBIx::Class::Storage::DBI::Cursor::Pg});
+
+# Return a name for a new server-side cursor, of the form
+# csr_<refaddr of this storage>_<n>, where n is the smallest index not used
+# by any cursor currently recorded in _cursors_map.  Deriving n from the
+# number of recorded cursors is not enough: once an earlier cursor has been
+# closed, that count can equal the index of a cursor that is still open.
+# Reusing freed indexes (rather than counting up forever) also keeps the
+# number of distinct DECLARE statements small.
+sub _cursor_name {
+    my ($self) = @_;
+    $self->_cursors_map({}) unless $self->_cursors_map;
+
+    my %in_use = map { $_ => 1 } values %{ $self->_cursors_map };
+    my $prefix = 'csr_' . refaddr($self) . '_';
+
+    my $n = 0;
+    $n++ while $in_use{ $prefix . $n };
+    return $prefix . $n;
+}
+
+# Remember that the DECLARE handle $sth opened the cursor named $id,
+# creating the map on first use.  Returns $id.
+sub _add_cursor_to_map {
+    my ($self, $sth, $id) = @_;
+
+    my $map = $self->_cursors_map;
+    unless ($map) {
+        $map = {};
+        $self->_cursors_map($map);
+    }
+    $map->{ refaddr($sth) } = $id;
+}
+
+# Return the cursor name recorded for $sth, or undef when none is recorded.
+# That includes the case where no cursor has been declared yet and the map
+# does not exist, which previously died dereferencing undef.
+sub _get_cursor_from_map {
+    my ($self, $sth) = @_;
+    my $map = $self->_cursors_map;
+    return $map ? $map->{ refaddr($sth) } : undef;
+}
+
+# Forget the cursor recorded for $sth and return its name, if any.
+# Tolerates the map not having been created yet.
+sub _remove_cursor_from_map {
+    my ($self, $sth) = @_;
+    my $map = $self->_cursors_map or return;
+    return delete $map->{ refaddr($sth) };
+}
+
+# CLOSE the server-side cursor opened by $sth and drop it from the map.
+# Does nothing when no cursor is recorded for $sth, instead of sending an
+# invalid "CLOSE " statement built from an undefined name.
+sub _delete_cursor {
+    my ($self, $sth) = @_;
+
+    my $csr_id = $self->_get_cursor_from_map($sth);
+    return unless defined $csr_id;
+
+    $self->_get_dbh->do("CLOSE $csr_id");
+    $self->_remove_cursor_from_map($sth);
+}
+
+# Prepare $sql on $dbh.  A SELECT is rewritten into
+# "DECLARE <name> CURSOR [WITH HOLD] FOR <select>" so that its rows can be
+# read later in batches with FETCH, and the resulting handle is recorded in
+# _cursors_map under the generated cursor name.
+#
+# WITH HOLD is left out when the query ends in a row-locking clause
+# (FOR UPDATE / FOR NO KEY UPDATE / FOR SHARE / FOR KEY SHARE, optionally
+# followed by OF ... or NOWAIT), because PostgreSQL's DECLARE does not allow
+# WITH HOLD together with FOR UPDATE or FOR SHARE.
+sub _dbh_sth {
+    my ($self, $dbh, $sql) = @_;
+
+    my $csr_id;
+    if ($sql =~ /\A\s*SELECT\b/i) {
+        $csr_id = $self->_cursor_name;
+        # [^()]* keeps a locking clause inside a subquery from matching.
+        my $locks_rows = $sql =~ m{
+            \b FOR \s+ (?: NO \s+ KEY \s+ UPDATE | UPDATE | KEY \s+ SHARE | SHARE ) \b
+            [^()]* \z
+        }xi;
+        my $hold = $locks_rows ? '' : ' WITH HOLD';
+        $sql = "DECLARE $csr_id CURSOR$hold FOR $sql";
+    }
+
+    my $sth = $self->SUPER::_dbh_sth($dbh, $sql);
+    $self->_add_cursor_to_map($sth, $csr_id) if defined $csr_id;
+    return $sth;
+}
+
+# Return a single row (as a list) for ->find / ->single, warning when the
+# query yields more than one row.  Rows are read through select() and the
+# cursor's next(), because a SELECT prepared by this storage is a DECLARE
+# statement that returns no rows itself.
+sub select_single {
+    my $self = shift;
+
+    my $cursor = $self->select(@_);
+    my @row = $cursor->next;
+
+    # Explicit block rather than "my @x = ... if ...": a lexical declared
+    # under a statement modifier has undefined behaviour in Perl.
+    if (@row) {
+        my @next_row = $cursor->next;
+        carp "Query returned more than one row. SQL that returns multiple rows is DEPRECATED for ->find and ->single"
+            if @next_row;
+    }
+
+    return @row;
+}
+
sub _svp_begin {
my ($self, $name) = @_;
diff --git a/t/72pg_cursors.t b/t/72pg_cursors.t
new file mode 100644
index 0000000..00fe987
--- /dev/null
+++ b/t/72pg_cursors.t
@@ -0,0 +1,100 @@
+use strict;
+use warnings;
+
+use Test::More;
+use lib qw(t/lib);
+use DBICTest;
+use Time::HiRes qw(gettimeofday tv_interval);
+
+# Exercises the server-side cursor support of DBIx::Class::Storage::DBI::Pg
+# against a real PostgreSQL database.
+
+my ($dsn, $dbuser, $dbpass) = @ENV{map { "DBICTEST_PG_${_}" } qw/DSN USER PASS/};
+
+plan skip_all => 'Set $ENV{DBICTEST_PG_DSN}, _USER and _PASS to run this test'
+    unless ($dsn && $dbuser);
+
+plan tests => 9;
+
+my $schema = DBICTest::Schema->connection($dsn, $dbuser, $dbpass, { AutoCommit => 1 });
+
+my $dbh = $schema->storage->dbh;
+
+{
+    local $SIG{__WARN__} = sub {};
+    $dbh->do('DROP TABLE IF EXISTS artist');
+    $dbh->do(q[
+        CREATE TABLE artist
+        (
+            artistid serial NOT NULL PRIMARY KEY,
+            name varchar(100),
+            rank integer,
+            charfield char(10)
+        );
+    ],{ RaiseError => 1, PrintError => 1 });
+}
+
+# copied from 100populate.t
+
+my $start_id = 'populateXaaaaaa';
+# Enough rows to span several 1000-row FETCH batches, ending in a partial one.
+my $rows = 2005;
+my $offset = 3;
+
+$schema->populate('Artist', [ [ qw/artistid name/ ], map { [ ($_ + $offset) => $start_id++ ] } ( 1 .. $rows ) ] );
+is (
+    $schema->resultset ('Artist')->search ({ name => { -like => 'populateX%' } })->count,
+    $rows,
+    'populate created correct number of rows with massive AoA bulk insert',
+);
+
+{
+    my $rs = $schema->resultset('Artist')->search({});
+    my $count = 0;
+    $count++ while $rs->next;
+    is($count, $rows, 'get all the rows');
+}
+
+{
+    # After a partial read, reset() must restart from the first row instead
+    # of resuming the batch fetched by the discarded cursor.
+    my $rs = $schema->resultset('Artist')->search({});
+    $rs->next for 1 .. 10;
+    $rs->reset;
+    my $count = 0;
+    $count++ while $rs->next;
+    is($count, $rows, 'reset restarts iteration from the first row');
+}
+
+{
+    # Open two cursors, exhaust (and thereby close) the first, then open a
+    # third while the second is still live.  The third must not be given
+    # the second one's cursor name, and the second must be unaffected.
+    my $rs1 = $schema->resultset('Artist')->search({});
+    my $rs2 = $schema->resultset('Artist')->search({});
+    ok(defined($rs1->next), 'first cursor returns a row');
+    ok(defined($rs2->next), 'second cursor returns a row');
+    1 while $rs1->next;
+
+    my $rs3 = $schema->resultset('Artist')->search({});
+    ok(defined($rs3->next), 'cursor opened after another was closed returns a row');
+
+    my $count = 1;
+    $count++ while $rs2->next;
+    is($count, $rows, 'still-open cursor is unaffected by later cursors');
+}
+
+{
+    my $rs = $schema->resultset('Artist')->search({});
+    my $t0 = [gettimeofday];
+    ok(defined($rs->first), 'first returns a row');
+    diag('Time for first: ' . tv_interval($t0));
+}
+
+{
+    my $rs = $schema->resultset('Artist')->search({});
+    my $t0 = [gettimeofday];
+    my @all = $rs->all;
+    diag('Time for all: ' . tv_interval($t0));
+    is(scalar @all, $rows, 'all returns every row');
+}