queue.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. /*
  2. * Copyright (c) 2003-2008 Fabrice Bellard
  3. * Copyright (c) 2009 Red Hat, Inc.
  4. *
  5. * Permission is hereby granted, free of charge, to any person obtaining a copy
  6. * of this software and associated documentation files (the "Software"), to deal
  7. * in the Software without restriction, including without limitation the rights
  8. * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. * copies of the Software, and to permit persons to whom the Software is
  10. * furnished to do so, subject to the following conditions:
  11. *
  12. * The above copyright notice and this permission notice shall be included in
  13. * all copies or substantial portions of the Software.
  14. *
  15. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  18. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. * THE SOFTWARE.
  22. */
  23. #include "qemu/osdep.h"
  24. #include "net/queue.h"
  25. #include "qemu/queue.h"
  26. #include "net/net.h"
  27. /* The delivery handler may only return zero if it will call
  28. * qemu_net_queue_flush() when it determines that it is once again able
  29. * to deliver packets. It must also call qemu_net_queue_purge() in its
  30. * cleanup path.
  31. *
  32. * If a sent callback is provided to send(), the caller must handle a
  33. * zero return from the delivery handler by not sending any more packets
  34. * until we have invoked the callback. Only in that case will we queue
  35. * the packet.
  36. *
  37. * If a sent callback isn't provided, we just drop the packet to avoid
  38. * unbounded queueing.
  39. */
  40. struct NetPacket {
  41. QTAILQ_ENTRY(NetPacket) entry;
  42. NetClientState *sender;
  43. unsigned flags;
  44. int size;
  45. NetPacketSent *sent_cb;
  46. uint8_t data[0];
  47. };
  48. struct NetQueue {
  49. void *opaque;
  50. uint32_t nq_maxlen;
  51. uint32_t nq_count;
  52. NetQueueDeliverFunc *deliver;
  53. QTAILQ_HEAD(packets, NetPacket) packets;
  54. unsigned delivering : 1;
  55. };
  56. NetQueue *qemu_new_net_queue(NetQueueDeliverFunc *deliver, void *opaque)
  57. {
  58. NetQueue *queue;
  59. queue = g_new0(NetQueue, 1);
  60. queue->opaque = opaque;
  61. queue->nq_maxlen = 10000;
  62. queue->nq_count = 0;
  63. queue->deliver = deliver;
  64. QTAILQ_INIT(&queue->packets);
  65. queue->delivering = 0;
  66. return queue;
  67. }
  68. void qemu_del_net_queue(NetQueue *queue)
  69. {
  70. NetPacket *packet, *next;
  71. QTAILQ_FOREACH_SAFE(packet, &queue->packets, entry, next) {
  72. QTAILQ_REMOVE(&queue->packets, packet, entry);
  73. g_free(packet);
  74. }
  75. g_free(queue);
  76. }
  77. static void qemu_net_queue_append(NetQueue *queue,
  78. NetClientState *sender,
  79. unsigned flags,
  80. const uint8_t *buf,
  81. size_t size,
  82. NetPacketSent *sent_cb)
  83. {
  84. NetPacket *packet;
  85. if (queue->nq_count >= queue->nq_maxlen && !sent_cb) {
  86. return; /* drop if queue full and no callback */
  87. }
  88. packet = g_malloc(sizeof(NetPacket) + size);
  89. packet->sender = sender;
  90. packet->flags = flags;
  91. packet->size = size;
  92. packet->sent_cb = sent_cb;
  93. memcpy(packet->data, buf, size);
  94. queue->nq_count++;
  95. QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
  96. }
  97. void qemu_net_queue_append_iov(NetQueue *queue,
  98. NetClientState *sender,
  99. unsigned flags,
  100. const struct iovec *iov,
  101. int iovcnt,
  102. NetPacketSent *sent_cb)
  103. {
  104. NetPacket *packet;
  105. size_t max_len = 0;
  106. int i;
  107. if (queue->nq_count >= queue->nq_maxlen && !sent_cb) {
  108. return; /* drop if queue full and no callback */
  109. }
  110. for (i = 0; i < iovcnt; i++) {
  111. max_len += iov[i].iov_len;
  112. }
  113. packet = g_malloc(sizeof(NetPacket) + max_len);
  114. packet->sender = sender;
  115. packet->sent_cb = sent_cb;
  116. packet->flags = flags;
  117. packet->size = 0;
  118. for (i = 0; i < iovcnt; i++) {
  119. size_t len = iov[i].iov_len;
  120. memcpy(packet->data + packet->size, iov[i].iov_base, len);
  121. packet->size += len;
  122. }
  123. queue->nq_count++;
  124. QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
  125. }
  126. static ssize_t qemu_net_queue_deliver(NetQueue *queue,
  127. NetClientState *sender,
  128. unsigned flags,
  129. const uint8_t *data,
  130. size_t size)
  131. {
  132. ssize_t ret = -1;
  133. struct iovec iov = {
  134. .iov_base = (void *)data,
  135. .iov_len = size
  136. };
  137. queue->delivering = 1;
  138. ret = queue->deliver(sender, flags, &iov, 1, queue->opaque);
  139. queue->delivering = 0;
  140. return ret;
  141. }
  142. static ssize_t qemu_net_queue_deliver_iov(NetQueue *queue,
  143. NetClientState *sender,
  144. unsigned flags,
  145. const struct iovec *iov,
  146. int iovcnt)
  147. {
  148. ssize_t ret = -1;
  149. queue->delivering = 1;
  150. ret = queue->deliver(sender, flags, iov, iovcnt, queue->opaque);
  151. queue->delivering = 0;
  152. return ret;
  153. }
  154. ssize_t qemu_net_queue_send(NetQueue *queue,
  155. NetClientState *sender,
  156. unsigned flags,
  157. const uint8_t *data,
  158. size_t size,
  159. NetPacketSent *sent_cb)
  160. {
  161. ssize_t ret;
  162. if (queue->delivering || !qemu_can_send_packet(sender)) {
  163. qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
  164. return 0;
  165. }
  166. ret = qemu_net_queue_deliver(queue, sender, flags, data, size);
  167. if (ret == 0) {
  168. qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
  169. return 0;
  170. }
  171. qemu_net_queue_flush(queue);
  172. return ret;
  173. }
  174. ssize_t qemu_net_queue_send_iov(NetQueue *queue,
  175. NetClientState *sender,
  176. unsigned flags,
  177. const struct iovec *iov,
  178. int iovcnt,
  179. NetPacketSent *sent_cb)
  180. {
  181. ssize_t ret;
  182. if (queue->delivering || !qemu_can_send_packet(sender)) {
  183. qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
  184. return 0;
  185. }
  186. ret = qemu_net_queue_deliver_iov(queue, sender, flags, iov, iovcnt);
  187. if (ret == 0) {
  188. qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
  189. return 0;
  190. }
  191. qemu_net_queue_flush(queue);
  192. return ret;
  193. }
  194. void qemu_net_queue_purge(NetQueue *queue, NetClientState *from)
  195. {
  196. NetPacket *packet, *next;
  197. QTAILQ_FOREACH_SAFE(packet, &queue->packets, entry, next) {
  198. if (packet->sender == from) {
  199. QTAILQ_REMOVE(&queue->packets, packet, entry);
  200. queue->nq_count--;
  201. if (packet->sent_cb) {
  202. packet->sent_cb(packet->sender, 0);
  203. }
  204. g_free(packet);
  205. }
  206. }
  207. }
  208. bool qemu_net_queue_flush(NetQueue *queue)
  209. {
  210. while (!QTAILQ_EMPTY(&queue->packets)) {
  211. NetPacket *packet;
  212. int ret;
  213. packet = QTAILQ_FIRST(&queue->packets);
  214. QTAILQ_REMOVE(&queue->packets, packet, entry);
  215. queue->nq_count--;
  216. ret = qemu_net_queue_deliver(queue,
  217. packet->sender,
  218. packet->flags,
  219. packet->data,
  220. packet->size);
  221. if (ret == 0) {
  222. queue->nq_count++;
  223. QTAILQ_INSERT_HEAD(&queue->packets, packet, entry);
  224. return false;
  225. }
  226. if (packet->sent_cb) {
  227. packet->sent_cb(packet->sender, ret);
  228. }
  229. g_free(packet);
  230. }
  231. return true;
  232. }