00001 #ifndef PROTON_MESSENGER_H 00002 #define PROTON_MESSENGER_H 1 00003 00004 /* 00005 * 00006 * Licensed to the Apache Software Foundation (ASF) under one 00007 * or more contributor license agreements. See the NOTICE file 00008 * distributed with this work for additional information 00009 * regarding copyright ownership. The ASF licenses this file 00010 * to you under the Apache License, Version 2.0 (the 00011 * "License"); you may not use this file except in compliance 00012 * with the License. You may obtain a copy of the License at 00013 * 00014 * http://www.apache.org/licenses/LICENSE-2.0 00015 * 00016 * Unless required by applicable law or agreed to in writing, 00017 * software distributed under the License is distributed on an 00018 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 00019 * KIND, either express or implied. See the License for the 00020 * specific language governing permissions and limitations 00021 * under the License. 00022 * 00023 */ 00024 00025 #include <proton/import_export.h> 00026 #include <proton/message.h> 00027 #include <proton/selectable.h> 00028 00029 #ifdef __cplusplus 00030 extern "C" { 00031 #endif 00032 00033 /** 00034 * @file 00035 * 00036 * The messenger API provides a high level interface for sending and 00037 * receiving AMQP messages. 00038 * 00039 * @defgroup messenger Messenger 00040 * @{ 00041 */ 00042 00043 /** 00044 * A ::pn_messenger_t provides a high level interface for sending and 00045 * receiving messages (See ::pn_message_t). 00046 * 00047 * Every messenger contains a single logical queue of incoming 00048 * messages and a single logical queue of outgoing messages. The 00049 * messages in these queues may be destined for, or originate from, a 00050 * variety of addresses. 00051 * 00052 * The messenger interface is single-threaded. All methods except one 00053 * (::pn_messenger_interrupt()) are intended to be used by one thread 00054 * at a time. 
00055 * 00056 * 00057 * Address Syntax 00058 * ============== 00059 * 00060 * An address has the following form:: 00061 * 00062 * [ amqp[s]:// ] [user[:password]@] domain [/[name]] 00063 * 00064 * Where domain can be one of:: 00065 * 00066 * host | host:port | ip | ip:port | name 00067 * 00068 * The following are valid examples of addresses: 00069 * 00070 * - example.org 00071 * - example.org:1234 00072 * - amqp://example.org 00073 * - amqps://example.org 00074 * - example.org/incoming 00075 * - amqps://example.org/outgoing 00076 * - amqps://fred:trustno1@example.org 00077 * - 127.0.0.1:1234 00078 * - amqps://127.0.0.1:1234 00079 * 00080 * Sending & Receiving Messages 00081 * ============================ 00082 * 00083 * The messenger API works in conjunction with the ::pn_message_t API. 00084 * A ::pn_message_t is a mutable holder of message content. 00085 * 00086 * The ::pn_messenger_put() operation copies content from the supplied 00087 * ::pn_message_t to the outgoing queue, and may send queued messages 00088 * if it can do so without blocking. The ::pn_messenger_send() 00089 * operation blocks until it has sent the requested number of 00090 * messages, or until a timeout interrupts the attempt. 00091 * 00092 * 00093 * pn_messenger_t *messenger = pn_messenger(NULL); 00094 * pn_message_t *message = pn_message(); 00095 * char subject[1024]; 00096 * for (int i = 0; i < 3; i++) { 00097 * pn_message_set_address(message, "amqp://host/queue"); 00098 * sprintf(subject, "Hello World! %i", i); 00099 * pn_message_set_subject(message, subject); 00100 * pn_messenger_put(messenger, message); } 00101 * pn_messenger_send(messenger, -1); 00102 * 00103 * Similarly, the ::pn_messenger_recv() method receives messages into 00104 * the incoming queue, and may block as it attempts to receive up to 00105 * the requested number of messages, or until the timeout is reached. 00106 * It may receive fewer than the requested number. 
The 00107 * ::pn_messenger_get() method pops the eldest message off the 00108 * incoming queue and copies its content into the supplied 00109 * ::pn_message_t object. It will not block. 00110 * 00111 * 00112 * pn_messenger_t *messenger = pn_messenger(NULL); 00113 * pn_message_t *message = pn_message(); 00114 * pn_messenger_recv(messenger, -1); 00115 * while (pn_messenger_incoming(messenger) > 0) { 00116 * pn_messenger_get(messenger, message); 00117 * printf("%s\n", pn_message_get_subject(message)); 00118 * } 00119 * 00120 * Output: 00121 * Hello World! 0 00122 * Hello World! 1 00123 * Hello World! 2 00124 * 00125 * The blocking flag allows you to turn off blocking behavior 00126 * entirely, in which case ::pn_messenger_send() and 00127 * ::pn_messenger_recv() will do whatever they can without blocking, 00128 * and then return. You can then look at the number of incoming and 00129 * outgoing messages to see how much outstanding work still remains. 00130 */ 00131 typedef struct pn_messenger_t pn_messenger_t; 00132 00133 /** 00134 * A subscription is a request for incoming messages. 00135 * 00136 * @todo currently the subscription API is underdeveloped; it 00137 * should allow more explicit control over subscription properties and 00138 * behaviour 00139 */ 00140 typedef struct pn_subscription_t pn_subscription_t; 00141 00142 /** 00143 * Trackers provide a lightweight handle used to track the status of 00144 * incoming and outgoing deliveries. 00145 */ 00146 typedef int64_t pn_tracker_t; 00147 00148 /** 00149 * Describes all the possible states for a message associated with a 00150 * given tracker. 00151 */ 00152 typedef enum { 00153 PN_STATUS_UNKNOWN = 0, /**< The tracker is unknown. */ 00154 PN_STATUS_PENDING = 1, /**< The message is in flight. For outgoing 00155 messages, use ::pn_messenger_buffered to 00156 see if it has been sent or not. */ 00157 PN_STATUS_ACCEPTED = 2, /**< The message was accepted. */ 00158 PN_STATUS_REJECTED = 3, /**< The message was rejected. 
*/ 00159 PN_STATUS_RELEASED = 4, /**< The message was released. */ 00160 PN_STATUS_MODIFIED = 5, /**< The message was modified. */ 00161 PN_STATUS_ABORTED = 6, /**< The message was aborted. */ 00162 PN_STATUS_SETTLED = 7 /**< The remote party has settled the message. */ 00163 } pn_status_t; 00164 00165 /** 00166 * Construct a new ::pn_messenger_t with the given name. The name is 00167 * global. If a NULL name is supplied, a UUID based name will be 00168 * chosen. 00169 * 00170 * @param[in] name the name of the messenger or NULL 00171 * 00172 * @return pointer to a new ::pn_messenger_t 00173 */ 00174 PN_EXTERN pn_messenger_t *pn_messenger(const char *name); 00175 00176 /** 00177 * Get the name of a messenger. 00178 * 00179 * @param[in] messenger a messenger object 00180 * @return the name of the messenger 00181 */ 00182 PN_EXTERN const char *pn_messenger_name(pn_messenger_t *messenger); 00183 00184 /** 00185 * Sets the path that will be used to get the certificate that will be 00186 * used to identify this messenger to its peers. The validity of the 00187 * path is not checked by this function. 00188 * 00189 * @param[in] messenger the messenger 00190 * @param[in] certificate a path to a certificate file 00191 * @return an error code of zero if there is no error 00192 */ 00193 PN_EXTERN int pn_messenger_set_certificate(pn_messenger_t *messenger, const char *certificate); 00194 00195 /** 00196 * Get the certificate path. This value may be set by 00197 * pn_messenger_set_certificate. The default certificate path is null. 00198 * 00199 * @param[in] messenger the messenger 00200 * @return the certificate file path 00201 */ 00202 PN_EXTERN const char *pn_messenger_get_certificate(pn_messenger_t *messenger); 00203 00204 /** 00205 * Set path to the private key that was used to sign the certificate. 
00206 * See ::pn_messenger_set_certificate 00207 * 00208 * @param[in] messenger a messenger object 00209 * @param[in] private_key a path to a private key file 00210 * @return an error code of zero if there is no error 00211 */ 00212 PN_EXTERN int pn_messenger_set_private_key(pn_messenger_t *messenger, const char *private_key); 00213 00214 /** 00215 * Gets the private key file for a messenger. 00216 * 00217 * @param[in] messenger a messenger object 00218 * @return the messenger's private key file path 00219 */ 00220 PN_EXTERN const char *pn_messenger_get_private_key(pn_messenger_t *messenger); 00221 00222 /** 00223 * Sets the private key password for a messenger. 00224 * 00225 * @param[in] messenger a messenger object 00226 * @param[in] password the password for the private key file 00227 * 00228 * @return an error code of zero if there is no error 00229 */ 00230 PN_EXTERN int pn_messenger_set_password(pn_messenger_t *messenger, const char *password); 00231 00232 /** 00233 * Gets the private key file password for a messenger. 00234 * 00235 * @param[in] messenger a messenger object 00236 * @return password for the private key file 00237 */ 00238 PN_EXTERN const char *pn_messenger_get_password(pn_messenger_t *messenger); 00239 00240 /** 00241 * Sets the trusted certificates database for a messenger. 00242 * 00243 * The messenger will use this database to validate the certificate 00244 * provided by the peer. 00245 * 00246 * @param[in] messenger a messenger object 00247 * @param[in] cert_db a path to the certificates database 00248 * 00249 * @return an error code of zero if there is no error 00250 */ 00251 PN_EXTERN int pn_messenger_set_trusted_certificates(pn_messenger_t *messenger, const char *cert_db); 00252 00253 /** 00254 * Gets the trusted certificates database for a messenger. 
00255 * 00256 * @param[in] messenger a messenger object 00257 * @return path to the trusted certificates database 00258 */ 00259 PN_EXTERN const char *pn_messenger_get_trusted_certificates(pn_messenger_t *messenger); 00260 00261 /** 00262 * Set the default timeout for a messenger. 00263 * 00264 * Any messenger call that blocks during execution will stop blocking 00265 * and return control when this timeout is reached, if you have set it 00266 * to a value greater than zero. The timeout is expressed in 00267 * milliseconds. 00268 * 00269 * @param[in] messenger a messenger object 00270 * @param[in] timeout a new timeout for the messenger, in milliseconds 00271 * @return an error code or zero if there is no error 00272 */ 00273 PN_EXTERN int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout); 00274 00275 /** 00276 * Gets the timeout for a messenger object. 00277 * 00278 * See ::pn_messenger_set_timeout() for details. 00279 * 00280 * @param[in] messenger a messenger object 00281 * @return the timeout for the messenger, in milliseconds 00282 */ 00283 PN_EXTERN int pn_messenger_get_timeout(pn_messenger_t *messenger); 00284 00285 /** 00286 * Check if a messenger is in blocking mode. 00287 * 00288 * @param[in] messenger a messenger object 00289 * @return true if blocking has been enabled, false otherwise 00290 */ 00291 PN_EXTERN bool pn_messenger_is_blocking(pn_messenger_t *messenger); 00292 00293 /** 00294 * Enable or disable blocking behavior for a messenger during calls to 00295 * ::pn_messenger_send and ::pn_messenger_recv. 00296 * 00297 * @param[in] messenger a messenger object 00298 * @param[in] blocking the value of the blocking flag 00299 * @return an error code or zero if there is no error 00300 */ 00301 PN_EXTERN int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking); 00302 00303 /** 00304 * Check if a messenger is in passive mode. 
00305 * 00306 * A messenger that is in passive mode will never attempt to perform 00307 * I/O internally, but instead will make all internal file descriptors 00308 * accessible through ::pn_messenger_selectable() to be serviced 00309 * externally. This can be useful for integrating messenger into an 00310 * external event loop. 00311 * 00312 * @param[in] messenger a messenger object 00313 * @return true if the messenger is in passive mode, false otherwise 00314 */ 00315 PN_EXTERN bool pn_messenger_is_passive(pn_messenger_t *messenger); 00316 00317 /** 00318 * Set the passive mode for a messenger. 00319 * 00320 * See ::pn_messenger_is_passive() for details on passive mode. 00321 * 00322 * @param[in] messenger a messenger object 00323 * @param[in] passive true to enable passive mode, false to disable 00324 * passive mode 00325 * @return an error code or zero on success 00326 */ 00327 PN_EXTERN int pn_messenger_set_passive(pn_messenger_t *messenger, bool passive); 00328 00329 /** Frees a Messenger. 00330 * 00331 * @param[in] messenger the messenger to free (or NULL), no longer 00332 * valid on return 00333 */ 00334 PN_EXTERN void pn_messenger_free(pn_messenger_t *messenger); 00335 00336 /** 00337 * Get the code for a messenger's most recent error. 00338 * 00339 * The error code is initialized to zero at messenger creation. The 00340 * error number is "sticky" i.e. error codes are not reset to 0 at the 00341 * end of successful API calls. You can use ::pn_messenger_error to 00342 * access the messenger's error object and clear explicitly if 00343 * desired. 00344 * 00345 * @param[in] messenger the messenger to check for errors 00346 * @return an error code or zero if there is no error 00347 * @see error.h 00348 */ 00349 PN_EXTERN int pn_messenger_errno(pn_messenger_t *messenger); 00350 00351 /** 00352 * Get a messenger's error object. 00353 * 00354 * Returns a pointer to a pn_error_t that is valid until the messenger 00355 * is freed. 
The pn_error_* API allows you to access the text, error 00356 * number, and lets you set or clear the error code explicitly. 00357 * 00358 * @param[in] messenger the messenger to check for errors 00359 * @return a pointer to the messenger's error descriptor 00360 * @see error.h 00361 */ 00362 PN_EXTERN pn_error_t *pn_messenger_error(pn_messenger_t *messenger); 00363 00364 /** 00365 * Get the size of a messenger's outgoing window. 00366 * 00367 * The size of the outgoing window limits the number of messages whose 00368 * status you can check with a tracker. A message enters this window 00369 * when you call pn_messenger_put on the message. For example, if your 00370 * outgoing window size is 10, and you call pn_messenger_put 12 times, 00371 * new status information will no longer be available for the first 2 00372 * messages. 00373 * 00374 * The default outgoing window size is 0. 00375 * 00376 * @param[in] messenger a messenger object 00377 * @return the outgoing window for the messenger 00378 */ 00379 PN_EXTERN int pn_messenger_get_outgoing_window(pn_messenger_t *messenger); 00380 00381 /** 00382 * Set the size of a messenger's outgoing window. 00383 * 00384 * See ::pn_messenger_get_outgoing_window() for details. 00385 * 00386 * @param[in] messenger a messenger object 00387 * @param[in] window the number of deliveries to track 00388 * @return an error code or zero on success 00389 * @see error.h 00390 */ 00391 PN_EXTERN int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window); 00392 00393 /** 00394 * Get the size of a messenger's incoming window. 00395 * 00396 * The size of a messenger's incoming window limits the number of 00397 * messages that can be accepted or rejected using trackers. Messages 00398 * *do not* enter this window when they have been received 00399 * (::pn_messenger_recv) onto your incoming queue. Messages enter 00400 * this window only when you access them using pn_messenger_get. 
If 00401 * your incoming window size is N, and you get N+1 messages without 00402 * explicitly accepting or rejecting the oldest message, then it will 00403 * be implicitly accepted when it falls off the edge of the incoming 00404 * window. 00405 * 00406 * The default incoming window size is 0. 00407 * 00408 * @param[in] messenger a messenger object 00409 * @return the incoming window for the messenger 00410 */ 00411 PN_EXTERN int pn_messenger_get_incoming_window(pn_messenger_t *messenger); 00412 00413 /** 00414 * Set the size of a messenger's incoming window. 00415 * 00416 * See ::pn_messenger_get_incoming_window() for details. 00417 * 00418 * @param[in] messenger a messenger object 00419 * @param[in] window the number of deliveries to track 00420 * @return an error code or zero on success 00421 * @see error.h 00422 */ 00423 PN_EXTERN int pn_messenger_set_incoming_window(pn_messenger_t *messenger, 00424 int window); 00425 00426 /** 00427 * Currently a no-op placeholder. For future compatibility, do not 00428 * send or receive messages before starting the messenger. 00429 * 00430 * @param[in] messenger the messenger to start 00431 * @return an error code or zero on success 00432 * @see error.h 00433 */ 00434 PN_EXTERN int pn_messenger_start(pn_messenger_t *messenger); 00435 00436 /** 00437 * Stops a messenger. 00438 * 00439 * Stopping a messenger will perform an orderly shutdown of all 00440 * underlying connections. This may require some time. If the 00441 * messenger is in non blocking mode (see ::pn_messenger_is_blocking), 00442 * this operation will return PN_INPROGRESS if it cannot finish 00443 * immediately. In that case, you can use ::pn_messenger_stopped() to 00444 * determine when the messenger has finished stopping. 
00445 * 00446 * @param[in] messenger the messenger to stop 00447 * @return an error code or zero on success 00448 * @see error.h 00449 */ 00450 PN_EXTERN int pn_messenger_stop(pn_messenger_t *messenger); 00451 00452 /** 00453 * Returns true if a messenger is in the stopped state. This function 00454 * does not block. 00455 * 00456 * @param[in] messenger the messenger to check 00457 * @return true if the messenger is in the stopped state, false otherwise 00458 */ 00459 PN_EXTERN bool pn_messenger_stopped(pn_messenger_t *messenger); 00460 00461 /** 00462 * Subscribes a messenger to messages from the specified source. 00463 * 00464 * @param[in] messenger the messenger to subscribe 00465 * @param[in] source the source address to subscribe to 00466 * @return a subscription 00467 */ 00468 PN_EXTERN pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source); 00469 00470 /** 00471 * Get a subscription's application context. 00472 * 00473 * See ::pn_subscription_set_context(). 00474 * 00475 * @param[in] sub a subscription object 00476 * @return the subscription's application context 00477 */ 00478 PN_EXTERN void *pn_subscription_get_context(pn_subscription_t *sub); 00479 00480 /** 00481 * Set an application context for a subscription. 00482 * 00483 * @param[in] sub a subscription object 00484 * @param[in] context the application context for the subscription 00485 */ 00486 PN_EXTERN void pn_subscription_set_context(pn_subscription_t *sub, void *context); 00487 00488 /** 00489 * Get the source address of a subscription. 00490 * 00491 * @param[in] sub a subscription object 00492 * @return the subscription's source address 00493 */ 00494 PN_EXTERN const char *pn_subscription_address(pn_subscription_t *sub); 00495 00496 /** 00497 * Puts a message onto the messenger's outgoing queue. The message may 00498 * also be sent if transmission would not cause blocking. This call 00499 * will not block. 
00500 * 00501 * @param[in] messenger a messenger object 00502 * @param[in] msg a message to put on the messenger's outgoing queue 00503 * @return an error code or zero on success 00504 * @see error.h 00505 */ 00506 PN_EXTERN int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg); 00507 00508 /** 00509 * Track the status of a delivery. 00510 * 00511 * Get the current status of the delivery associated with the supplied 00512 * tracker. This may return PN_STATUS_UNKNOWN if the tracker has fallen 00513 * outside the incoming/outgoing tracking windows of the messenger. 00514 * 00515 * @param[in] messenger the messenger 00516 * @param[in] tracker the tracker identifying the delivery 00517 * @return a status code for the delivery 00518 */ 00519 PN_EXTERN pn_status_t pn_messenger_status(pn_messenger_t *messenger, pn_tracker_t tracker); 00520 00521 /** 00522 * Check if the delivery associated with a given tracker is still 00523 * waiting to be sent. 00524 * 00525 * Note that returning false does not imply that the delivery was 00526 * actually sent over the wire. 00527 * 00528 * @param[in] messenger the messenger 00529 * @param[in] tracker the tracker identifying the delivery 00530 * 00531 * @return true if the delivery is still buffered 00532 */ 00533 PN_EXTERN bool pn_messenger_buffered(pn_messenger_t *messenger, pn_tracker_t tracker); 00534 00535 /** 00536 * Frees a Messenger from tracking the status associated with a given 00537 * tracker. Use the PN_CUMULATIVE flag to indicate everything up to 00538 * (and including) the given tracker. 
00539 * 00540 * @param[in] messenger the Messenger 00541 * @param[in] tracker identifies a delivery 00542 * @param[in] flags 0 or PN_CUMULATIVE 00543 * 00544 * @return an error code or zero on success 00545 * @see error.h 00546 */ 00547 PN_EXTERN int pn_messenger_settle(pn_messenger_t *messenger, pn_tracker_t tracker, int flags); 00548 00549 /** 00550 * Get a tracker for the outgoing message most recently given to 00551 * pn_messenger_put. 00552 * 00553 * This tracker may be used with pn_messenger_status to determine the 00554 * delivery status of the message, as long as the message is still 00555 * within your outgoing window. 00556 * 00557 * @param[in] messenger the messenger 00558 * 00559 * @return a pn_tracker_t or an undefined value if pn_messenger_put 00560 * has never been called for the given messenger 00561 */ 00562 PN_EXTERN pn_tracker_t pn_messenger_outgoing_tracker(pn_messenger_t *messenger); 00563 00564 /** 00565 * Sends or receives any outstanding messages queued for a messenger. 00566 * This may block for up to the indicated timeout. 00567 * 00568 * @param[in] messenger the Messenger 00569 * @param[in] timeout the maximum time to block in milliseconds, -1 == 00570 * forever, 0 == do not block 00571 * 00572 * @return 0 if no work to do, < 0 if error, or 1 if work was done. 00573 */ 00574 PN_EXTERN int pn_messenger_work(pn_messenger_t *messenger, int timeout); 00575 00576 /** 00577 * Interrupt a messenger object that may be blocking in another 00578 * thread. 00579 * 00580 * The messenger interface is single-threaded. This is the only 00581 * messenger function intended to be concurrently called from another 00582 * thread. It will interrupt any messenger function which is currently 00583 * blocking and cause it to return with a status of ::PN_INTR. 
00584 * 00585 * @param[in] messenger the Messenger to interrupt 00586 */ 00587 PN_EXTERN int pn_messenger_interrupt(pn_messenger_t *messenger); 00588 00589 /** 00590 * Send messages from a messenger's outgoing queue. 00591 * 00592 * If a messenger is in blocking mode (see 00593 * ::pn_messenger_is_blocking()), this operation will block until N 00594 * messages have been sent from the outgoing queue. A value of -1 for 00595 * N means "all messages in the outgoing queue". See below for a full 00596 * definition of what sent from the outgoing queue means. 00597 * 00598 * Any blocking will end once the messenger's configured timeout (if 00599 * any) has been reached. When this happens an error code of 00600 * ::PN_TIMEOUT is returned. 00601 * 00602 * If the messenger is in non blocking mode, this call will return an 00603 * error code of ::PN_INPROGRESS if it is unable to send the requested 00604 * number of messages without blocking. 00605 * 00606 * A message is considered to be sent from the outgoing queue when its 00607 * status has been fully determined. This does not necessarily mean 00608 * the message was successfully sent to the final recipient though, 00609 * for example if the receiver rejects the message, the final status 00610 * will be ::PN_STATUS_REJECTED. Similarly, if a message is sent to an 00611 * invalid address, it may be removed from the outgoing queue without 00612 * ever even being transmitted. In this case the final status will be 00613 * ::PN_STATUS_ABORTED. 00614 * 00615 * @param[in] messenger a messenger object 00616 * @param[in] n the number of messages to send, or -1 for all 00617 * 00618 * @return an error code or zero on success 00619 * @see error.h 00620 */ 00621 PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger, int n); 00622 00623 /** 00624 * Retrieve messages into a messenger's incoming queue. 00625 * 00626 * Instructs a messenger to receive up to @c limit messages into the 00627 * incoming message queue of a messenger. 
If @c limit is -1, the 00628 * messenger will receive as many messages as it can buffer 00629 * internally. If the messenger is in blocking mode, this call will 00630 * block until at least one message is available in the incoming 00631 * queue. 00632 * 00633 * Each call to pn_messenger_recv replaces the previous receive 00634 * operation, so pn_messenger_recv(messenger, 0) will cancel any 00635 * outstanding receive. 00636 * 00637 * After receiving messages onto your incoming queue use 00638 * ::pn_messenger_get() to access message content. 00639 * 00640 * @param[in] messenger the messenger 00641 * @param[in] limit the maximum number of messages to receive or -1 to 00642 * receive as many messages as it can buffer 00643 * internally. 00644 * @return an error code or zero on success 00645 * @see error.h 00646 */ 00647 PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int limit); 00648 00649 /** 00650 * Get the capacity of the incoming message queue of a messenger. 00651 * 00652 * Note this count does not include those messages already available 00653 * on the incoming queue (see ::pn_messenger_incoming()). Rather it 00654 * returns the number of incoming queue entries available for 00655 * receiving messages. 00656 * 00657 * @param[in] messenger the messenger 00658 */ 00659 PN_EXTERN int pn_messenger_receiving(pn_messenger_t *messenger); 00660 00661 /** 00662 * Get the next message from the head of a messenger's incoming queue. 00663 * 00664 * The get operation copies the message data from the head of the 00665 * messenger's incoming queue into the provided ::pn_message_t object. 00666 * If the provided ::pn_message_t pointer is NULL, the head message will be 00667 * discarded. This operation will return ::PN_EOS if there are no 00668 * messages left on the incoming queue. 
00669 * 00670 * @param[in] messenger a messenger object 00671 * @param[out] message upon return contains the message from the head of the queue 00672 * @return an error code or zero on success 00673 * @see error.h 00674 */ 00675 PN_EXTERN int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *message); 00676 00677 /** 00678 * Get a tracker for the message most recently retrieved by 00679 * ::pn_messenger_get(). 00680 * 00681 * A tracker for an incoming message allows you to accept or reject 00682 * the associated message. It can also be used for cumulative 00683 * accept/reject operations for the associated message and all prior 00684 * messages as well. 00685 * 00686 * @param[in] messenger a messenger object 00687 * @return a pn_tracker_t or an undefined value if pn_messenger_get 00688 * has never been called for the given messenger 00689 */ 00690 PN_EXTERN pn_tracker_t pn_messenger_incoming_tracker(pn_messenger_t *messenger); 00691 00692 /** 00693 * Get the subscription of the message most recently retrieved by ::pn_messenger_get(). 00694 * 00695 * This operation will return NULL if ::pn_messenger_get() has never 00696 * been successfully called. 00697 * 00698 * @param[in] messenger a messenger object 00699 * @return a pn_subscription_t or NULL 00700 */ 00701 PN_EXTERN pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger); 00702 00703 /** 00704 * Indicates that an accept, reject or settle should operate cumulatively. 00705 */ 00706 #define PN_CUMULATIVE (0x1) 00707 00708 /** 00709 * Signal successful processing of message(s). 00710 * 00711 * With no flags this operation will signal the sender that the 00712 * message referenced by the tracker was accepted. If the 00713 * PN_CUMULATIVE flag is set, this operation will also accept all 00714 * pending messages prior to the message indicated by the tracker. 
00715 * 00716 * Note that when a message is accepted or rejected multiple times, 00717 * either explicitly, or implicitly through use of the ::PN_CUMULATIVE 00718 * flag, only the first outcome applies. For example if a sequence of 00719 * three messages are received: M1, M2, M3, and M2 is rejected, and M3 00720 * is cumulatively accepted, M2 will remain rejected and only M1 and 00721 * M3 will be considered accepted. 00722 * 00723 * @param[in] messenger a messenger object 00724 * @param[in] tracker an incoming tracker 00725 * @param[in] flags 0 or PN_CUMULATIVE 00726 * @return an error code or zero on success 00727 * @see error.h 00728 */ 00729 PN_EXTERN int pn_messenger_accept(pn_messenger_t *messenger, pn_tracker_t tracker, int flags); 00730 00731 /** 00732 * Signal unsuccessful processing of message(s). 00733 * 00734 * With no flags this operation will signal the sender that the 00735 * message indicated by the tracker was rejected. If the PN_CUMULATIVE 00736 * flag is used this operation will also reject all pending messages 00737 * prior to the message indicated by the tracker. 00738 * 00739 * Note that when a message is accepted or rejected multiple times, 00740 * either explicitly, or implicitly through use of the ::PN_CUMULATIVE 00741 * flag, only the first outcome applies. For example if a sequence of 00742 * three messages are received: M1, M2, M3, and M2 is accepted, and M3 00743 * is cumulatively rejected, M2 will remain accepted and only M1 and 00744 * M3 will be considered rejected. 00745 * 00746 * @param[in] messenger a messenger object 00747 * @param[in] tracker an incoming tracker 00748 * @param[in] flags 0 or PN_CUMULATIVE 00749 * @return an error code or zero on success 00750 * @see error.h 00751 */ 00752 PN_EXTERN int pn_messenger_reject(pn_messenger_t *messenger, pn_tracker_t tracker, int flags); 00753 00754 /** 00755 * Get the number of messages in the outgoing message queue of a 00756 * messenger. 
00757 * 00758 * @param[in] messenger a messenger object 00759 * @return the outgoing queue depth 00760 */ 00761 PN_EXTERN int pn_messenger_outgoing(pn_messenger_t *messenger); 00762 00763 /** 00764 * Get the number of messages in the incoming message queue of a messenger. 00765 * 00766 * @param[in] messenger a messenger object 00767 * @return the incoming queue depth 00768 */ 00769 PN_EXTERN int pn_messenger_incoming(pn_messenger_t *messenger); 00770 00771 //! Adds a routing rule to a Messenger's internal routing table. 00772 //! 00773 //! The route procedure may be used to influence how a messenger will 00774 //! internally treat a given address or class of addresses. Every call 00775 //! to the route procedure will result in messenger appending a routing 00776 //! rule to its internal routing table. 00777 //! 00778 //! Whenever a message is presented to a messenger for delivery, it 00779 //! will match the address of this message against the set of routing 00780 //! rules in order. The first rule to match will be triggered, and 00781 //! instead of routing based on the address presented in the message, 00782 //! the messenger will route based on the address supplied in the rule. 00783 //! 00784 //! The pattern matching syntax supports two types of matches, a '%' 00785 //! will match any character except a '/', and a '*' will match any 00786 //! character including a '/'. 00787 //! 00788 //! A routing address is specified as a normal AMQP address, however it 00789 //! may additionally use substitution variables from the pattern match 00790 //! that triggered the rule. 00791 //! 00792 //! Any message sent to "foo" will be routed to "amqp://foo.com": 00793 //! 00794 //! pn_messenger_route("foo", "amqp://foo.com"); 00795 //! 00796 //! Any message sent to "foobar" will be routed to 00797 //! "amqp://foo.com/bar": 00798 //! 00799 //! pn_messenger_route("foobar", "amqp://foo.com/bar"); 00800 //! 00801 //! 
Any message sent to bar/<path> will be routed to the corresponding 00802 //! path within the amqp://bar.com domain: 00803 //! 00804 //! pn_messenger_route("bar/*", "amqp://bar.com/$1"); 00805 //! 00806 //! Route all messages over TLS: 00807 //! 00808 //! pn_messenger_route("amqp:*", "amqps:$1") 00809 //! 00810 //! Supply credentials for foo.com: 00811 //! 00812 //! pn_messenger_route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); 00813 //! 00814 //! Supply credentials for all domains: 00815 //! 00816 //! pn_messenger_route("amqp://*", "amqp://user:password@$1"); 00817 //! 00818 //! Route all addresses through a single proxy while preserving the 00819 //! original destination: 00820 //! 00821 //! pn_messenger_route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); 00822 //! 00823 //! Route any address through a single broker: 00824 //! 00825 //! pn_messenger_route("*", "amqp://user:password@broker/$1"); 00826 //! 00827 //! @param[in] messenger the Messenger 00828 //! @param[in] pattern a glob pattern 00829 //! @param[in] address an address indicating alternative routing 00830 //! 00831 //! @return an error code or zero on success 00832 //! @see error.h 00833 PN_EXTERN int pn_messenger_route(pn_messenger_t *messenger, const char *pattern, 00834 const char *address); 00835 00836 /** 00837 * Rewrite message addresses prior to transmission. 00838 * 00839 * This operation is similar to pn_messenger_route, except that the 00840 * destination of the message is determined before the message address 00841 * is rewritten. 00842 * 00843 * The outgoing address is only rewritten after routing has been 00844 * finalized. If a message has an outgoing address of 00845 * "amqp://0.0.0.0:5678", and a rewriting rule that changes its 00846 * outgoing address to "foo", it will still arrive at the peer that 00847 * is listening on "amqp://0.0.0.0:5678", but when it arrives there, 00848 * the receiver will see its outgoing address as "foo". 
00849 * 00850 * The default rewrite rule removes username and password from 00851 * addresses before they are transmitted. 00852 * 00853 * @param[in] messenger a messenger object 00854 * @param[in] pattern a glob pattern to select messages 00855 * @param[in] address an address indicating outgoing address rewrite 00856 * @return an error code or zero on success 00857 */ 00858 PN_EXTERN int pn_messenger_rewrite(pn_messenger_t *messenger, const char *pattern, 00859 const char *address); 00860 00861 /** 00862 * Extract @link pn_selectable_t selectables @endlink from a passive 00863 * messenger. 00864 * 00865 * A messenger that is in passive mode (see 00866 * ::pn_messenger_is_passive()) will never attempt to perform any I/O 00867 * internally, but instead make its internal file descriptors 00868 * available for external processing via the 00869 * ::pn_messenger_selectable() operation. 00870 * 00871 * An application wishing to perform I/O on behalf of a passive 00872 * messenger must extract all available selectables by calling this 00873 * operation until it returns NULL. The ::pn_selectable_t interface 00874 * may then be used by the application to perform I/O outside the 00875 * messenger. 00876 * 00877 * All selectables returned by this operation must be serviced until 00878 * they reach a terminal state and then freed. See 00879 * ::pn_selectable_is_terminal() for more details. 00880 * 00881 * By default any given selectable will only ever be returned once by 00882 * this operation, however if the selectable's registered flag is set 00883 * to true (see ::pn_selectable_set_registered()), then the selectable 00884 * will be returned whenever its interest set may have changed. 
00885 * 00886 * @param[in] messenger a messenger object 00887 * @return the next selectable, or NULL if there are none left 00888 */ 00889 PN_EXTERN pn_selectable_t *pn_messenger_selectable(pn_messenger_t *messenger); 00890 00891 /** 00892 * Get the nearest deadline for selectables associated with a messenger. 00893 * 00894 * @param[in] messenger a messenger object 00895 * @return the nearest deadline 00896 */ 00897 PN_EXTERN pn_timestamp_t pn_messenger_deadline(pn_messenger_t *messenger); 00898 00899 /** 00900 * @} 00901 */ 00902 00903 #ifdef __cplusplus 00904 } 00905 #endif 00906 00907 #endif /* messenger.h */