Skip Menu |

This queue is for tickets about the Net-RabbitMQ CPAN distribution.

Report information
The Basics
Id: 68128
Status: open
Priority: 0/
Queue: Net-RabbitMQ

People
Owner: Nobody in particular
Requestors: awick [...] purple.org
Cc:
AdminCc:

Bug Information
Severity: Important
Broken in: (no value)
Fixed in: (no value)



Subject: Add support for set/get property headers
Would like to set/get the property headers. Looks like amqp_framing.c already supports AMQP_BASIC_HEADERS_FLAG but RabbitMQ.xs doesn't, and I'm not sure how to add.
From: awick [...] purple.org
First attempt at a patch to setting feature
Subject: headers.diff
diff -c /Users/awick/Downloads/Net-RabbitMQ-0.2.0/RabbitMQ.xs ./RabbitMQ.xs *** /Users/awick/Downloads/Net-RabbitMQ-0.2.0/RabbitMQ.xs 2011-04-19 22:56:50.000000000 -0400 --- ./RabbitMQ.xs 2011-05-12 13:14:53.000000000 -0400 *************** *** 545,550 **** --- 545,572 ---- properties.timestamp = (uint64_t) SvIV(*v); properties._flags |= AMQP_BASIC_TIMESTAMP_FLAG; } + if (NULL != (v = hv_fetch(props, "headers", strlen("headers"), 0))) { + amqp_create_table(conn, &properties.headers, 10); + HV *headers; + HE *he; + I32 iter; + char *key; + I32 retlen; + SV *value; + + headers = (HV *)SvRV(*v); + hv_iterinit(headers); + while (NULL != (he = hv_iternext(headers))) { + key = hv_iterkey(he, &retlen); + value = hv_iterval(headers, he); + if (SvTYPE(value) == SVt_PV) { + amqp_table_add_string(conn, &properties.headers, amqp_cstring_bytes(key), amqp_cstring_bytes(SvPV_nolen(value))); + } else if (SvTYPE(value) == SVt_IV) { + amqp_table_add_int(conn, &properties.headers, amqp_cstring_bytes(key), (uint64_t) SvIV(value)); + } + } + properties._flags |= AMQP_BASIC_HEADERS_FLAG; + } } rv = amqp_basic_publish(conn, channel, exchange_b, routing_key_b, mandatory, immediate, &properties, body_b); RETVAL = rv; diff -c /Users/awick/Downloads/Net-RabbitMQ-0.2.0/amqp.h ./amqp.h *** /Users/awick/Downloads/Net-RabbitMQ-0.2.0/amqp.h 2011-04-19 22:56:50.000000000 -0400 --- ./amqp.h 2011-05-12 11:30:33.000000000 -0400 *************** *** 26,31 **** --- 26,32 ---- typedef struct amqp_table_t_ { int num_entries; + int size; struct amqp_table_entry_t_ *entries; } amqp_table_t; diff -c /Users/awick/Downloads/Net-RabbitMQ-0.2.0/amqp_table.c ./amqp_table.c *** /Users/awick/Downloads/Net-RabbitMQ-0.2.0/amqp_table.c 2011-04-19 22:56:50.000000000 -0400 --- ./amqp_table.c 2011-05-12 13:14:18.000000000 -0400 *************** *** 89,94 **** --- 89,95 ---- output->num_entries = num_entries; output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t)); + output->size = num_entries; memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t)); *offsetptr = offset; *************** *** 168,170 **** --- 169,210 ---- return p1->key.len - p2->key.len; } + + void amqp_create_table(amqp_connection_state_t state, + amqp_table_t *output, + int initialSize) { + + output->entries = amqp_pool_alloc(&state->frame_pool, initialSize * sizeof(amqp_table_entry_t)); + output->size = initialSize; + output->num_entries = 0; + } + + void amqp_table_add_string(amqp_connection_state_t state, + amqp_table_t *output, + amqp_bytes_t key, + amqp_bytes_t value) + { + + if (output->num_entries == output->size) + return; + + output->entries[output->num_entries].kind = 'S'; + output->entries[output->num_entries].key = key; + output->entries[output->num_entries].value.bytes = value; + output->num_entries++; + } + + void amqp_table_add_int(amqp_connection_state_t state, + amqp_table_t *output, + amqp_bytes_t key, + int value) + { + + if (output->num_entries == output->size) + return; + + output->entries[output->num_entries].kind = 'I'; + output->entries[output->num_entries].key = key; + output->entries[output->num_entries].value.i32 = value; + output->num_entries++; + }
From: Aaron Schrab
On Thu May 12 22:26:52 2011, andywick wrote: Show quoted text
> First attempt at a patch to setting feature
Nice, I've taken that and extended it to also support receiving headers, better handle header values with overloading or magic, and allow more than 10 headers to be set. I've also added some tests of this functionality. The attached patch is based on a source tree where the previous patch was applied on top of my patch for #67794 fixing the tests. The changes are available as finer-grained commits from my github repository: https://github.com/aschrab/perl-Net--RabbitMQ/commits/headers
Subject: headers.patch
diff --git a/META.yml b/META.yml index 31a67b5..95a190d 100644 --- a/META.yml +++ b/META.yml @@ -10,9 +10,11 @@ configure_requires: ExtUtils::MakeMaker: 0 build_requires: ExtUtils::MakeMaker: 0 + Variable::Magic: 0 requires: ExtUtils::MakeMaker: 6.03 Test::More: 0 + Scalar::Util: 0 no_index: directory: - t diff --git a/RabbitMQ.pm b/RabbitMQ.pm index 608ab6d..6e147a4 100644 --- a/RabbitMQ.pm +++ b/RabbitMQ.pm @@ -8,6 +8,7 @@ $VERSION = "0.2.0"; @ISA = qw/DynaLoader/; bootstrap Net::RabbitMQ $VERSION ; +use Scalar::Util qw(blessed); =head1 NAME @@ -317,4 +318,21 @@ C<$subroutine> is a perl coderef that takes two arguments: =cut +sub publish { + my ($self, $channel, $routing_key, $body, $options, $props) = @_; + + $options ||= {}; + $props ||= {}; + + # Do a shallow clone to avoid modifying variable passed by caller + $props = { %$props }; + + # Convert blessed variables in headers to strings + if( $props->{headers} ) { + $props->{headers} = { map { blessed($_) ? "$_" : $_ } %{ $props->{headers} } }; + } + + $self->_publish($channel, $routing_key, $body, $options, $props); +} + 1; diff --git a/RabbitMQ.xs b/RabbitMQ.xs index 8805998..54a5afa 100644 --- a/RabbitMQ.xs +++ b/RabbitMQ.xs @@ -171,7 +171,29 @@ int internal_recv(HV *RETVAL, amqp_connection_state_t conn, int piggyback) { hv_store(props, "timestamp", strlen("timestamp"), newSViv(p->timestamp), 0); } - + if (p->_flags & AMQP_BASIC_HEADERS_FLAG) { + int i; + SV *val; + HV *headers = newHV(); + hv_store( props, "headers", strlen("headers"), newRV_noinc((SV *)headers), 0 ); + + for( i=0; i < p->headers.num_entries; ++i ) { + if( p->headers.entries[i].kind == 'I' ) { + hv_store( headers, + p->headers.entries[i].key.bytes, p->headers.entries[i].key.len, + newSViv(p->headers.entries[i].value.i32), + 0 + ); + } + else if( p->headers.entries[i].kind == 'S' ) { + hv_store( headers, + p->headers.entries[i].key.bytes, p->headers.entries[i].key.len, + newSVpvn( p->headers.entries[i].value.bytes.bytes, p->headers.entries[i].value.bytes.len), + 0 + ); + } + } + } body_target = frame.payload.properties.body_size; body_received = 0; @@ -467,7 +489,7 @@ net_rabbitmq_purge(conn, channel, queuename, no_wait = 0) die_on_amqp_error(aTHX_ *amqp_rpc_reply, "Purging queue"); int -net_rabbitmq_publish(conn, channel, routing_key, body, options = NULL, props = NULL) +net_rabbitmq__publish(conn, channel, routing_key, body, options = NULL, props = NULL) Net::RabbitMQ conn int channel HV *options; @@ -546,7 +568,6 @@ net_rabbitmq_publish(conn, channel, routing_key, body, options = NULL, props = N properties._flags |= AMQP_BASIC_TIMESTAMP_FLAG; } if (NULL != (v = hv_fetch(props, "headers", strlen("headers"), 0))) { - amqp_create_table(conn, &properties.headers, 10); HV *headers; HE *he; I32 iter; @@ -555,14 +576,21 @@ net_rabbitmq_publish(conn, channel, routing_key, body, options = NULL, props = N SV *value; headers = (HV *)SvRV(*v); + amqp_create_table(conn, &properties.headers, HvKEYS(headers)); hv_iterinit(headers); while (NULL != (he = hv_iternext(headers))) { key = hv_iterkey(he, &retlen); value = hv_iterval(headers, he); - if (SvTYPE(value) == SVt_PV) { + + if (SvGMAGICAL(value)) + mg_get(value); + + if (SvPOK(value)) { amqp_table_add_string(conn, &properties.headers, amqp_cstring_bytes(key), amqp_cstring_bytes(SvPV_nolen(value))); - } else if (SvTYPE(value) == SVt_IV) { + } else if (SvIOK(value)) { amqp_table_add_int(conn, &properties.headers, amqp_cstring_bytes(key), (uint64_t) SvIV(value)); + } else { + Perl_croak( aTHX_ "Unsupported SvType for header value: %d", SvTYPE(value) ); } } properties._flags |= AMQP_BASIC_HEADERS_FLAG; diff --git a/amqp_table.c b/amqp_table.c index f818007..bfbd17a 100644 --- a/amqp_table.c +++ b/amqp_table.c @@ -179,19 +179,35 @@ void amqp_create_table(amqp_connection_state_t state, output->num_entries = 0; } +amqp_table_entry_t *amqp_table_add_entry( amqp_connection_state_t state, + amqp_table_t *table, + amqp_bytes_t key) +{ + amqp_table_entry_t *entry; + + if (table->num_entries == table->size) + { + int new_size = table->size * 2; + amqp_table_entry_t *new_entries = amqp_pool_alloc( &state->frame_pool, new_size * sizeof(amqp_table_entry_t) ); + memcpy( new_entries, table->entries, table->size * sizeof(amqp_table_entry_t) ); + table->size = new_size; + table->entries = new_entries; + } + + entry = &table->entries[table->num_entries]; + table->num_entries++; + entry->key = key; + return entry; +} + void amqp_table_add_string(amqp_connection_state_t state, amqp_table_t *output, amqp_bytes_t key, amqp_bytes_t value) { - - if (output->num_entries == output->size) - return; - - output->entries[output->num_entries].kind = 'S'; - output->entries[output->num_entries].key = key; - output->entries[output->num_entries].value.bytes = value; - output->num_entries++; + amqp_table_entry_t *entry = amqp_table_add_entry( state, output, key ); + entry->kind = 'S'; + entry->value.bytes = value; } void amqp_table_add_int(amqp_connection_state_t state, @@ -199,12 +215,7 @@ void amqp_table_add_int(amqp_connection_state_t state, amqp_bytes_t key, int value) { - - if (output->num_entries == output->size) - return; - - output->entries[output->num_entries].kind = 'I'; - output->entries[output->num_entries].key = key; - output->entries[output->num_entries].value.i32 = value; - output->num_entries++; + amqp_table_entry_t *entry = amqp_table_add_entry( state, output, key ); + entry->kind = 'I'; + entry->value.i32 = value; } diff --git a/t/004_selfconsume.t b/t/004_selfconsume.t index a8da9b4..4741eeb 100644 --- a/t/004_selfconsume.t +++ b/t/004_selfconsume.t @@ -1,4 +1,4 @@ -use Test::More tests => 10; +use Test::More tests => 11; use strict; my $dtag=(unpack("L",pack("N",1)) != 1)?'0100000000000000':'0000000000000001'; @@ -20,6 +20,7 @@ isnt($queuename, '', "queue_declare -> private name"); eval { $mq->queue_bind(1, $queuename, "nr_test_x", "nr_test_q"); }; is($@, '', "queue_bind"); eval { $mq->publish(1, "nr_test_q", "Magic Transient Payload", { exchange => "nr_test_x" }); }; +is($@, '', "publish"); eval { $mq->consume(1, $queuename, {consumer_tag=>'ctag', no_local=>0,no_ack=>1,exclusive=>0}); }; is($@, '', "consume"); diff --git a/t/013_headers.t b/t/013_headers.t new file mode 100755 index 0000000..1d51553 --- /dev/null +++ b/t/013_headers.t @@ -0,0 +1,117 @@ +use Test::More tests => 20; +use strict; + +package TestBlessings; +use overload + '""' => sub { uc ${$_[0]} }, + ; + +sub new { + my ($class, $self) = @_; + + bless \$self, $class; +} + +package main; + +my $host = $ENV{'MQHOST'} || "dev.rabbitmq.com"; +$host = 'localhost'; # FIXME + +use_ok('Net::RabbitMQ'); + +my $mq = Net::RabbitMQ->new(); +ok($mq, "Created object"); + +eval { $mq->connect($host, { user => "guest", password => "guest" }); }; +is($@, '', "connect"); + +eval { $mq->channel_open(1); }; +is($@, '', "channel_open"); + +eval { $mq->queue_declare(1, "nr_test_hole", { passive => 0, durable => 1, exclusive => 0, auto_delete => 0 }); }; +is($@, '', "queue_declare"); + +eval { $mq->queue_bind(1, "nr_test_hole", "nr_test_x", "nr_test_route"); }; +is($@, '', "queue_bind"); + +eval { 1 while($mq->get(1, "nr_test_hole")); }; +is($@, '', "drain queue"); + +my $headers = { + abc => 123, + def => 'xyx', + head3 => 3, + head4 => 4, + head5 => 5, + head6 => 6, + head7 => 7, + head8 => 8, + head9 => 9, + head10 => 10, + head11 => 11, + head12 => 12, +}; +eval { $mq->publish( 1, "nr_test_route", "Header Test", + { exchange => "nr_test_x" }, + { headers => $headers }, + ); +}; + +is( $@, '', "publish" ); + +eval { $mq->consume(1, "nr_test_hole", {consumer_tag=>'ctag', no_local=>0,no_ack=>1,exclusive=>0}); }; +is($@, '', "consume"); + +my $msg; +eval { $msg = $mq->recv() }; +is( $@, '', 'recv' ); + +is( $msg->{body}, 'Header Test', "Received body" ); +use Data::Dumper; +diag Dumper($msg); +is( exists $msg->{props}, 1, "Props exist" ); +is( exists $msg->{props}{headers}, 1, "Headers exist" ); +is_deeply( $msg->{props}{headers}, $headers, "Received headers" ); + +$headers = { + blah => TestBlessings->new('foo'), +}; +eval { $mq->publish( 1, "nr_test_route", "Header Test", + { exchange => "nr_test_x" }, + { headers => $headers }, + ); +}; +is( $@, '', 'publish with blessed header values' ); + +eval { $msg = $mq->recv() }; +is( $@, '', 'recv from blessed header values' ); + +is_deeply( $msg->{props}{headers}, $headers, "Received blessed headers" ); + +SKIP: { + use Variable::Magic qw(wizard cast); + + # Currently not able to get Variable::Magic to work without requiring it + #skip "Variable::Magic not available", 3 if $@; + + my $wizard = wizard + set => sub { }, + ; + my $magic = 'foo'; + cast $magic, $wizard; + my $headers = { blah => $magic, }; + diag Dumper($headers); + + eval { $mq->publish( 1, "nr_test_route", "Header Test", + { exchange => "nr_test_x" }, + { headers => $headers }, + ); + }; + is( $@, '', 'publish with magic header values' ); + + skip "Publish failed", 2 if $@; + eval { $msg = $mq->recv() }; + is( $@, '', 'recv from magic header values' ); + + is_deeply( $msg->{props}{headers}, $headers, "Received magic headers" ); +};