2
0

channel-socket.c 28 KB


  1. /*
  2. * QEMU I/O channels sockets driver
  3. *
  4. * Copyright (c) 2015 Red Hat, Inc.
  5. *
  6. * This library is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 2.1 of the License, or (at your option) any later version.
  10. *
  11. * This library is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public
  17. * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18. */
  19. #include "qemu/osdep.h"
  20. #include "qapi/error.h"
  21. #include "qapi/qapi-visit-sockets.h"
  22. #include "qemu/module.h"
  23. #include "io/channel-socket.h"
  24. #include "io/channel-util.h"
  25. #include "io/channel-watch.h"
  26. #include "trace.h"
  27. #include "qapi/clone-visitor.h"
  28. #ifdef CONFIG_LINUX
  29. #include <linux/errqueue.h>
  30. #include <sys/socket.h>
  31. #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
  32. #define QEMU_MSG_ZEROCOPY
  33. #endif
  34. #endif
  35. #define SOCKET_MAX_FDS 16
  36. SocketAddress *
  37. qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
  38. Error **errp)
  39. {
  40. return socket_sockaddr_to_address(&ioc->localAddr,
  41. ioc->localAddrLen,
  42. errp);
  43. }
  44. SocketAddress *
  45. qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
  46. Error **errp)
  47. {
  48. return socket_sockaddr_to_address(&ioc->remoteAddr,
  49. ioc->remoteAddrLen,
  50. errp);
  51. }
  52. QIOChannelSocket *
  53. qio_channel_socket_new(void)
  54. {
  55. QIOChannelSocket *sioc;
  56. QIOChannel *ioc;
  57. sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
  58. sioc->fd = -1;
  59. sioc->zero_copy_queued = 0;
  60. sioc->zero_copy_sent = 0;
  61. ioc = QIO_CHANNEL(sioc);
  62. qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
  63. #ifdef WIN32
  64. ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
  65. #endif
  66. trace_qio_channel_socket_new(sioc);
  67. return sioc;
  68. }
  69. static int
  70. qio_channel_socket_set_fd(QIOChannelSocket *sioc,
  71. int fd,
  72. Error **errp)
  73. {
  74. if (sioc->fd != -1) {
  75. error_setg(errp, "Socket is already open");
  76. return -1;
  77. }
  78. sioc->fd = fd;
  79. sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
  80. sioc->localAddrLen = sizeof(sioc->localAddr);
  81. if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
  82. &sioc->remoteAddrLen) < 0) {
  83. if (errno == ENOTCONN) {
  84. memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
  85. sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
  86. } else {
  87. error_setg_errno(errp, errno,
  88. "Unable to query remote socket address");
  89. goto error;
  90. }
  91. }
  92. if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
  93. &sioc->localAddrLen) < 0) {
  94. error_setg_errno(errp, errno,
  95. "Unable to query local socket address");
  96. goto error;
  97. }
  98. #ifndef WIN32
  99. if (sioc->localAddr.ss_family == AF_UNIX) {
  100. QIOChannel *ioc = QIO_CHANNEL(sioc);
  101. qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
  102. }
  103. #endif /* WIN32 */
  104. return 0;
  105. error:
  106. sioc->fd = -1; /* Let the caller close FD on failure */
  107. return -1;
  108. }
  109. QIOChannelSocket *
  110. qio_channel_socket_new_fd(int fd,
  111. Error **errp)
  112. {
  113. QIOChannelSocket *ioc;
  114. ioc = qio_channel_socket_new();
  115. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  116. object_unref(OBJECT(ioc));
  117. return NULL;
  118. }
  119. trace_qio_channel_socket_new_fd(ioc, fd);
  120. return ioc;
  121. }
  122. int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
  123. SocketAddress *addr,
  124. Error **errp)
  125. {
  126. int fd;
  127. trace_qio_channel_socket_connect_sync(ioc, addr);
  128. fd = socket_connect(addr, errp);
  129. if (fd < 0) {
  130. trace_qio_channel_socket_connect_fail(ioc);
  131. return -1;
  132. }
  133. trace_qio_channel_socket_connect_complete(ioc, fd);
  134. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  135. close(fd);
  136. return -1;
  137. }
  138. #ifdef QEMU_MSG_ZEROCOPY
  139. int ret, v = 1;
  140. ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
  141. if (ret == 0) {
  142. /* Zero copy available on host */
  143. qio_channel_set_feature(QIO_CHANNEL(ioc),
  144. QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
  145. }
  146. #endif
  147. qio_channel_set_feature(QIO_CHANNEL(ioc),
  148. QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
  149. return 0;
  150. }
  151. static void qio_channel_socket_connect_worker(QIOTask *task,
  152. gpointer opaque)
  153. {
  154. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
  155. SocketAddress *addr = opaque;
  156. Error *err = NULL;
  157. qio_channel_socket_connect_sync(ioc, addr, &err);
  158. qio_task_set_error(task, err);
  159. }
  160. void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
  161. SocketAddress *addr,
  162. QIOTaskFunc callback,
  163. gpointer opaque,
  164. GDestroyNotify destroy,
  165. GMainContext *context)
  166. {
  167. QIOTask *task = qio_task_new(
  168. OBJECT(ioc), callback, opaque, destroy);
  169. SocketAddress *addrCopy;
  170. addrCopy = QAPI_CLONE(SocketAddress, addr);
  171. /* socket_connect() does a non-blocking connect(), but it
  172. * still blocks in DNS lookups, so we must use a thread */
  173. trace_qio_channel_socket_connect_async(ioc, addr);
  174. qio_task_run_in_thread(task,
  175. qio_channel_socket_connect_worker,
  176. addrCopy,
  177. (GDestroyNotify)qapi_free_SocketAddress,
  178. context);
  179. }
  180. int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
  181. SocketAddress *addr,
  182. int num,
  183. Error **errp)
  184. {
  185. int fd;
  186. trace_qio_channel_socket_listen_sync(ioc, addr, num);
  187. fd = socket_listen(addr, num, errp);
  188. if (fd < 0) {
  189. trace_qio_channel_socket_listen_fail(ioc);
  190. return -1;
  191. }
  192. trace_qio_channel_socket_listen_complete(ioc, fd);
  193. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  194. close(fd);
  195. return -1;
  196. }
  197. qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
  198. return 0;
  199. }
  200. struct QIOChannelListenWorkerData {
  201. SocketAddress *addr;
  202. int num; /* amount of expected connections */
  203. };
  204. static void qio_channel_listen_worker_free(gpointer opaque)
  205. {
  206. struct QIOChannelListenWorkerData *data = opaque;
  207. qapi_free_SocketAddress(data->addr);
  208. g_free(data);
  209. }
  210. static void qio_channel_socket_listen_worker(QIOTask *task,
  211. gpointer opaque)
  212. {
  213. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
  214. struct QIOChannelListenWorkerData *data = opaque;
  215. Error *err = NULL;
  216. qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
  217. qio_task_set_error(task, err);
  218. }
  219. void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
  220. SocketAddress *addr,
  221. int num,
  222. QIOTaskFunc callback,
  223. gpointer opaque,
  224. GDestroyNotify destroy,
  225. GMainContext *context)
  226. {
  227. QIOTask *task = qio_task_new(
  228. OBJECT(ioc), callback, opaque, destroy);
  229. struct QIOChannelListenWorkerData *data;
  230. data = g_new0(struct QIOChannelListenWorkerData, 1);
  231. data->addr = QAPI_CLONE(SocketAddress, addr);
  232. data->num = num;
  233. /* socket_listen() blocks in DNS lookups, so we must use a thread */
  234. trace_qio_channel_socket_listen_async(ioc, addr, num);
  235. qio_task_run_in_thread(task,
  236. qio_channel_socket_listen_worker,
  237. data,
  238. qio_channel_listen_worker_free,
  239. context);
  240. }
  241. int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
  242. SocketAddress *localAddr,
  243. SocketAddress *remoteAddr,
  244. Error **errp)
  245. {
  246. int fd;
  247. trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
  248. fd = socket_dgram(remoteAddr, localAddr, errp);
  249. if (fd < 0) {
  250. trace_qio_channel_socket_dgram_fail(ioc);
  251. return -1;
  252. }
  253. trace_qio_channel_socket_dgram_complete(ioc, fd);
  254. if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
  255. close(fd);
  256. return -1;
  257. }
  258. return 0;
  259. }
  260. struct QIOChannelSocketDGramWorkerData {
  261. SocketAddress *localAddr;
  262. SocketAddress *remoteAddr;
  263. };
  264. static void qio_channel_socket_dgram_worker_free(gpointer opaque)
  265. {
  266. struct QIOChannelSocketDGramWorkerData *data = opaque;
  267. qapi_free_SocketAddress(data->localAddr);
  268. qapi_free_SocketAddress(data->remoteAddr);
  269. g_free(data);
  270. }
  271. static void qio_channel_socket_dgram_worker(QIOTask *task,
  272. gpointer opaque)
  273. {
  274. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
  275. struct QIOChannelSocketDGramWorkerData *data = opaque;
  276. Error *err = NULL;
  277. /* socket_dgram() blocks in DNS lookups, so we must use a thread */
  278. qio_channel_socket_dgram_sync(ioc, data->localAddr,
  279. data->remoteAddr, &err);
  280. qio_task_set_error(task, err);
  281. }
  282. void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
  283. SocketAddress *localAddr,
  284. SocketAddress *remoteAddr,
  285. QIOTaskFunc callback,
  286. gpointer opaque,
  287. GDestroyNotify destroy,
  288. GMainContext *context)
  289. {
  290. QIOTask *task = qio_task_new(
  291. OBJECT(ioc), callback, opaque, destroy);
  292. struct QIOChannelSocketDGramWorkerData *data = g_new0(
  293. struct QIOChannelSocketDGramWorkerData, 1);
  294. data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
  295. data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
  296. trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
  297. qio_task_run_in_thread(task,
  298. qio_channel_socket_dgram_worker,
  299. data,
  300. qio_channel_socket_dgram_worker_free,
  301. context);
  302. }
  303. QIOChannelSocket *
  304. qio_channel_socket_accept(QIOChannelSocket *ioc,
  305. Error **errp)
  306. {
  307. QIOChannelSocket *cioc;
  308. cioc = qio_channel_socket_new();
  309. cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
  310. cioc->localAddrLen = sizeof(ioc->localAddr);
  311. retry:
  312. trace_qio_channel_socket_accept(ioc);
  313. cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
  314. &cioc->remoteAddrLen);
  315. if (cioc->fd < 0) {
  316. if (errno == EINTR) {
  317. goto retry;
  318. }
  319. error_setg_errno(errp, errno, "Unable to accept connection");
  320. trace_qio_channel_socket_accept_fail(ioc);
  321. goto error;
  322. }
  323. if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
  324. &cioc->localAddrLen) < 0) {
  325. error_setg_errno(errp, errno,
  326. "Unable to query local socket address");
  327. goto error;
  328. }
  329. #ifndef WIN32
  330. if (cioc->localAddr.ss_family == AF_UNIX) {
  331. QIOChannel *ioc_local = QIO_CHANNEL(cioc);
  332. qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
  333. }
  334. #endif /* WIN32 */
  335. qio_channel_set_feature(QIO_CHANNEL(cioc),
  336. QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
  337. trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
  338. return cioc;
  339. error:
  340. object_unref(OBJECT(cioc));
  341. return NULL;
  342. }
  343. static void qio_channel_socket_init(Object *obj)
  344. {
  345. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
  346. ioc->fd = -1;
  347. }
  348. static void qio_channel_socket_finalize(Object *obj)
  349. {
  350. QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
  351. if (ioc->fd != -1) {
  352. QIOChannel *ioc_local = QIO_CHANNEL(ioc);
  353. if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
  354. Error *err = NULL;
  355. socket_listen_cleanup(ioc->fd, &err);
  356. if (err) {
  357. error_report_err(err);
  358. err = NULL;
  359. }
  360. }
  361. #ifdef WIN32
  362. qemu_socket_unselect(ioc->fd, NULL);
  363. #endif
  364. close(ioc->fd);
  365. ioc->fd = -1;
  366. }
  367. }
  368. #ifndef WIN32
  369. static void qio_channel_socket_copy_fds(struct msghdr *msg,
  370. int **fds, size_t *nfds)
  371. {
  372. struct cmsghdr *cmsg;
  373. *nfds = 0;
  374. *fds = NULL;
  375. for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
  376. int fd_size, i;
  377. int gotfds;
  378. if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
  379. cmsg->cmsg_level != SOL_SOCKET ||
  380. cmsg->cmsg_type != SCM_RIGHTS) {
  381. continue;
  382. }
  383. fd_size = cmsg->cmsg_len - CMSG_LEN(0);
  384. if (!fd_size) {
  385. continue;
  386. }
  387. gotfds = fd_size / sizeof(int);
  388. *fds = g_renew(int, *fds, *nfds + gotfds);
  389. memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
  390. for (i = 0; i < gotfds; i++) {
  391. int fd = (*fds)[*nfds + i];
  392. if (fd < 0) {
  393. continue;
  394. }
  395. /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
  396. qemu_socket_set_block(fd);
  397. #ifndef MSG_CMSG_CLOEXEC
  398. qemu_set_cloexec(fd);
  399. #endif
  400. }
  401. *nfds += gotfds;
  402. }
  403. }
  404. static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
  405. const struct iovec *iov,
  406. size_t niov,
  407. int **fds,
  408. size_t *nfds,
  409. int flags,
  410. Error **errp)
  411. {
  412. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  413. ssize_t ret;
  414. struct msghdr msg = { NULL, };
  415. char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
  416. int sflags = 0;
  417. memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
  418. msg.msg_iov = (struct iovec *)iov;
  419. msg.msg_iovlen = niov;
  420. if (fds && nfds) {
  421. msg.msg_control = control;
  422. msg.msg_controllen = sizeof(control);
  423. #ifdef MSG_CMSG_CLOEXEC
  424. sflags |= MSG_CMSG_CLOEXEC;
  425. #endif
  426. }
  427. if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
  428. sflags |= MSG_PEEK;
  429. }
  430. retry:
  431. ret = recvmsg(sioc->fd, &msg, sflags);
  432. if (ret < 0) {
  433. if (errno == EAGAIN) {
  434. return QIO_CHANNEL_ERR_BLOCK;
  435. }
  436. if (errno == EINTR) {
  437. goto retry;
  438. }
  439. error_setg_errno(errp, errno,
  440. "Unable to read from socket");
  441. return -1;
  442. }
  443. if (fds && nfds) {
  444. qio_channel_socket_copy_fds(&msg, fds, nfds);
  445. }
  446. return ret;
  447. }
  448. static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
  449. const struct iovec *iov,
  450. size_t niov,
  451. int *fds,
  452. size_t nfds,
  453. int flags,
  454. Error **errp)
  455. {
  456. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  457. ssize_t ret;
  458. struct msghdr msg = { NULL, };
  459. char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
  460. size_t fdsize = sizeof(int) * nfds;
  461. struct cmsghdr *cmsg;
  462. int sflags = 0;
  463. memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
  464. msg.msg_iov = (struct iovec *)iov;
  465. msg.msg_iovlen = niov;
  466. if (nfds) {
  467. if (nfds > SOCKET_MAX_FDS) {
  468. error_setg_errno(errp, EINVAL,
  469. "Only %d FDs can be sent, got %zu",
  470. SOCKET_MAX_FDS, nfds);
  471. return -1;
  472. }
  473. msg.msg_control = control;
  474. msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
  475. cmsg = CMSG_FIRSTHDR(&msg);
  476. cmsg->cmsg_len = CMSG_LEN(fdsize);
  477. cmsg->cmsg_level = SOL_SOCKET;
  478. cmsg->cmsg_type = SCM_RIGHTS;
  479. memcpy(CMSG_DATA(cmsg), fds, fdsize);
  480. }
  481. if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  482. #ifdef QEMU_MSG_ZEROCOPY
  483. sflags = MSG_ZEROCOPY;
  484. #else
  485. /*
  486. * We expect QIOChannel class entry point to have
  487. * blocked this code path already
  488. */
  489. g_assert_not_reached();
  490. #endif
  491. }
  492. retry:
  493. ret = sendmsg(sioc->fd, &msg, sflags);
  494. if (ret <= 0) {
  495. switch (errno) {
  496. case EAGAIN:
  497. return QIO_CHANNEL_ERR_BLOCK;
  498. case EINTR:
  499. goto retry;
  500. case ENOBUFS:
  501. if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  502. error_setg_errno(errp, errno,
  503. "Process can't lock enough memory for using MSG_ZEROCOPY");
  504. return -1;
  505. }
  506. break;
  507. }
  508. error_setg_errno(errp, errno,
  509. "Unable to write to socket");
  510. return -1;
  511. }
  512. if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  513. sioc->zero_copy_queued++;
  514. }
  515. return ret;
  516. }
  517. #else /* WIN32 */
  518. static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
  519. const struct iovec *iov,
  520. size_t niov,
  521. int **fds,
  522. size_t *nfds,
  523. int flags,
  524. Error **errp)
  525. {
  526. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  527. ssize_t done = 0;
  528. ssize_t i;
  529. int sflags = 0;
  530. if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
  531. sflags |= MSG_PEEK;
  532. }
  533. for (i = 0; i < niov; i++) {
  534. ssize_t ret;
  535. retry:
  536. ret = recv(sioc->fd,
  537. iov[i].iov_base,
  538. iov[i].iov_len,
  539. sflags);
  540. if (ret < 0) {
  541. if (errno == EAGAIN) {
  542. if (done) {
  543. return done;
  544. } else {
  545. return QIO_CHANNEL_ERR_BLOCK;
  546. }
  547. } else if (errno == EINTR) {
  548. goto retry;
  549. } else {
  550. error_setg_errno(errp, errno,
  551. "Unable to read from socket");
  552. return -1;
  553. }
  554. }
  555. done += ret;
  556. if (ret < iov[i].iov_len) {
  557. return done;
  558. }
  559. }
  560. return done;
  561. }
  562. static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
  563. const struct iovec *iov,
  564. size_t niov,
  565. int *fds,
  566. size_t nfds,
  567. int flags,
  568. Error **errp)
  569. {
  570. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  571. ssize_t done = 0;
  572. ssize_t i;
  573. for (i = 0; i < niov; i++) {
  574. ssize_t ret;
  575. retry:
  576. ret = send(sioc->fd,
  577. iov[i].iov_base,
  578. iov[i].iov_len,
  579. 0);
  580. if (ret < 0) {
  581. if (errno == EAGAIN) {
  582. if (done) {
  583. return done;
  584. } else {
  585. return QIO_CHANNEL_ERR_BLOCK;
  586. }
  587. } else if (errno == EINTR) {
  588. goto retry;
  589. } else {
  590. error_setg_errno(errp, errno,
  591. "Unable to write to socket");
  592. return -1;
  593. }
  594. }
  595. done += ret;
  596. if (ret < iov[i].iov_len) {
  597. return done;
  598. }
  599. }
  600. return done;
  601. }
  602. #endif /* WIN32 */
  603. #ifdef QEMU_MSG_ZEROCOPY
  604. static int qio_channel_socket_flush(QIOChannel *ioc,
  605. Error **errp)
  606. {
  607. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  608. struct msghdr msg = {};
  609. struct sock_extended_err *serr;
  610. struct cmsghdr *cm;
  611. char control[CMSG_SPACE(sizeof(*serr))];
  612. int received;
  613. int ret;
  614. if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
  615. return 0;
  616. }
  617. msg.msg_control = control;
  618. msg.msg_controllen = sizeof(control);
  619. memset(control, 0, sizeof(control));
  620. ret = 1;
  621. while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
  622. received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
  623. if (received < 0) {
  624. switch (errno) {
  625. case EAGAIN:
  626. /* Nothing on errqueue, wait until something is available */
  627. qio_channel_wait(ioc, G_IO_ERR);
  628. continue;
  629. case EINTR:
  630. continue;
  631. default:
  632. error_setg_errno(errp, errno,
  633. "Unable to read errqueue");
  634. return -1;
  635. }
  636. }
  637. cm = CMSG_FIRSTHDR(&msg);
  638. if (cm->cmsg_level != SOL_IP && cm->cmsg_type != IP_RECVERR &&
  639. cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
  640. error_setg_errno(errp, EPROTOTYPE,
  641. "Wrong cmsg in errqueue");
  642. return -1;
  643. }
  644. serr = (void *) CMSG_DATA(cm);
  645. if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
  646. error_setg_errno(errp, serr->ee_errno,
  647. "Error on socket");
  648. return -1;
  649. }
  650. if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
  651. error_setg_errno(errp, serr->ee_origin,
  652. "Error not from zero copy");
  653. return -1;
  654. }
  655. if (serr->ee_data < serr->ee_info) {
  656. error_setg_errno(errp, serr->ee_origin,
  657. "Wrong notification bounds");
  658. return -1;
  659. }
  660. /* No errors, count successfully finished sendmsg()*/
  661. sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
  662. /* If any sendmsg() succeeded using zero copy, return 0 at the end */
  663. if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
  664. ret = 0;
  665. }
  666. }
  667. return ret;
  668. }
  669. #endif /* QEMU_MSG_ZEROCOPY */
  670. static int
  671. qio_channel_socket_set_blocking(QIOChannel *ioc,
  672. bool enabled,
  673. Error **errp)
  674. {
  675. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  676. if (enabled) {
  677. qemu_socket_set_block(sioc->fd);
  678. } else {
  679. qemu_socket_set_nonblock(sioc->fd);
  680. }
  681. return 0;
  682. }
  683. static void
  684. qio_channel_socket_set_delay(QIOChannel *ioc,
  685. bool enabled)
  686. {
  687. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  688. int v = enabled ? 0 : 1;
  689. setsockopt(sioc->fd,
  690. IPPROTO_TCP, TCP_NODELAY,
  691. &v, sizeof(v));
  692. }
  693. static void
  694. qio_channel_socket_set_cork(QIOChannel *ioc,
  695. bool enabled)
  696. {
  697. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  698. int v = enabled ? 1 : 0;
  699. socket_set_cork(sioc->fd, v);
  700. }
  701. static int
  702. qio_channel_socket_get_peerpid(QIOChannel *ioc,
  703. unsigned int *pid,
  704. Error **errp)
  705. {
  706. #ifdef CONFIG_LINUX
  707. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  708. Error *err = NULL;
  709. socklen_t len = sizeof(struct ucred);
  710. struct ucred cred;
  711. if (getsockopt(sioc->fd,
  712. SOL_SOCKET, SO_PEERCRED,
  713. &cred, &len) == -1) {
  714. error_setg_errno(&err, errno, "Unable to get peer credentials");
  715. error_propagate(errp, err);
  716. *pid = -1;
  717. return -1;
  718. }
  719. *pid = (unsigned int)cred.pid;
  720. return 0;
  721. #else
  722. error_setg(errp, "Unsupported feature");
  723. *pid = -1;
  724. return -1;
  725. #endif
  726. }
  727. static int
  728. qio_channel_socket_close(QIOChannel *ioc,
  729. Error **errp)
  730. {
  731. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  732. int rc = 0;
  733. Error *err = NULL;
  734. if (sioc->fd != -1) {
  735. #ifdef WIN32
  736. qemu_socket_unselect(sioc->fd, NULL);
  737. #endif
  738. if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
  739. socket_listen_cleanup(sioc->fd, errp);
  740. }
  741. if (close(sioc->fd) < 0) {
  742. sioc->fd = -1;
  743. error_setg_errno(&err, errno, "Unable to close socket");
  744. error_propagate(errp, err);
  745. return -1;
  746. }
  747. sioc->fd = -1;
  748. }
  749. return rc;
  750. }
  751. static int
  752. qio_channel_socket_shutdown(QIOChannel *ioc,
  753. QIOChannelShutdown how,
  754. Error **errp)
  755. {
  756. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  757. int sockhow;
  758. switch (how) {
  759. case QIO_CHANNEL_SHUTDOWN_READ:
  760. sockhow = SHUT_RD;
  761. break;
  762. case QIO_CHANNEL_SHUTDOWN_WRITE:
  763. sockhow = SHUT_WR;
  764. break;
  765. case QIO_CHANNEL_SHUTDOWN_BOTH:
  766. default:
  767. sockhow = SHUT_RDWR;
  768. break;
  769. }
  770. if (shutdown(sioc->fd, sockhow) < 0) {
  771. error_setg_errno(errp, errno,
  772. "Unable to shutdown socket");
  773. return -1;
  774. }
  775. return 0;
  776. }
  777. static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
  778. AioContext *read_ctx,
  779. IOHandler *io_read,
  780. AioContext *write_ctx,
  781. IOHandler *io_write,
  782. void *opaque)
  783. {
  784. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  785. qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read,
  786. sioc->fd, write_ctx, io_write,
  787. opaque);
  788. }
  789. static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
  790. GIOCondition condition)
  791. {
  792. QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
  793. return qio_channel_create_socket_watch(ioc,
  794. sioc->fd,
  795. condition);
  796. }
  797. static void qio_channel_socket_class_init(ObjectClass *klass,
  798. void *class_data G_GNUC_UNUSED)
  799. {
  800. QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
  801. ioc_klass->io_writev = qio_channel_socket_writev;
  802. ioc_klass->io_readv = qio_channel_socket_readv;
  803. ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
  804. ioc_klass->io_close = qio_channel_socket_close;
  805. ioc_klass->io_shutdown = qio_channel_socket_shutdown;
  806. ioc_klass->io_set_cork = qio_channel_socket_set_cork;
  807. ioc_klass->io_set_delay = qio_channel_socket_set_delay;
  808. ioc_klass->io_create_watch = qio_channel_socket_create_watch;
  809. ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
  810. #ifdef QEMU_MSG_ZEROCOPY
  811. ioc_klass->io_flush = qio_channel_socket_flush;
  812. #endif
  813. ioc_klass->io_peerpid = qio_channel_socket_get_peerpid;
  814. }
  815. static const TypeInfo qio_channel_socket_info = {
  816. .parent = TYPE_QIO_CHANNEL,
  817. .name = TYPE_QIO_CHANNEL_SOCKET,
  818. .instance_size = sizeof(QIOChannelSocket),
  819. .instance_init = qio_channel_socket_init,
  820. .instance_finalize = qio_channel_socket_finalize,
  821. .class_init = qio_channel_socket_class_init,
  822. };
  823. static void qio_channel_socket_register_types(void)
  824. {
  825. type_register_static(&qio_channel_socket_info);
  826. }
  827. type_init(qio_channel_socket_register_types);