2
0

channel-socket.c 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948
  1. /*
  2. * QEMU I/O channels sockets driver
  3. *
  4. * Copyright (c) 2015 Red Hat, Inc.
  5. *
  6. * This library is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 2.1 of the License, or (at your option) any later version.
  10. *
  11. * This library is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public
  17. * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18. */
  19. #include "qemu/osdep.h"
  20. #include "qapi/error.h"
  21. #include "qapi/qapi-visit-sockets.h"
  22. #include "qemu/module.h"
  23. #include "io/channel-socket.h"
  24. #include "io/channel-watch.h"
  25. #include "trace.h"
  26. #include "qapi/clone-visitor.h"
  27. #ifdef CONFIG_LINUX
  28. #include <linux/errqueue.h>
  29. #include <sys/socket.h>
  30. #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
  31. #define QEMU_MSG_ZEROCOPY
  32. #endif
  33. #endif
  34. #define SOCKET_MAX_FDS 16
  35. SocketAddress *
  36. qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
  37. Error **errp)
  38. {
  39. return socket_sockaddr_to_address(&ioc->localAddr,
  40. ioc->localAddrLen,
  41. errp);
  42. }
  43. SocketAddress *
  44. qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
  45. Error **errp)
  46. {
  47. return socket_sockaddr_to_address(&ioc->remoteAddr,
  48. ioc->remoteAddrLen,
  49. errp);
  50. }
  51. QIOChannelSocket *
  52. qio_channel_socket_new(void)
  53. {
  54. QIOChannelSocket *sioc;
  55. QIOChannel *ioc;
  56. sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
  57. sioc->fd = -1;
  58. sioc->zero_copy_queued = 0;
  59. sioc->zero_copy_sent = 0;
  60. ioc = QIO_CHANNEL(sioc);
  61. qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
  62. #ifdef WIN32
  63. ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
  64. #endif
  65. trace_qio_channel_socket_new(sioc);
  66. return sioc;
  67. }
  68. static int
  69. qio_channel_socket_set_fd(QIOChannelSocket *sioc,
  70. int fd,
  71. Error **errp)
  72. {
  73. if (sioc->fd != -1) {
  74. error_setg(errp, "Socket is already open");
  75. return -1;
  76. }
  77. sioc->fd = fd;
  78. sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
  79. sioc->localAddrLen = sizeof(sioc->localAddr);
  80. if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
  81. &sioc->remoteAddrLen) < 0) {
  82. if (errno == ENOTCONN) {
  83. memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
  84. sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
  85. } else {
  86. error_setg_errno(errp, errno,
  87. "Unable to query remote socket address");
  88. goto error;
  89. }
  90. }
  91. if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
  92. &sioc->localAddrLen) < 0) {
  93. error_setg_errno(errp, errno,
  94. "Unable to query local socket address");
  95. goto error;
  96. }
  97. #ifndef WIN32
  98. if (sioc->localAddr.ss_family == AF_UNIX) {
  99. QIOChannel *ioc = QIO_CHANNEL(sioc);
  100. qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
  101. }
  102. #endif /* WIN32 */
  103. return 0;
  104. error:
  105. sioc->fd = -1; /* Let the caller close FD on failure */
  106. return -1;
  107. }
  108. QIOChannelSocket *
  109. qio_channel_socket_new_fd(int fd,
  110. Error **errp)
  111. {
  112. QIOChannelSocket *ioc;
  113. ioc = qio_channel_socket_new();
  114. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  115. object_unref(OBJECT(ioc));
  116. return NULL;
  117. }
  118. trace_qio_channel_socket_new_fd(ioc, fd);
  119. return ioc;
  120. }
  121. int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
  122. SocketAddress *addr,
  123. Error **errp)
  124. {
  125. int fd;
  126. trace_qio_channel_socket_connect_sync(ioc, addr);
  127. fd = socket_connect(addr, errp);
  128. if (fd < 0) {
  129. trace_qio_channel_socket_connect_fail(ioc);
  130. return -1;
  131. }
  132. trace_qio_channel_socket_connect_complete(ioc, fd);
  133. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  134. close(fd);
  135. return -1;
  136. }
  137. #ifdef QEMU_MSG_ZEROCOPY
  138. int ret, v = 1;
  139. ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
  140. if (ret == 0) {
  141. /* Zero copy available on host */
  142. qio_channel_set_feature(QIO_CHANNEL(ioc),
  143. QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
  144. }
  145. #endif
  146. qio_channel_set_feature(QIO_CHANNEL(ioc),
  147. QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
  148. return 0;
  149. }
  150. static void qio_channel_socket_connect_worker(QIOTask *task,
  151. gpointer opaque)
  152. {
  153. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
  154. SocketAddress *addr = opaque;
  155. Error *err = NULL;
  156. qio_channel_socket_connect_sync(ioc, addr, &err);
  157. qio_task_set_error(task, err);
  158. }
  159. void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
  160. SocketAddress *addr,
  161. QIOTaskFunc callback,
  162. gpointer opaque,
  163. GDestroyNotify destroy,
  164. GMainContext *context)
  165. {
  166. QIOTask *task = qio_task_new(
  167. OBJECT(ioc), callback, opaque, destroy);
  168. SocketAddress *addrCopy;
  169. addrCopy = QAPI_CLONE(SocketAddress, addr);
  170. /* socket_connect() does a non-blocking connect(), but it
  171. * still blocks in DNS lookups, so we must use a thread */
  172. trace_qio_channel_socket_connect_async(ioc, addr);
  173. qio_task_run_in_thread(task,
  174. qio_channel_socket_connect_worker,
  175. addrCopy,
  176. (GDestroyNotify)qapi_free_SocketAddress,
  177. context);
  178. }
  179. int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
  180. SocketAddress *addr,
  181. int num,
  182. Error **errp)
  183. {
  184. int fd;
  185. trace_qio_channel_socket_listen_sync(ioc, addr, num);
  186. fd = socket_listen(addr, num, errp);
  187. if (fd < 0) {
  188. trace_qio_channel_socket_listen_fail(ioc);
  189. return -1;
  190. }
  191. trace_qio_channel_socket_listen_complete(ioc, fd);
  192. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  193. close(fd);
  194. return -1;
  195. }
  196. qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
  197. return 0;
  198. }
  199. struct QIOChannelListenWorkerData {
  200. SocketAddress *addr;
  201. int num; /* amount of expected connections */
  202. };
  203. static void qio_channel_listen_worker_free(gpointer opaque)
  204. {
  205. struct QIOChannelListenWorkerData *data = opaque;
  206. qapi_free_SocketAddress(data->addr);
  207. g_free(data);
  208. }
  209. static void qio_channel_socket_listen_worker(QIOTask *task,
  210. gpointer opaque)
  211. {
  212. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
  213. struct QIOChannelListenWorkerData *data = opaque;
  214. Error *err = NULL;
  215. qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
  216. qio_task_set_error(task, err);
  217. }
  218. void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
  219. SocketAddress *addr,
  220. int num,
  221. QIOTaskFunc callback,
  222. gpointer opaque,
  223. GDestroyNotify destroy,
  224. GMainContext *context)
  225. {
  226. QIOTask *task = qio_task_new(
  227. OBJECT(ioc), callback, opaque, destroy);
  228. struct QIOChannelListenWorkerData *data;
  229. data = g_new0(struct QIOChannelListenWorkerData, 1);
  230. data->addr = QAPI_CLONE(SocketAddress, addr);
  231. data->num = num;
  232. /* socket_listen() blocks in DNS lookups, so we must use a thread */
  233. trace_qio_channel_socket_listen_async(ioc, addr, num);
  234. qio_task_run_in_thread(task,
  235. qio_channel_socket_listen_worker,
  236. data,
  237. qio_channel_listen_worker_free,
  238. context);
  239. }
  240. int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
  241. SocketAddress *localAddr,
  242. SocketAddress *remoteAddr,
  243. Error **errp)
  244. {
  245. int fd;
  246. trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
  247. fd = socket_dgram(remoteAddr, localAddr, errp);
  248. if (fd < 0) {
  249. trace_qio_channel_socket_dgram_fail(ioc);
  250. return -1;
  251. }
  252. trace_qio_channel_socket_dgram_complete(ioc, fd);
  253. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  254. close(fd);
  255. return -1;
  256. }
  257. return 0;
  258. }
  259. struct QIOChannelSocketDGramWorkerData {
  260. SocketAddress *localAddr;
  261. SocketAddress *remoteAddr;
  262. };
  263. static void qio_channel_socket_dgram_worker_free(gpointer opaque)
  264. {
  265. struct QIOChannelSocketDGramWorkerData *data = opaque;
  266. qapi_free_SocketAddress(data->localAddr);
  267. qapi_free_SocketAddress(data->remoteAddr);
  268. g_free(data);
  269. }
  270. static void qio_channel_socket_dgram_worker(QIOTask *task,
  271. gpointer opaque)
  272. {
  273. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
  274. struct QIOChannelSocketDGramWorkerData *data = opaque;
  275. Error *err = NULL;
  276. /* socket_dgram() blocks in DNS lookups, so we must use a thread */
  277. qio_channel_socket_dgram_sync(ioc, data->localAddr,
  278. data->remoteAddr, &err);
  279. qio_task_set_error(task, err);
  280. }
  281. void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
  282. SocketAddress *localAddr,
  283. SocketAddress *remoteAddr,
  284. QIOTaskFunc callback,
  285. gpointer opaque,
  286. GDestroyNotify destroy,
  287. GMainContext *context)
  288. {
  289. QIOTask *task = qio_task_new(
  290. OBJECT(ioc), callback, opaque, destroy);
  291. struct QIOChannelSocketDGramWorkerData *data = g_new0(
  292. struct QIOChannelSocketDGramWorkerData, 1);
  293. data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
  294. data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
  295. trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
  296. qio_task_run_in_thread(task,
  297. qio_channel_socket_dgram_worker,
  298. data,
  299. qio_channel_socket_dgram_worker_free,
  300. context);
  301. }
  302. QIOChannelSocket *
  303. qio_channel_socket_accept(QIOChannelSocket *ioc,
  304. Error **errp)
  305. {
  306. QIOChannelSocket *cioc;
  307. cioc = qio_channel_socket_new();
  308. cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
  309. cioc->localAddrLen = sizeof(ioc->localAddr);
  310. retry:
  311. trace_qio_channel_socket_accept(ioc);
  312. cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
  313. &cioc->remoteAddrLen);
  314. if (cioc->fd < 0) {
  315. if (errno == EINTR) {
  316. goto retry;
  317. }
  318. error_setg_errno(errp, errno, "Unable to accept connection");
  319. trace_qio_channel_socket_accept_fail(ioc);
  320. goto error;
  321. }
  322. if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
  323. &cioc->localAddrLen) < 0) {
  324. error_setg_errno(errp, errno,
  325. "Unable to query local socket address");
  326. goto error;
  327. }
  328. #ifndef WIN32
  329. if (cioc->localAddr.ss_family == AF_UNIX) {
  330. QIOChannel *ioc_local = QIO_CHANNEL(cioc);
  331. qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
  332. }
  333. #endif /* WIN32 */
  334. qio_channel_set_feature(QIO_CHANNEL(cioc),
  335. QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
  336. trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
  337. return cioc;
  338. error:
  339. object_unref(OBJECT(cioc));
  340. return NULL;
  341. }
  342. static void qio_channel_socket_init(Object *obj)
  343. {
  344. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
  345. ioc->fd = -1;
  346. }
  347. static void qio_channel_socket_finalize(Object *obj)
  348. {
  349. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
  350. if (ioc->fd != -1) {
  351. QIOChannel *ioc_local = QIO_CHANNEL(ioc);
  352. if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
  353. Error *err = NULL;
  354. socket_listen_cleanup(ioc->fd, &err);
  355. if (err) {
  356. error_report_err(err);
  357. err = NULL;
  358. }
  359. }
  360. #ifdef WIN32
  361. qemu_socket_unselect(ioc->fd, NULL);
  362. #endif
  363. close(ioc->fd);
  364. ioc->fd = -1;
  365. }
  366. }
  367. #ifndef WIN32
  368. static void qio_channel_socket_copy_fds(struct msghdr *msg,
  369. int **fds, size_t *nfds)
  370. {
  371. struct cmsghdr *cmsg;
  372. *nfds = 0;
  373. *fds = NULL;
  374. for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
  375. int fd_size, i;
  376. int gotfds;
  377. if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
  378. cmsg->cmsg_level != SOL_SOCKET ||
  379. cmsg->cmsg_type != SCM_RIGHTS) {
  380. continue;
  381. }
  382. fd_size = cmsg->cmsg_len - CMSG_LEN(0);
  383. if (!fd_size) {
  384. continue;
  385. }
  386. gotfds = fd_size / sizeof(int);
  387. *fds = g_renew(int, *fds, *nfds + gotfds);
  388. memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
  389. for (i = 0; i < gotfds; i++) {
  390. int fd = (*fds)[*nfds + i];
  391. if (fd < 0) {
  392. continue;
  393. }
  394. /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
  395. qemu_socket_set_block(fd);
  396. #ifndef MSG_CMSG_CLOEXEC
  397. qemu_set_cloexec(fd);
  398. #endif
  399. }
  400. *nfds += gotfds;
  401. }
  402. }
  403. static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
  404. const struct iovec *iov,
  405. size_t niov,
  406. int **fds,
  407. size_t *nfds,
  408. int flags,
  409. Error **errp)
  410. {
  411. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  412. ssize_t ret;
  413. struct msghdr msg = { NULL, };
  414. char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
  415. int sflags = 0;
  416. memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
  417. msg.msg_iov = (struct iovec *)iov;
  418. msg.msg_iovlen = niov;
  419. if (fds && nfds) {
  420. msg.msg_control = control;
  421. msg.msg_controllen = sizeof(control);
  422. #ifdef MSG_CMSG_CLOEXEC
  423. sflags |= MSG_CMSG_CLOEXEC;
  424. #endif
  425. }
  426. if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
  427. sflags |= MSG_PEEK;
  428. }
  429. retry:
  430. ret = recvmsg(sioc->fd, &msg, sflags);
  431. if (ret < 0) {
  432. if (errno == EAGAIN) {
  433. return QIO_CHANNEL_ERR_BLOCK;
  434. }
  435. if (errno == EINTR) {
  436. goto retry;
  437. }
  438. error_setg_errno(errp, errno,
  439. "Unable to read from socket");
  440. return -1;
  441. }
  442. if (fds && nfds) {
  443. qio_channel_socket_copy_fds(&msg, fds, nfds);
  444. }
  445. return ret;
  446. }
  447. static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
  448. const struct iovec *iov,
  449. size_t niov,
  450. int *fds,
  451. size_t nfds,
  452. int flags,
  453. Error **errp)
  454. {
  455. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  456. ssize_t ret;
  457. struct msghdr msg = { NULL, };
  458. char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
  459. size_t fdsize = sizeof(int) * nfds;
  460. struct cmsghdr *cmsg;
  461. int sflags = 0;
  462. memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
  463. msg.msg_iov = (struct iovec *)iov;
  464. msg.msg_iovlen = niov;
  465. if (nfds) {
  466. if (nfds > SOCKET_MAX_FDS) {
  467. error_setg_errno(errp, EINVAL,
  468. "Only %d FDs can be sent, got %zu",
  469. SOCKET_MAX_FDS, nfds);
  470. return -1;
  471. }
  472. msg.msg_control = control;
  473. msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
  474. cmsg = CMSG_FIRSTHDR(&msg);
  475. cmsg->cmsg_len = CMSG_LEN(fdsize);
  476. cmsg->cmsg_level = SOL_SOCKET;
  477. cmsg->cmsg_type = SCM_RIGHTS;
  478. memcpy(CMSG_DATA(cmsg), fds, fdsize);
  479. }
  480. if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  481. #ifdef QEMU_MSG_ZEROCOPY
  482. sflags = MSG_ZEROCOPY;
  483. #else
  484. /*
  485. * We expect QIOChannel class entry point to have
  486. * blocked this code path already
  487. */
  488. g_assert_not_reached();
  489. #endif
  490. }
  491. retry:
  492. ret = sendmsg(sioc->fd, &msg, sflags);
  493. if (ret <= 0) {
  494. switch (errno) {
  495. case EAGAIN:
  496. return QIO_CHANNEL_ERR_BLOCK;
  497. case EINTR:
  498. goto retry;
  499. case ENOBUFS:
  500. if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  501. error_setg_errno(errp, errno,
  502. "Process can't lock enough memory for using MSG_ZEROCOPY");
  503. return -1;
  504. }
  505. break;
  506. }
  507. error_setg_errno(errp, errno,
  508. "Unable to write to socket");
  509. return -1;
  510. }
  511. if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  512. sioc->zero_copy_queued++;
  513. }
  514. return ret;
  515. }
  516. #else /* WIN32 */
  517. static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
  518. const struct iovec *iov,
  519. size_t niov,
  520. int **fds,
  521. size_t *nfds,
  522. int flags,
  523. Error **errp)
  524. {
  525. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  526. ssize_t done = 0;
  527. ssize_t i;
  528. int sflags = 0;
  529. if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
  530. sflags |= MSG_PEEK;
  531. }
  532. for (i = 0; i < niov; i++) {
  533. ssize_t ret;
  534. retry:
  535. ret = recv(sioc->fd,
  536. iov[i].iov_base,
  537. iov[i].iov_len,
  538. sflags);
  539. if (ret < 0) {
  540. if (errno == EAGAIN) {
  541. if (done) {
  542. return done;
  543. } else {
  544. return QIO_CHANNEL_ERR_BLOCK;
  545. }
  546. } else if (errno == EINTR) {
  547. goto retry;
  548. } else {
  549. error_setg_errno(errp, errno,
  550. "Unable to read from socket");
  551. return -1;
  552. }
  553. }
  554. done += ret;
  555. if (ret < iov[i].iov_len) {
  556. return done;
  557. }
  558. }
  559. return done;
  560. }
  561. static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
  562. const struct iovec *iov,
  563. size_t niov,
  564. int *fds,
  565. size_t nfds,
  566. int flags,
  567. Error **errp)
  568. {
  569. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  570. ssize_t done = 0;
  571. ssize_t i;
  572. for (i = 0; i < niov; i++) {
  573. ssize_t ret;
  574. retry:
  575. ret = send(sioc->fd,
  576. iov[i].iov_base,
  577. iov[i].iov_len,
  578. 0);
  579. if (ret < 0) {
  580. if (errno == EAGAIN) {
  581. if (done) {
  582. return done;
  583. } else {
  584. return QIO_CHANNEL_ERR_BLOCK;
  585. }
  586. } else if (errno == EINTR) {
  587. goto retry;
  588. } else {
  589. error_setg_errno(errp, errno,
  590. "Unable to write to socket");
  591. return -1;
  592. }
  593. }
  594. done += ret;
  595. if (ret < iov[i].iov_len) {
  596. return done;
  597. }
  598. }
  599. return done;
  600. }
  601. #endif /* WIN32 */
  602. #ifdef QEMU_MSG_ZEROCOPY
  603. static int qio_channel_socket_flush(QIOChannel *ioc,
  604. Error **errp)
  605. {
  606. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  607. struct msghdr msg = {};
  608. struct sock_extended_err *serr;
  609. struct cmsghdr *cm;
  610. char control[CMSG_SPACE(sizeof(*serr))];
  611. int received;
  612. int ret;
  613. if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
  614. return 0;
  615. }
  616. msg.msg_control = control;
  617. msg.msg_controllen = sizeof(control);
  618. memset(control, 0, sizeof(control));
  619. ret = 1;
  620. while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
  621. received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
  622. if (received < 0) {
  623. switch (errno) {
  624. case EAGAIN:
  625. /* Nothing on errqueue, wait until something is available */
  626. qio_channel_wait(ioc, G_IO_ERR);
  627. continue;
  628. case EINTR:
  629. continue;
  630. default:
  631. error_setg_errno(errp, errno,
  632. "Unable to read errqueue");
  633. return -1;
  634. }
  635. }
  636. cm = CMSG_FIRSTHDR(&msg);
  637. if (cm->cmsg_level != SOL_IP && cm->cmsg_type != IP_RECVERR &&
  638. cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
  639. error_setg_errno(errp, EPROTOTYPE,
  640. "Wrong cmsg in errqueue");
  641. return -1;
  642. }
  643. serr = (void *) CMSG_DATA(cm);
  644. if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
  645. error_setg_errno(errp, serr->ee_errno,
  646. "Error on socket");
  647. return -1;
  648. }
  649. if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
  650. error_setg_errno(errp, serr->ee_origin,
  651. "Error not from zero copy");
  652. return -1;
  653. }
  654. /* No errors, count successfully finished sendmsg()*/
  655. sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
  656. /* If any sendmsg() succeeded using zero copy, return 0 at the end */
  657. if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
  658. ret = 0;
  659. }
  660. }
  661. return ret;
  662. }
  663. #endif /* QEMU_MSG_ZEROCOPY */
  664. static int
  665. qio_channel_socket_set_blocking(QIOChannel *ioc,
  666. bool enabled,
  667. Error **errp)
  668. {
  669. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  670. if (enabled) {
  671. qemu_socket_set_block(sioc->fd);
  672. } else {
  673. qemu_socket_set_nonblock(sioc->fd);
  674. }
  675. return 0;
  676. }
  677. static void
  678. qio_channel_socket_set_delay(QIOChannel *ioc,
  679. bool enabled)
  680. {
  681. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  682. int v = enabled ? 0 : 1;
  683. setsockopt(sioc->fd,
  684. IPPROTO_TCP, TCP_NODELAY,
  685. &v, sizeof(v));
  686. }
  687. static void
  688. qio_channel_socket_set_cork(QIOChannel *ioc,
  689. bool enabled)
  690. {
  691. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  692. int v = enabled ? 1 : 0;
  693. socket_set_cork(sioc->fd, v);
  694. }
  695. static int
  696. qio_channel_socket_close(QIOChannel *ioc,
  697. Error **errp)
  698. {
  699. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  700. int rc = 0;
  701. Error *err = NULL;
  702. if (sioc->fd != -1) {
  703. #ifdef WIN32
  704. qemu_socket_unselect(sioc->fd, NULL);
  705. #endif
  706. if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
  707. socket_listen_cleanup(sioc->fd, errp);
  708. }
  709. if (close(sioc->fd) < 0) {
  710. sioc->fd = -1;
  711. error_setg_errno(&err, errno, "Unable to close socket");
  712. error_propagate(errp, err);
  713. return -1;
  714. }
  715. sioc->fd = -1;
  716. }
  717. return rc;
  718. }
  719. static int
  720. qio_channel_socket_shutdown(QIOChannel *ioc,
  721. QIOChannelShutdown how,
  722. Error **errp)
  723. {
  724. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  725. int sockhow;
  726. switch (how) {
  727. case QIO_CHANNEL_SHUTDOWN_READ:
  728. sockhow = SHUT_RD;
  729. break;
  730. case QIO_CHANNEL_SHUTDOWN_WRITE:
  731. sockhow = SHUT_WR;
  732. break;
  733. case QIO_CHANNEL_SHUTDOWN_BOTH:
  734. default:
  735. sockhow = SHUT_RDWR;
  736. break;
  737. }
  738. if (shutdown(sioc->fd, sockhow) < 0) {
  739. error_setg_errno(errp, errno,
  740. "Unable to shutdown socket");
  741. return -1;
  742. }
  743. return 0;
  744. }
  745. static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
  746. AioContext *ctx,
  747. IOHandler *io_read,
  748. IOHandler *io_write,
  749. void *opaque)
  750. {
  751. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  752. aio_set_fd_handler(ctx, sioc->fd, false,
  753. io_read, io_write, NULL, NULL, opaque);
  754. }
  755. static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
  756. GIOCondition condition)
  757. {
  758. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  759. return qio_channel_create_socket_watch(ioc,
  760. sioc->fd,
  761. condition);
  762. }
  763. static void qio_channel_socket_class_init(ObjectClass *klass,
  764. void *class_data G_GNUC_UNUSED)
  765. {
  766. QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
  767. ioc_klass->io_writev = qio_channel_socket_writev;
  768. ioc_klass->io_readv = qio_channel_socket_readv;
  769. ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
  770. ioc_klass->io_close = qio_channel_socket_close;
  771. ioc_klass->io_shutdown = qio_channel_socket_shutdown;
  772. ioc_klass->io_set_cork = qio_channel_socket_set_cork;
  773. ioc_klass->io_set_delay = qio_channel_socket_set_delay;
  774. ioc_klass->io_create_watch = qio_channel_socket_create_watch;
  775. ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
  776. #ifdef QEMU_MSG_ZEROCOPY
  777. ioc_klass->io_flush = qio_channel_socket_flush;
  778. #endif
  779. }
  780. static const TypeInfo qio_channel_socket_info = {
  781. .parent = TYPE_QIO_CHANNEL,
  782. .name = TYPE_QIO_CHANNEL_SOCKET,
  783. .instance_size = sizeof(QIOChannelSocket),
  784. .instance_init = qio_channel_socket_init,
  785. .instance_finalize = qio_channel_socket_finalize,
  786. .class_init = qio_channel_socket_class_init,
  787. };
  788. static void qio_channel_socket_register_types(void)
  789. {
  790. type_register_static(&qio_channel_socket_info);
  791. }
  792. type_init(qio_channel_socket_register_types);