diff --git a/META.yml b/META.yml
index 31a67b5..95a190d 100644
--- a/META.yml
+++ b/META.yml
@@ -10,9 +10,11 @@ configure_requires:
ExtUtils::MakeMaker: 0
build_requires:
ExtUtils::MakeMaker: 0
+ Variable::Magic: 0
requires:
ExtUtils::MakeMaker: 6.03
Test::More: 0
+ Scalar::Util: 0
no_index:
directory:
- t
diff --git a/RabbitMQ.pm b/RabbitMQ.pm
index 608ab6d..6e147a4 100644
--- a/RabbitMQ.pm
+++ b/RabbitMQ.pm
@@ -8,6 +8,7 @@ $VERSION = "0.2.0";
@ISA = qw/DynaLoader/;
bootstrap Net::RabbitMQ $VERSION ;
+use Scalar::Util qw(blessed);
=head1 NAME
@@ -317,4 +318,21 @@ C<$subroutine> is a perl coderef that takes two arguments:
=cut
+# Publish $body on $channel under $routing_key.
+#
+# Perl-side front end for the _publish method: defaults $options and $props
+# to empty hashes and hands _publish a copy of $props in which every blessed
+# header value has been replaced by its string form.  Neither the caller's
+# $props hash nor its headers hash is modified.
+#
+# Returns whatever _publish returns.
+sub publish {
+    my ($self, $channel, $routing_key, $body, $options, $props) = @_;
+
+    $options ||= {};
+
+    # Shallow copy, so that swapping out the headers entry below cannot
+    # leak back into the hash the caller handed us.
+    my %properties = %{ $props || {} };
+
+    if ( $properties{headers} ) {
+        my $given = $properties{headers};
+        my %flattened;
+        for my $name ( keys %$given ) {
+            my $value = $given->{$name};
+            # Interpolation invokes an overloaded "" operator if there is one.
+            $flattened{$name} = blessed($value) ? "$value" : $value;
+        }
+        $properties{headers} = \%flattened;
+    }
+
+    return $self->_publish($channel, $routing_key, $body, $options, \%properties);
+}
+
1;
diff --git a/RabbitMQ.xs b/RabbitMQ.xs
index 8805998..54a5afa 100644
--- a/RabbitMQ.xs
+++ b/RabbitMQ.xs
@@ -171,7 +171,29 @@ int internal_recv(HV *RETVAL, amqp_connection_state_t conn, int piggyback) {
hv_store(props, "timestamp", strlen("timestamp"),
newSViv(p->timestamp), 0);
}
-
+ /*
+ * When the received properties carry the headers flag, expose the header
+ * table to Perl as a hash reference stored under props{"headers"}.
+ * Only entries of kind 'I' and 'S' are copied; entries of any other kind
+ * are skipped without a warning.
+ */
+ if (p->_flags & AMQP_BASIC_HEADERS_FLAG) {
+ int i;
+ /* NOTE(review): val is declared but never used in this block. */
+ SV *val;
+ HV *headers = newHV();
+ /* newRV_noinc: the reference stored in props takes over the hash's initial refcount. */
+ hv_store( props, "headers", strlen("headers"), newRV_noinc((SV *)headers), 0 );
+
+ for( i=0; i < p->headers.num_entries; ++i ) {
+ /* Kind 'I': store value.i32 as a Perl integer. */
+ if( p->headers.entries[i].kind == 'I' ) {
+ hv_store( headers,
+ p->headers.entries[i].key.bytes, p->headers.entries[i].key.len,
+ newSViv(p->headers.entries[i].value.i32),
+ 0
+ );
+ }
+ /* Kind 'S': store value.bytes as a Perl string of exactly value.bytes.len bytes. */
+ else if( p->headers.entries[i].kind == 'S' ) {
+ hv_store( headers,
+ p->headers.entries[i].key.bytes, p->headers.entries[i].key.len,
+ newSVpvn( p->headers.entries[i].value.bytes.bytes, p->headers.entries[i].value.bytes.len),
+ 0
+ );
+ }
+ }
+ }
body_target = frame.payload.properties.body_size;
body_received = 0;
@@ -467,7 +489,7 @@ net_rabbitmq_purge(conn, channel, queuename, no_wait = 0)
die_on_amqp_error(aTHX_ *amqp_rpc_reply, "Purging queue");
int
-net_rabbitmq_publish(conn, channel, routing_key, body, options = NULL, props = NULL)
+net_rabbitmq__publish(conn, channel, routing_key, body, options = NULL, props = NULL)
Net::RabbitMQ conn
int channel
HV *options;
@@ -546,7 +568,6 @@ net_rabbitmq_publish(conn, channel, routing_key, body, options = NULL, props = N
properties._flags |= AMQP_BASIC_TIMESTAMP_FLAG;
}
if (NULL != (v = hv_fetch(props, "headers", strlen("headers"), 0))) {
- amqp_create_table(conn, &properties.headers, 10);
HV *headers;
HE *he;
I32 iter;
@@ -555,14 +576,21 @@ net_rabbitmq_publish(conn, channel, routing_key, body, options = NULL, props = N
SV *value;
headers = (HV *)SvRV(*v);
+ amqp_create_table(conn, &properties.headers, HvKEYS(headers));
hv_iterinit(headers);
while (NULL != (he = hv_iternext(headers))) {
key = hv_iterkey(he, &retlen);
value = hv_iterval(headers, he);
- if (SvTYPE(value) == SVt_PV) {
+
+ if (SvGMAGICAL(value))
+ mg_get(value);
+
+ if (SvPOK(value)) {
amqp_table_add_string(conn, &properties.headers, amqp_cstring_bytes(key), amqp_cstring_bytes(SvPV_nolen(value)));
- } else if (SvTYPE(value) == SVt_IV) {
+ } else if (SvIOK(value)) {
amqp_table_add_int(conn, &properties.headers, amqp_cstring_bytes(key), (uint64_t) SvIV(value));
+ } else {
+ Perl_croak( aTHX_ "Unsupported SvType for header value: %d", SvTYPE(value) );
}
}
properties._flags |= AMQP_BASIC_HEADERS_FLAG;
diff --git a/amqp_table.c b/amqp_table.c
index f818007..bfbd17a 100644
--- a/amqp_table.c
+++ b/amqp_table.c
@@ -179,19 +179,35 @@ void amqp_create_table(amqp_connection_state_t state,
output->num_entries = 0;
}
+/*
+ * Append a new entry with the given key to `table` and return a pointer to
+ * it, so the caller can fill in the entry's kind and value.
+ *
+ * When the table is full, a larger entry array is allocated from the
+ * connection's frame pool and the existing entries are copied into it.  The
+ * capacity is doubled; a table created with a capacity of 0 grows to 1,
+ * because doubling 0 would allocate nothing and the write below would land
+ * outside the array.
+ *
+ * Returns NULL, leaving the table untouched, if the larger array cannot be
+ * allocated.
+ */
+amqp_table_entry_t *amqp_table_add_entry( amqp_connection_state_t state,
+                                          amqp_table_t *table,
+                                          amqp_bytes_t key)
+{
+    amqp_table_entry_t *entry;
+
+    if (table->num_entries == table->size)
+    {
+        int new_size = (table->size > 0) ? table->size * 2 : 1;
+        amqp_table_entry_t *new_entries = amqp_pool_alloc( &state->frame_pool, new_size * sizeof(amqp_table_entry_t) );
+
+        if (new_entries == NULL)
+            return NULL;
+
+        /* Nothing to carry over from an empty table (entries may be NULL). */
+        if (table->size > 0)
+            memcpy( new_entries, table->entries, table->size * sizeof(amqp_table_entry_t) );
+
+        table->size = new_size;
+        table->entries = new_entries;
+    }
+
+    entry = &table->entries[table->num_entries];
+    table->num_entries++;
+    entry->key = key;
+    return entry;
+}
+
void amqp_table_add_string(amqp_connection_state_t state,
amqp_table_t *output,
amqp_bytes_t key,
amqp_bytes_t value)
{
-
- if (output->num_entries == output->size)
- return;
-
- output->entries[output->num_entries].kind = 'S';
- output->entries[output->num_entries].key = key;
- output->entries[output->num_entries].value.bytes = value;
- output->num_entries++;
+ amqp_table_entry_t *entry = amqp_table_add_entry( state, output, key );
+ entry->kind = 'S';
+ entry->value.bytes = value;
}
void amqp_table_add_int(amqp_connection_state_t state,
@@ -199,12 +215,7 @@ void amqp_table_add_int(amqp_connection_state_t state,
amqp_bytes_t key,
int value)
{
-
- if (output->num_entries == output->size)
- return;
-
- output->entries[output->num_entries].kind = 'I';
- output->entries[output->num_entries].key = key;
- output->entries[output->num_entries].value.i32 = value;
- output->num_entries++;
+ amqp_table_entry_t *entry = amqp_table_add_entry( state, output, key );
+ entry->kind = 'I';
+ entry->value.i32 = value;
}
diff --git a/t/004_selfconsume.t b/t/004_selfconsume.t
index a8da9b4..4741eeb 100644
--- a/t/004_selfconsume.t
+++ b/t/004_selfconsume.t
@@ -1,4 +1,4 @@
-use Test::More tests => 10;
+use Test::More tests => 11;
use strict;
my $dtag=(unpack("L",pack("N",1)) != 1)?'0100000000000000':'0000000000000001';
@@ -20,6 +20,7 @@ isnt($queuename, '', "queue_declare -> private name");
eval { $mq->queue_bind(1, $queuename, "nr_test_x", "nr_test_q"); };
is($@, '', "queue_bind");
eval { $mq->publish(1, "nr_test_q", "Magic Transient Payload", { exchange => "nr_test_x" }); };
+is($@, '', "publish");
eval { $mq->consume(1, $queuename, {consumer_tag=>'ctag', no_local=>0,no_ack=>1,exclusive=>0}); };
is($@, '', "consume");
diff --git a/t/013_headers.t b/t/013_headers.t
new file mode 100755
index 0000000..1d51553
--- /dev/null
+++ b/t/013_headers.t
@@ -0,0 +1,117 @@
+use Test::More tests => 20;
+use strict;
+
+# Minimal value class for the header tests: wraps a string and stringifies
+# to its upper-cased form, which makes a stringified header value easy to
+# tell apart from the raw text it was built from.
+package TestBlessings;
+
+use overload '""' => sub {
+    my ($wrapper) = @_;
+    return uc $$wrapper;
+};
+
+# Constructor: returns a reference to a private copy of the given text,
+# blessed into $class.
+sub new {
+    my ($class, $text) = @_;
+    my $object = bless \$text, $class;
+    return $object;
+}
+
+package main;
+
+# Round-trip test for message headers: publish messages that carry a headers
+# table through exchange nr_test_x, read them back from queue nr_test_hole,
+# and compare the headers that arrive with the ones that were sent.
+
+# Broker to test against.  Set MQHOST to point the test at another server;
+# the value is no longer overridden with a hard-coded 'localhost'.
+my $host = $ENV{'MQHOST'} || "dev.rabbitmq.com";
+
+use_ok('Net::RabbitMQ');
+
+my $mq = Net::RabbitMQ->new();
+ok($mq, "Created object");
+
+eval { $mq->connect($host, { user => "guest", password => "guest" }); };
+is($@, '', "connect");
+
+eval { $mq->channel_open(1); };
+is($@, '', "channel_open");
+
+eval { $mq->queue_declare(1, "nr_test_hole", { passive => 0, durable => 1, exclusive => 0, auto_delete => 0 }); };
+is($@, '', "queue_declare");
+
+eval { $mq->queue_bind(1, "nr_test_hole", "nr_test_x", "nr_test_route"); };
+is($@, '', "queue_bind");
+
+# Empty the queue first so that the recv calls below see only the messages
+# published by this script.
+eval { 1 while($mq->get(1, "nr_test_hole")); };
+is($@, '', "drain queue");
+
+# A dozen entries mixing integer and string values.
+my $headers = {
+    abc => 123,
+    def => 'xyx',
+    head3 => 3,
+    head4 => 4,
+    head5 => 5,
+    head6 => 6,
+    head7 => 7,
+    head8 => 8,
+    head9 => 9,
+    head10 => 10,
+    head11 => 11,
+    head12 => 12,
+};
+eval { $mq->publish( 1, "nr_test_route", "Header Test",
+        { exchange => "nr_test_x" },
+        { headers => $headers },
+    );
+};
+
+is( $@, '', "publish" );
+
+eval { $mq->consume(1, "nr_test_hole", {consumer_tag=>'ctag', no_local=>0,no_ack=>1,exclusive=>0}); };
+is($@, '', "consume");
+
+my $msg;
+eval { $msg = $mq->recv() };
+is( $@, '', 'recv' );
+
+is( $msg->{body}, 'Header Test', "Received body" );
+use Data::Dumper;
+diag Dumper($msg);
+is( exists $msg->{props}, 1, "Props exist" );
+is( exists $msg->{props}{headers}, 1, "Headers exist" );
+is_deeply( $msg->{props}{headers}, $headers, "Received headers" );
+
+# Blessed header values: TestBlessings stringifies to upper case, so the
+# comparison below only matches if the header was sent in its string form.
+$headers = {
+    blah => TestBlessings->new('foo'),
+};
+eval { $mq->publish( 1, "nr_test_route", "Header Test",
+        { exchange => "nr_test_x" },
+        { headers => $headers },
+    );
+};
+is( $@, '', 'publish with blessed header values' );
+
+eval { $msg = $mq->recv() };
+is( $@, '', 'recv from blessed header values' );
+
+is_deeply( $msg->{props}{headers}, $headers, "Received blessed headers" );
+
+# Header values that carry variable magic.  Variable::Magic is optional: it
+# is loaded at run time so that a missing module skips these three tests
+# instead of aborting the whole script at compile time.
+SKIP: {
+    my $have_magic = eval { require Variable::Magic; 1 };
+    skip "Variable::Magic not available", 3 unless $have_magic;
+
+    my $wizard = Variable::Magic::wizard(
+        set => sub { },
+    );
+    my $magic = 'foo';
+    # cast() is prototyped to take its first argument by reference.  The
+    # module is loaded at run time, so the reference is passed explicitly
+    # and the call uses the & form, which bypasses prototype checking.
+    &Variable::Magic::cast(\$magic, $wizard);
+    my $headers = { blah => $magic, };
+    diag Dumper($headers);
+
+    eval { $mq->publish( 1, "nr_test_route", "Header Test",
+            { exchange => "nr_test_x" },
+            { headers => $headers },
+        );
+    };
+    # Keep the error now: $@ may be overwritten before it is read again.
+    my $publish_error = $@;
+    is( $publish_error, '', 'publish with magic header values' );
+
+    skip "Publish failed", 2 if $publish_error;
+    eval { $msg = $mq->recv() };
+    is( $@, '', 'recv from magic header values' );
+
+    is_deeply( $msg->{props}{headers}, $headers, "Received magic headers" );
+};