2
0

stream.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. /*
  2. * QEMU System Emulator
  3. *
  4. * Copyright (c) 2003-2008 Fabrice Bellard
  5. * Copyright (c) 2022 Red Hat, Inc.
  6. *
  7. * Permission is hereby granted, free of charge, to any person obtaining a copy
  8. * of this software and associated documentation files (the "Software"), to deal
  9. * in the Software without restriction, including without limitation the rights
  10. * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11. * copies of the Software, and to permit persons to whom the Software is
  12. * furnished to do so, subject to the following conditions:
  13. *
  14. * The above copyright notice and this permission notice shall be included in
  15. * all copies or substantial portions of the Software.
  16. *
  17. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  20. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23. * THE SOFTWARE.
  24. */
  25. #include "qemu/osdep.h"
  26. #include "net/net.h"
  27. #include "clients.h"
  28. #include "monitor/monitor.h"
  29. #include "qapi/error.h"
  30. #include "qemu/error-report.h"
  31. #include "qemu/option.h"
  32. #include "qemu/sockets.h"
  33. #include "qemu/iov.h"
  34. #include "qemu/main-loop.h"
  35. #include "qemu/cutils.h"
  36. #include "io/channel.h"
  37. #include "io/channel-socket.h"
  38. #include "io/net-listener.h"
  39. #include "qapi/qapi-events-net.h"
  40. #include "qapi/qapi-visit-sockets.h"
  41. #include "qapi/clone-visitor.h"
  42. typedef struct NetStreamState {
  43. NetClientState nc;
  44. QIOChannel *listen_ioc;
  45. QIONetListener *listener;
  46. QIOChannel *ioc;
  47. guint ioc_read_tag;
  48. guint ioc_write_tag;
  49. SocketReadState rs;
  50. unsigned int send_index; /* number of bytes sent*/
  51. uint32_t reconnect_ms;
  52. guint timer_tag;
  53. SocketAddress *addr;
  54. } NetStreamState;
  55. static void net_stream_listen(QIONetListener *listener,
  56. QIOChannelSocket *cioc,
  57. void *opaque);
  58. static void net_stream_arm_reconnect(NetStreamState *s);
  59. static gboolean net_stream_writable(QIOChannel *ioc,
  60. GIOCondition condition,
  61. gpointer data)
  62. {
  63. NetStreamState *s = data;
  64. s->ioc_write_tag = 0;
  65. qemu_flush_queued_packets(&s->nc);
  66. return G_SOURCE_REMOVE;
  67. }
  68. static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
  69. size_t size)
  70. {
  71. NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
  72. uint32_t len = htonl(size);
  73. struct iovec iov[] = {
  74. {
  75. .iov_base = &len,
  76. .iov_len = sizeof(len),
  77. }, {
  78. .iov_base = (void *)buf,
  79. .iov_len = size,
  80. },
  81. };
  82. struct iovec local_iov[2];
  83. unsigned int nlocal_iov;
  84. size_t remaining;
  85. ssize_t ret;
  86. remaining = iov_size(iov, 2) - s->send_index;
  87. nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
  88. ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
  89. if (ret == QIO_CHANNEL_ERR_BLOCK) {
  90. ret = 0; /* handled further down */
  91. }
  92. if (ret == -1) {
  93. s->send_index = 0;
  94. return -errno;
  95. }
  96. if (ret < (ssize_t)remaining) {
  97. s->send_index += ret;
  98. s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
  99. net_stream_writable, s, NULL);
  100. return 0;
  101. }
  102. s->send_index = 0;
  103. return size;
  104. }
  105. static gboolean net_stream_send(QIOChannel *ioc,
  106. GIOCondition condition,
  107. gpointer data);
  108. static void net_stream_send_completed(NetClientState *nc, ssize_t len)
  109. {
  110. NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
  111. if (!s->ioc_read_tag) {
  112. s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
  113. net_stream_send, s, NULL);
  114. }
  115. }
  116. static void net_stream_rs_finalize(SocketReadState *rs)
  117. {
  118. NetStreamState *s = container_of(rs, NetStreamState, rs);
  119. if (qemu_send_packet_async(&s->nc, rs->buf,
  120. rs->packet_len,
  121. net_stream_send_completed) == 0) {
  122. if (s->ioc_read_tag) {
  123. g_source_remove(s->ioc_read_tag);
  124. s->ioc_read_tag = 0;
  125. }
  126. }
  127. }
  128. static gboolean net_stream_send(QIOChannel *ioc,
  129. GIOCondition condition,
  130. gpointer data)
  131. {
  132. NetStreamState *s = data;
  133. int size;
  134. int ret;
  135. char buf1[NET_BUFSIZE];
  136. const char *buf;
  137. size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
  138. if (size < 0) {
  139. if (errno != EWOULDBLOCK) {
  140. goto eoc;
  141. }
  142. } else if (size == 0) {
  143. /* end of connection */
  144. eoc:
  145. s->ioc_read_tag = 0;
  146. if (s->ioc_write_tag) {
  147. g_source_remove(s->ioc_write_tag);
  148. s->ioc_write_tag = 0;
  149. }
  150. if (s->listener) {
  151. qemu_set_info_str(&s->nc, "listening");
  152. qio_net_listener_set_client_func(s->listener, net_stream_listen,
  153. s, NULL);
  154. }
  155. object_unref(OBJECT(s->ioc));
  156. s->ioc = NULL;
  157. net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
  158. s->nc.link_down = true;
  159. qapi_event_send_netdev_stream_disconnected(s->nc.name);
  160. net_stream_arm_reconnect(s);
  161. return G_SOURCE_REMOVE;
  162. }
  163. buf = buf1;
  164. ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
  165. if (ret == -1) {
  166. goto eoc;
  167. }
  168. return G_SOURCE_CONTINUE;
  169. }
  170. static void net_stream_cleanup(NetClientState *nc)
  171. {
  172. NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
  173. if (s->timer_tag) {
  174. g_source_remove(s->timer_tag);
  175. s->timer_tag = 0;
  176. }
  177. if (s->addr) {
  178. qapi_free_SocketAddress(s->addr);
  179. s->addr = NULL;
  180. }
  181. if (s->ioc) {
  182. if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
  183. if (s->ioc_read_tag) {
  184. g_source_remove(s->ioc_read_tag);
  185. s->ioc_read_tag = 0;
  186. }
  187. if (s->ioc_write_tag) {
  188. g_source_remove(s->ioc_write_tag);
  189. s->ioc_write_tag = 0;
  190. }
  191. }
  192. object_unref(OBJECT(s->ioc));
  193. s->ioc = NULL;
  194. }
  195. if (s->listen_ioc) {
  196. if (s->listener) {
  197. qio_net_listener_disconnect(s->listener);
  198. object_unref(OBJECT(s->listener));
  199. s->listener = NULL;
  200. }
  201. object_unref(OBJECT(s->listen_ioc));
  202. s->listen_ioc = NULL;
  203. }
  204. }
  205. static NetClientInfo net_stream_info = {
  206. .type = NET_CLIENT_DRIVER_STREAM,
  207. .size = sizeof(NetStreamState),
  208. .receive = net_stream_receive,
  209. .cleanup = net_stream_cleanup,
  210. };
  211. static void net_stream_listen(QIONetListener *listener,
  212. QIOChannelSocket *cioc,
  213. void *opaque)
  214. {
  215. NetStreamState *s = opaque;
  216. SocketAddress *addr;
  217. char *uri;
  218. object_ref(OBJECT(cioc));
  219. qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
  220. s->ioc = QIO_CHANNEL(cioc);
  221. qio_channel_set_name(s->ioc, "stream-server");
  222. s->nc.link_down = false;
  223. s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
  224. s, NULL);
  225. if (cioc->localAddr.ss_family == AF_UNIX) {
  226. addr = qio_channel_socket_get_local_address(cioc, NULL);
  227. } else {
  228. addr = qio_channel_socket_get_remote_address(cioc, NULL);
  229. }
  230. g_assert(addr != NULL);
  231. uri = socket_uri(addr);
  232. qemu_set_info_str(&s->nc, "%s", uri);
  233. g_free(uri);
  234. qapi_event_send_netdev_stream_connected(s->nc.name, addr);
  235. qapi_free_SocketAddress(addr);
  236. }
  237. static void net_stream_server_listening(QIOTask *task, gpointer opaque)
  238. {
  239. NetStreamState *s = opaque;
  240. QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
  241. SocketAddress *addr;
  242. int ret;
  243. Error *err = NULL;
  244. if (qio_task_propagate_error(task, &err)) {
  245. qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
  246. error_free(err);
  247. return;
  248. }
  249. addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
  250. g_assert(addr != NULL);
  251. ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
  252. if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
  253. qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
  254. addr->u.fd.str, -ret);
  255. return;
  256. }
  257. g_assert(ret == 0);
  258. qapi_free_SocketAddress(addr);
  259. s->nc.link_down = true;
  260. s->listener = qio_net_listener_new();
  261. qemu_set_info_str(&s->nc, "listening");
  262. net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
  263. qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
  264. qio_net_listener_add(s->listener, listen_sioc);
  265. }
  266. static int net_stream_server_init(NetClientState *peer,
  267. const char *model,
  268. const char *name,
  269. SocketAddress *addr,
  270. Error **errp)
  271. {
  272. NetClientState *nc;
  273. NetStreamState *s;
  274. QIOChannelSocket *listen_sioc = qio_channel_socket_new();
  275. nc = qemu_new_net_client(&net_stream_info, peer, model, name);
  276. s = DO_UPCAST(NetStreamState, nc, nc);
  277. qemu_set_info_str(&s->nc, "initializing");
  278. s->listen_ioc = QIO_CHANNEL(listen_sioc);
  279. qio_channel_socket_listen_async(listen_sioc, addr, 0,
  280. net_stream_server_listening, s,
  281. NULL, NULL);
  282. return 0;
  283. }
  284. static void net_stream_client_connected(QIOTask *task, gpointer opaque)
  285. {
  286. NetStreamState *s = opaque;
  287. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
  288. SocketAddress *addr;
  289. gchar *uri;
  290. int ret;
  291. Error *err = NULL;
  292. if (qio_task_propagate_error(task, &err)) {
  293. qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
  294. error_free(err);
  295. goto error;
  296. }
  297. addr = qio_channel_socket_get_remote_address(sioc, NULL);
  298. g_assert(addr != NULL);
  299. uri = socket_uri(addr);
  300. qemu_set_info_str(&s->nc, "%s", uri);
  301. g_free(uri);
  302. ret = qemu_socket_try_set_nonblock(sioc->fd);
  303. if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
  304. qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
  305. addr->u.fd.str, -ret);
  306. qapi_free_SocketAddress(addr);
  307. goto error;
  308. }
  309. g_assert(ret == 0);
  310. net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
  311. /* Disable Nagle algorithm on TCP sockets to reduce latency */
  312. qio_channel_set_delay(s->ioc, false);
  313. s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
  314. s, NULL);
  315. s->nc.link_down = false;
  316. qapi_event_send_netdev_stream_connected(s->nc.name, addr);
  317. qapi_free_SocketAddress(addr);
  318. return;
  319. error:
  320. object_unref(OBJECT(s->ioc));
  321. s->ioc = NULL;
  322. net_stream_arm_reconnect(s);
  323. }
  324. static gboolean net_stream_reconnect(gpointer data)
  325. {
  326. NetStreamState *s = data;
  327. QIOChannelSocket *sioc;
  328. s->timer_tag = 0;
  329. sioc = qio_channel_socket_new();
  330. s->ioc = QIO_CHANNEL(sioc);
  331. qio_channel_socket_connect_async(sioc, s->addr,
  332. net_stream_client_connected, s,
  333. NULL, NULL);
  334. return G_SOURCE_REMOVE;
  335. }
  336. static void net_stream_arm_reconnect(NetStreamState *s)
  337. {
  338. if (s->reconnect_ms && s->timer_tag == 0) {
  339. qemu_set_info_str(&s->nc, "connecting");
  340. s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
  341. }
  342. }
  343. static int net_stream_client_init(NetClientState *peer,
  344. const char *model,
  345. const char *name,
  346. SocketAddress *addr,
  347. uint32_t reconnect_ms,
  348. Error **errp)
  349. {
  350. NetStreamState *s;
  351. NetClientState *nc;
  352. QIOChannelSocket *sioc = qio_channel_socket_new();
  353. nc = qemu_new_net_client(&net_stream_info, peer, model, name);
  354. s = DO_UPCAST(NetStreamState, nc, nc);
  355. qemu_set_info_str(&s->nc, "connecting");
  356. s->ioc = QIO_CHANNEL(sioc);
  357. s->nc.link_down = true;
  358. s->reconnect_ms = reconnect_ms;
  359. if (reconnect_ms) {
  360. s->addr = QAPI_CLONE(SocketAddress, addr);
  361. }
  362. qio_channel_socket_connect_async(sioc, addr,
  363. net_stream_client_connected, s,
  364. NULL, NULL);
  365. return 0;
  366. }
  367. int net_init_stream(const Netdev *netdev, const char *name,
  368. NetClientState *peer, Error **errp)
  369. {
  370. const NetdevStreamOptions *sock;
  371. assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
  372. sock = &netdev->u.stream;
  373. if (!sock->has_server || !sock->server) {
  374. uint32_t reconnect_ms = 0;
  375. if (sock->has_reconnect && sock->has_reconnect_ms) {
  376. error_setg(errp, "'reconnect' and 'reconnect-ms' are mutually "
  377. "exclusive");
  378. return -1;
  379. } else if (sock->has_reconnect_ms) {
  380. reconnect_ms = sock->reconnect_ms;
  381. } else if (sock->has_reconnect) {
  382. reconnect_ms = sock->reconnect * 1000u;
  383. }
  384. return net_stream_client_init(peer, "stream", name, sock->addr,
  385. reconnect_ms, errp);
  386. }
  387. if (sock->has_reconnect || sock->has_reconnect_ms) {
  388. error_setg(errp, "'reconnect' and 'reconnect-ms' options are "
  389. "incompatible with socket in server mode");
  390. return -1;
  391. }
  392. return net_stream_server_init(peer, "stream", name, sock->addr, errp);
  393. }