Function amqp_basic_publish

Summary

#include <librabbitmq/amqp.h>

(1) int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body)

#include <librabbitmq/amqp_api.c>

(2) int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, amqp_basic_properties_t const *properties, amqp_bytes_t body)

Function overload

Synopsis

#include <librabbitmq/amqp.h>

int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body)

Description

Publish a message to the broker

Publish a message on an exchange with a routing key.

Note that at the AMQ protocol level basic.publish is an async method: this means error conditions that occur on the broker (such as publishing to a non-existent exchange) will not be reflected in the return value of this function.

Parameters:

[ in ] state - the connection object

[ in ] channel - the channel identifier

[ in ] exchange - the exchange on the broker to publish to

[ in ] routing_key - the routing key to use when publishing the message

[ in ] mandatory - indicate to the broker that the message MUST be routed to a queue. If the broker cannot do this it should respond with a basic.return method.

[ in ] immediate - indicate to the broker that the message MUST be delivered to a consumer immediately. If the broker cannot do this it should respond with a basic.return method.

[ in ] properties - the properties associated with the message

[ in ] body - the message body

Return
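AMQP_STATUS_OK on success, amqp_status_enum value on failure. Note that basic.publish is an async method: the return value from this function only indicates that the message data was successfully transmitted to the broker. It does not indicate failures that occur on the broker, such as publishing to a non-existent exchange. Possible error values include AMQP_STATUS_TIMER_FAILURE (the heartbeat timer could not be checked) and AMQP_STATUS_HEARTBEAT_TIMEOUT (the heartbeat deadline passed and the follow-up receive timed out); any other error status reported while sending or receiving frames is returned unchanged.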
Note: this function does heartbeat processing as of v0.4.0
Since
v0.1

Mentioned in

Source

Line 1910 in librabbitmq/amqp.h.

Synopsis

#include <librabbitmq/amqp_api.c>

int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, amqp_basic_properties_t const *properties, amqp_bytes_t body)

Description

No description yet.

Mentioned in

Source

Lines 190-279 in librabbitmq/amqp_api.c.

/*
 * Publish a message on an exchange with a routing key.
 *
 * Three kinds of frames are sent, in order, on the given channel:
 *   1. a basic.publish method frame carrying exchange, routing_key and the
 *      mandatory/immediate flags (ticket is always 0),
 *   2. a content header frame carrying the message properties and the total
 *      body size,
 *   3. zero or more body frames, each carrying at most
 *      frame_max - (HEADER_SIZE + FOOTER_SIZE) bytes of the body.
 *
 * Before anything is sent, the receive-heartbeat timer is checked; if it has
 * expired a receive is attempted so a dead connection is noticed first.
 *
 * Parameters:
 *   state        connection object; frame_max and next_recv_heartbeat are read
 *   channel      channel identifier stamped on every frame
 *   exchange     exchange to publish to
 *   routing_key  routing key for the message
 *   mandatory    copied into the basic.publish method
 *   immediate    copied into the basic.publish method
 *   properties   message properties; NULL means "all fields zeroed"
 *   body         message body; a zero-length body sends no body frames
 *
 * Returns AMQP_STATUS_OK once every frame has been handed to the send layer.
 * Returns AMQP_STATUS_TIMER_FAILURE if the heartbeat timer check fails,
 * AMQP_STATUS_HEARTBEAT_TIMEOUT if the heartbeat deadline passed and the
 * follow-up receive also timed out, or whatever error status the
 * receive/send helpers report.
 *
 * NOTE(review): assumes state->frame_max > HEADER_SIZE + FOOTER_SIZE. If the
 * usable payload size were 0 the body loop below would never advance --
 * confirm that frame_max is validated when the connection is tuned.
 */
int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
                       amqp_bytes_t exchange, amqp_bytes_t routing_key,
                       amqp_boolean_t mandatory, amqp_boolean_t immediate,
                       amqp_basic_properties_t const *properties,
                       amqp_bytes_t body) {
  amqp_frame_t f;
  size_t body_offset;
  /* Largest body fragment that fits in one frame after framing overhead. */
  size_t usable_body_payload_size =
      state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
  int res;
  int flagz;

  amqp_basic_publish_t m;
  amqp_basic_properties_t default_properties;

  m.exchange = exchange;
  m.routing_key = routing_key;
  m.mandatory = mandatory;
  m.immediate = immediate;
  m.ticket = 0;

  /* TODO(alanxz): this heartbeat check is happening in the wrong place, it
   * should really be done in amqp_try_send/writev */
  res = amqp_time_has_past(state->next_recv_heartbeat);
  if (AMQP_STATUS_TIMER_FAILURE == res) {
    return res;
  } else if (AMQP_STATUS_TIMEOUT == res) {
    /* The heartbeat deadline has passed: try to receive before sending. */
    res = amqp_try_recv(state);
    if (AMQP_STATUS_TIMEOUT == res) {
      return AMQP_STATUS_HEARTBEAT_TIMEOUT;
    } else if (AMQP_STATUS_OK != res) {
      return res;
    }
  }

  /* The method frame is always followed by a header frame, hence SF_MORE. */
  res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
                               AMQP_SF_MORE, amqp_time_infinite());
  if (res < 0) {
    return res;
  }

  /* Callers may pass NULL properties; substitute an all-zero struct. */
  if (properties == NULL) {
    memset(&default_properties, 0, sizeof(default_properties));
    properties = &default_properties;
  }

  f.frame_type = AMQP_FRAME_HEADER;
  f.channel = channel;
  f.payload.properties.class_id = AMQP_BASIC_CLASS;
  f.payload.properties.body_size = body.len;
  f.payload.properties.decoded = (void *)properties;

  /* Only signal "more to come" if at least one body frame will follow. */
  if (body.len > 0) {
    flagz = AMQP_SF_MORE;
  } else {
    flagz = AMQP_SF_NONE;
  }
  res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
  if (res < 0) {
    return res;
  }

  body_offset = 0;
  while (body_offset < body.len) {
    /* Non-zero by the loop condition. */
    size_t remaining = body.len - body_offset;

    f.frame_type = AMQP_FRAME_BODY;
    f.channel = channel;
    f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
    /*
     * Strictly greater-than: when the remainder exactly fills one frame this
     * IS the last frame, so it must carry AMQP_SF_NONE like any other final
     * frame. Using >= here would flag the last frame AMQP_SF_MORE with
     * nothing following it.
     */
    if (remaining > usable_body_payload_size) {
      f.payload.body_fragment.len = usable_body_payload_size;
      flagz = AMQP_SF_MORE;
    } else {
      f.payload.body_fragment.len = remaining;
      flagz = AMQP_SF_NONE;
    }

    body_offset += f.payload.body_fragment.len;
    res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
    if (res < 0) {
      return res;
    }
  }

  return AMQP_STATUS_OK;
}





Add Discussion as Guest

Log in to DocsForge