colo-compare.c 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504
  1. /*
  2. * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
  3. * (a.k.a. Fault Tolerance or Continuous Replication)
  4. *
  5. * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
  6. * Copyright (c) 2016 FUJITSU LIMITED
  7. * Copyright (c) 2016 Intel Corporation
  8. *
  9. * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
  10. *
  11. * This work is licensed under the terms of the GNU GPL, version 2 or
  12. * later. See the COPYING file in the top-level directory.
  13. */
  14. #include "qemu/osdep.h"
  15. #include "qemu-common.h"
  16. #include "qemu/error-report.h"
  17. #include "trace.h"
  18. #include "qapi/error.h"
  19. #include "net/net.h"
  20. #include "net/eth.h"
  21. #include "qom/object_interfaces.h"
  22. #include "qemu/iov.h"
  23. #include "qom/object.h"
  24. #include "net/queue.h"
  25. #include "chardev/char-fe.h"
  26. #include "qemu/sockets.h"
  27. #include "colo.h"
  28. #include "sysemu/iothread.h"
  29. #include "net/colo-compare.h"
  30. #include "migration/colo.h"
  31. #include "migration/migration.h"
  32. #include "util.h"
  33. #include "block/aio-wait.h"
  34. #include "qemu/coroutine.h"
  /* Type name used by the COLO_COMPARE() checked cast below. */
  #define TYPE_COLO_COMPARE "colo-compare"
  /* Checked cast of a generic object pointer to CompareState. */
  #define COLO_COMPARE(obj) \
      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

  /* List of CompareState instances, linked through CompareState.next. */
  static QTAILQ_HEAD(, CompareState) net_compares =
      QTAILQ_HEAD_INITIALIZER(net_compares);

  /*
   * Notifiers fired by colo_compare_inconsistency_notify() when the
   * instance has no notify_dev configured.
   */
  static NotifierList colo_compare_notifiers =
      NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

  /* Value reported by compare_chr_can_read(). */
  #define COMPARE_READ_LEN_MAX NET_BUFSIZE
  /* NOTE(review): presumably the default for max_queue_size - confirm. */
  #define MAX_QUEUE_SIZE 1024

  /* Bits stored in the "mark" output of colo_mark_tcp_pkt(). */
  #define COLO_COMPARE_FREE_PRIMARY 0x01
  #define COLO_COMPARE_FREE_SECONDARY 0x02

  /*
   * NOTE(review): not referenced in this part of the file; presumably the
   * defaults for expired_scan_cycle and compare_timeout - confirm.
   */
  #define REGULAR_PACKET_CHECK_MS 3000
  #define DEFAULT_TIME_OUT_MS 3000

  /*
   * colo_compare_mutex is held by colo_notify_compares_event() while it
   * tests colo_compare_active and walks net_compares.
   */
  static QemuMutex colo_compare_mutex;
  static bool colo_compare_active;

  /*
   * event_mtx protects event_unhandled_count; event_complete_cond is waited
   * on until the count drops back to zero.
   */
  static QemuMutex event_mtx;
  static QemuCond event_complete_cond;
  static int event_unhandled_count;

  /* Queue length limit tested by colo_insert_packet(). */
  static uint32_t max_queue_size;
  /*
   *  + CompareState ++
   *  |               |
   *  +---------------+   +---------------+         +---------------+
   *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
   *  +---------------+   +---------------+         +---------------+
   *  |               |     |           |             |          |
   *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
   *                    |primary |  |secondary    |primary | |secondary
   *                    |packet  |  |packet  +    |packet  | |packet  +
   *                    +--------+  +--------+    +--------+ +--------+
   *                        |           |             |          |
   *                    +---v----+  +---v----+    +---v----+ +---v----+
   *                    |primary |  |secondary    |primary | |secondary
   *                    |packet  |  |packet  +    |packet  | |packet  +
   *                    +--------+  +--------+    +--------+ +--------+
   *                        |           |             |          |
   *                    +---v----+  +---v----+    +---v----+ +---v----+
   *                    |primary |  |secondary    |primary | |secondary
   *                    |packet  |  |packet  +    |packet  | |packet  +
   *                    +--------+  +--------+    +--------+ +--------+
   */
  /*
   * State of one outgoing channel: a queue of pending SendEntry items and
   * the coroutine (_compare_chr_send) that writes them to the chardev.
   */
  typedef struct SendCo {
      Coroutine *co;              /* running sender coroutine, NULL when idle */
      struct CompareState *s;     /* owning compare instance */
      CharBackend *chr;           /* chardev the entries are written to */
      GQueue send_list;           /* pushed at the head, popped from the tail */
      bool notify_remote_frame;   /* when set, no vnet header length is sent */
      bool done;                  /* true when no sender coroutine is running */
      int ret;                    /* result of the last sender run */
  } SendCo;
  /* One queued message; buf is released with g_free() once it was handled. */
  typedef struct SendEntry {
      uint32_t size;           /* number of bytes in buf */
      uint32_t vnet_hdr_len;   /* vnet header length announced to the peer */
      uint8_t *buf;            /* message bytes, owned by the entry */
  } SendEntry;
  /* Per-instance state of a colo-compare object. */
  typedef struct CompareState {
      Object parent;

      /* NOTE(review): presumably chardev ids set via properties - confirm. */
      char *pri_indev;
      char *sec_indev;
      char *outdev;
      /* When non-NULL, inconsistencies are reported via notify_remote_frame() */
      char *notify_dev;
      CharBackend chr_pri_in;
      CharBackend chr_sec_in;
      CharBackend chr_out;
      CharBackend chr_notify_dev;
      /* Reassembly state fed by net_fill_rstate() in the chardev handlers */
      SocketReadState pri_rs;
      SocketReadState sec_rs;
      SocketReadState notify_rs;
      /* Send queues: out_sendco for packets, notify_sendco for notifications */
      SendCo out_sendco;
      SendCo notify_sendco;
      /* When set, the vnet header length is sent along with each packet */
      bool vnet_hdr;
      /* Age threshold handed to the old-packet check */
      uint32_t compare_timeout;
      /* Offset added to the virtual clock when re-arming packet_check_timer */
      uint32_t expired_scan_cycle;

      /*
       * Record the connection that through the NIC
       * Element type: Connection
       */
      GQueue conn_list;
      /* Record the connection without repetition */
      GHashTable *connection_track_table;

      /* IOThread whose AioContext hosts packet_check_timer */
      IOThread *iothread;
      /* NOTE(review): not used in this part of the file - confirm purpose */
      GMainContext *worker_context;
      QEMUTimer *packet_check_timer;

      /* Bottom half scheduled by colo_notify_compares_event() */
      QEMUBH *event_bh;
      /* Event stored by colo_notify_compares_event() before scheduling */
      enum colo_event event;

      /* Linkage in net_compares */
      QTAILQ_ENTRY(CompareState) next;
  } CompareState;
  /* Class structure for CompareState; it adds no members of its own. */
  typedef struct CompareClass {
      ObjectClass parent_class;
  } CompareClass;
  /* Input side selector, used as the "mode" argument of packet_enqueue(). */
  enum {
      PRIMARY_IN = 0,
      SECONDARY_IN,
  };

  /* Human-readable name of each input side, used in trace output. */
  static const char *colo_mode[] = {
      [PRIMARY_IN] = "primary",
      [SECONDARY_IN] = "secondary",
  };

  /* Forward declaration; the definition follows the comparison helpers. */
  static int compare_chr_send(CompareState *s,
                              uint8_t *buf,
                              uint32_t size,
                              uint32_t vnet_hdr_len,
                              bool notify_remote_frame,
                              bool zero_copy);
  /*
   * Tell whether the packet bytes are exactly the characters of @str.
   *
   * @str:        NUL-terminated string to compare against
   * @buf:        packet bytes (not NUL-terminated)
   * @packet_len: number of bytes in @buf
   *
   * Returns true only when the lengths are equal and all bytes match.
   */
  static bool packet_matches_str(const char *str,
                                 const uint8_t *buf,
                                 uint32_t packet_len)
  {
      const size_t expected_len = strlen(str);

      if (packet_len != expected_len) {
          return false;
      }

      return memcmp(str, buf, expected_len) == 0;
  }
  148. static void notify_remote_frame(CompareState *s)
  149. {
  150. char msg[] = "DO_CHECKPOINT";
  151. int ret = 0;
  152. ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
  153. if (ret < 0) {
  154. error_report("Notify Xen COLO-frame failed");
  155. }
  156. }
  157. static void colo_compare_inconsistency_notify(CompareState *s)
  158. {
  159. if (s->notify_dev) {
  160. notify_remote_frame(s);
  161. } else {
  162. notifier_list_notify(&colo_compare_notifiers,
  163. migrate_get_current());
  164. }
  165. }
  166. static gint seq_sorter(Packet *a, Packet *b, gpointer data)
  167. {
  168. struct tcp_hdr *atcp, *btcp;
  169. atcp = (struct tcp_hdr *)(a->transport_header);
  170. btcp = (struct tcp_hdr *)(b->transport_header);
  171. return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
  172. }
  173. static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
  174. {
  175. Packet *pkt = data;
  176. struct tcp_hdr *tcphd;
  177. tcphd = (struct tcp_hdr *)pkt->transport_header;
  178. pkt->tcp_seq = ntohl(tcphd->th_seq);
  179. pkt->tcp_ack = ntohl(tcphd->th_ack);
  180. *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
  181. pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
  182. + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
  183. pkt->payload_size = pkt->size - pkt->header_size;
  184. pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
  185. pkt->flags = tcphd->th_flags;
  186. }
  187. /*
  188. * Return 1 on success, if return 0 means the
  189. * packet will be dropped
  190. */
  191. static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
  192. {
  193. if (g_queue_get_length(queue) <= max_queue_size) {
  194. if (pkt->ip->ip_p == IPPROTO_TCP) {
  195. fill_pkt_tcp_info(pkt, max_ack);
  196. g_queue_insert_sorted(queue,
  197. pkt,
  198. (GCompareDataFunc)seq_sorter,
  199. NULL);
  200. } else {
  201. g_queue_push_tail(queue, pkt);
  202. }
  203. return 1;
  204. }
  205. return 0;
  206. }
  207. /*
  208. * Return 0 on success, if return -1 means the pkt
  209. * is unsupported(arp and ipv6) and will be sent later
  210. */
  211. static int packet_enqueue(CompareState *s, int mode, Connection **con)
  212. {
  213. ConnectionKey key;
  214. Packet *pkt = NULL;
  215. Connection *conn;
  216. int ret;
  217. if (mode == PRIMARY_IN) {
  218. pkt = packet_new(s->pri_rs.buf,
  219. s->pri_rs.packet_len,
  220. s->pri_rs.vnet_hdr_len);
  221. } else {
  222. pkt = packet_new(s->sec_rs.buf,
  223. s->sec_rs.packet_len,
  224. s->sec_rs.vnet_hdr_len);
  225. }
  226. if (parse_packet_early(pkt)) {
  227. packet_destroy(pkt, NULL);
  228. pkt = NULL;
  229. return -1;
  230. }
  231. fill_connection_key(pkt, &key);
  232. conn = connection_get(s->connection_track_table,
  233. &key,
  234. &s->conn_list);
  235. if (!conn->processing) {
  236. g_queue_push_tail(&s->conn_list, conn);
  237. conn->processing = true;
  238. }
  239. if (mode == PRIMARY_IN) {
  240. ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
  241. } else {
  242. ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
  243. }
  244. if (!ret) {
  245. trace_colo_compare_drop_packet(colo_mode[mode],
  246. "queue size too big, drop packet");
  247. packet_destroy(pkt, NULL);
  248. pkt = NULL;
  249. }
  250. *con = conn;
  251. return 0;
  252. }
  /*
   * Wrap-around aware comparison of 32-bit sequence numbers: true when
   * @seq1 is ahead of @seq2 by less than half of the number space.
   */
  static inline bool after(uint32_t seq1, uint32_t seq2)
  {
      const int32_t distance = (int32_t)(seq1 - seq2);

      return distance > 0;
  }
  257. static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
  258. {
  259. int ret;
  260. ret = compare_chr_send(s,
  261. pkt->data,
  262. pkt->size,
  263. pkt->vnet_hdr_len,
  264. false,
  265. true);
  266. if (ret < 0) {
  267. error_report("colo send primary packet failed");
  268. }
  269. trace_colo_compare_main("packet same and release packet");
  270. packet_destroy_partial(pkt, NULL);
  271. }
  272. /*
  273. * The IP packets sent by primary and secondary
  274. * will be compared in here
  275. * TODO support ip fragment, Out-Of-Order
  276. * return: 0 means packet same
  277. * > 0 || < 0 means packet different
  278. */
  279. static int colo_compare_packet_payload(Packet *ppkt,
  280. Packet *spkt,
  281. uint16_t poffset,
  282. uint16_t soffset,
  283. uint16_t len)
  284. {
  285. if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
  286. char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
  287. strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
  288. strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
  289. strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
  290. strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
  291. trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
  292. pri_ip_dst, spkt->size,
  293. sec_ip_src, sec_ip_dst);
  294. }
  295. return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
  296. }
  297. /*
  298. * return true means that the payload is consist and
  299. * need to make the next comparison, false means do
  300. * the checkpoint
  301. */
  302. static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
  303. int8_t *mark, uint32_t max_ack)
  304. {
  305. *mark = 0;
  306. if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
  307. if (!colo_compare_packet_payload(ppkt, spkt,
  308. ppkt->header_size, spkt->header_size,
  309. ppkt->payload_size)) {
  310. *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
  311. return true;
  312. }
  313. }
  314. /* one part of secondary packet payload still need to be compared */
  315. if (!after(ppkt->seq_end, spkt->seq_end)) {
  316. if (!colo_compare_packet_payload(ppkt, spkt,
  317. ppkt->header_size + ppkt->offset,
  318. spkt->header_size + spkt->offset,
  319. ppkt->payload_size - ppkt->offset)) {
  320. if (!after(ppkt->tcp_ack, max_ack)) {
  321. *mark = COLO_COMPARE_FREE_PRIMARY;
  322. spkt->offset += ppkt->payload_size - ppkt->offset;
  323. return true;
  324. } else {
  325. /* secondary guest hasn't ack the data, don't send
  326. * out this packet
  327. */
  328. return false;
  329. }
  330. }
  331. } else {
  332. /* primary packet is longer than secondary packet, compare
  333. * the same part and mark the primary packet offset
  334. */
  335. if (!colo_compare_packet_payload(ppkt, spkt,
  336. ppkt->header_size + ppkt->offset,
  337. spkt->header_size + spkt->offset,
  338. spkt->payload_size - spkt->offset)) {
  339. *mark = COLO_COMPARE_FREE_SECONDARY;
  340. ppkt->offset += spkt->payload_size - spkt->offset;
  341. return true;
  342. }
  343. }
  344. return false;
  345. }
  /*
   * Compare the queued TCP packets of one connection.
   *
   * Packets are taken from the heads of the primary and secondary lists and
   * matched pairwise. Primary packets that pass are sent out through
   * colo_release_primary_pkt(); consumed secondary packets are destroyed.
   * On a mismatch both packets go back to the heads of their lists and an
   * inconsistency is reported. The function returns as soon as either list
   * runs empty.
   */
  static void colo_compare_tcp(CompareState *s, Connection *conn)
  {
      Packet *ppkt = NULL, *spkt = NULL;
      int8_t mark;

      /*
       * If ppkt and spkt have the same payload, but ppkt's ACK
       * is greater than spkt's ACK, in this case we can not
       * send the ppkt because it will cause the secondary guest
       * to miss sending some data in the next. Therefore, we
       * record the maximum ACK in the current queue at both
       * primary side and secondary side. Only when the ack is
       * less than the smaller of the two maximum ack, then we
       * can ensure that the packet's payload is acknowledged by
       * primary and secondary.
       */
      uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;

  pri:
      /* Fetch the next primary packet; nothing to do without one. */
      if (g_queue_is_empty(&conn->primary_list)) {
          return;
      }
      ppkt = g_queue_pop_head(&conn->primary_list);
  sec:
      /* Fetch the next secondary packet; keep ppkt queued if there is none. */
      if (g_queue_is_empty(&conn->secondary_list)) {
          g_queue_push_head(&conn->primary_list, ppkt);
          return;
      }
      spkt = g_queue_pop_head(&conn->secondary_list);

      /* A primary packet without payload is released without comparison. */
      if (ppkt->tcp_seq == ppkt->seq_end) {
          colo_release_primary_pkt(s, ppkt);
          ppkt = NULL;
      }

      /* So is one that ends at or before the already compared sequence. */
      if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
          trace_colo_compare_main("pri: this packet has compared");
          colo_release_primary_pkt(s, ppkt);
          ppkt = NULL;
      }

      if (spkt->tcp_seq == spkt->seq_end) {
          /* Secondary packet without payload: drop it and fetch another. */
          packet_destroy(spkt, NULL);
          if (!ppkt) {
              goto pri;
          } else {
              goto sec;
          }
      } else {
          if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
              trace_colo_compare_main("sec: this packet has compared");
              packet_destroy(spkt, NULL);
              if (!ppkt) {
                  goto pri;
              } else {
                  goto sec;
              }
          }
          /* spkt still needs a partner: requeue it and fetch a new ppkt. */
          if (!ppkt) {
              g_queue_push_head(&conn->secondary_list, spkt);
              goto pri;
          }
      }

      if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
          trace_colo_compare_tcp_info("pri",
                                      ppkt->tcp_seq, ppkt->tcp_ack,
                                      ppkt->header_size, ppkt->payload_size,
                                      ppkt->offset, ppkt->flags);

          trace_colo_compare_tcp_info("sec",
                                      spkt->tcp_seq, spkt->tcp_ack,
                                      spkt->header_size, spkt->payload_size,
                                      spkt->offset, spkt->flags);

          /* Primary fully compared; secondary keeps its unmatched tail. */
          if (mark == COLO_COMPARE_FREE_PRIMARY) {
              conn->compare_seq = ppkt->seq_end;
              colo_release_primary_pkt(s, ppkt);
              g_queue_push_head(&conn->secondary_list, spkt);
              goto pri;
          }
          /* Secondary fully compared; keep ppkt and fetch the next spkt. */
          if (mark == COLO_COMPARE_FREE_SECONDARY) {
              conn->compare_seq = spkt->seq_end;
              packet_destroy(spkt, NULL);
              goto sec;
          }
          /* Both packets cover the same range and matched. */
          if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
              conn->compare_seq = ppkt->seq_end;
              colo_release_primary_pkt(s, ppkt);
              packet_destroy(spkt, NULL);
              goto pri;
          }
      } else {
          /* Mismatch: requeue both packets and report the inconsistency. */
          g_queue_push_head(&conn->primary_list, ppkt);
          g_queue_push_head(&conn->secondary_list, spkt);

          if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
              qemu_hexdump((char *)ppkt->data, stderr,
                           "colo-compare ppkt", ppkt->size);
              qemu_hexdump((char *)spkt->data, stderr,
                           "colo-compare spkt", spkt->size);
          }

          colo_compare_inconsistency_notify(s);
      }
  }
  442. /*
  443. * Called from the compare thread on the primary
  444. * for compare udp packet
  445. */
  446. static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
  447. {
  448. uint16_t network_header_length = ppkt->ip->ip_hl << 2;
  449. uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
  450. trace_colo_compare_main("compare udp");
  451. /*
  452. * Because of ppkt and spkt are both in the same connection,
  453. * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
  454. * same with spkt. In addition, IP header's Identification is a random
  455. * field, we can handle it in IP fragmentation function later.
  456. * COLO just concern the response net packet payload from primary guest
  457. * and secondary guest are same or not, So we ignored all IP header include
  458. * other field like TOS,TTL,IP Checksum. we only need to compare
  459. * the ip payload here.
  460. */
  461. if (ppkt->size != spkt->size) {
  462. trace_colo_compare_main("UDP: payload size of packets are different");
  463. return -1;
  464. }
  465. if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
  466. ppkt->size - offset)) {
  467. trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
  468. trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
  469. if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
  470. qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
  471. ppkt->size);
  472. qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
  473. spkt->size);
  474. }
  475. return -1;
  476. } else {
  477. return 0;
  478. }
  479. }
  480. /*
  481. * Called from the compare thread on the primary
  482. * for compare icmp packet
  483. */
  484. static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
  485. {
  486. uint16_t network_header_length = ppkt->ip->ip_hl << 2;
  487. uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
  488. trace_colo_compare_main("compare icmp");
  489. /*
  490. * Because of ppkt and spkt are both in the same connection,
  491. * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
  492. * same with spkt. In addition, IP header's Identification is a random
  493. * field, we can handle it in IP fragmentation function later.
  494. * COLO just concern the response net packet payload from primary guest
  495. * and secondary guest are same or not, So we ignored all IP header include
  496. * other field like TOS,TTL,IP Checksum. we only need to compare
  497. * the ip payload here.
  498. */
  499. if (ppkt->size != spkt->size) {
  500. trace_colo_compare_main("ICMP: payload size of packets are different");
  501. return -1;
  502. }
  503. if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
  504. ppkt->size - offset)) {
  505. trace_colo_compare_icmp_miscompare("primary pkt size",
  506. ppkt->size);
  507. trace_colo_compare_icmp_miscompare("Secondary pkt size",
  508. spkt->size);
  509. if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
  510. qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
  511. ppkt->size);
  512. qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
  513. spkt->size);
  514. }
  515. return -1;
  516. } else {
  517. return 0;
  518. }
  519. }
  520. /*
  521. * Called from the compare thread on the primary
  522. * for compare other packet
  523. */
  524. static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
  525. {
  526. uint16_t offset = ppkt->vnet_hdr_len;
  527. trace_colo_compare_main("compare other");
  528. if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
  529. char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
  530. strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
  531. strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
  532. strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
  533. strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
  534. trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
  535. pri_ip_dst, spkt->size,
  536. sec_ip_src, sec_ip_dst);
  537. }
  538. if (ppkt->size != spkt->size) {
  539. trace_colo_compare_main("Other: payload size of packets are different");
  540. return -1;
  541. }
  542. return colo_compare_packet_payload(ppkt, spkt, offset, offset,
  543. ppkt->size - offset);
  544. }
  545. static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
  546. {
  547. int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
  548. if ((now - pkt->creation_ms) > (*check_time)) {
  549. trace_colo_old_packet_check_found(pkt->creation_ms);
  550. return 0;
  551. } else {
  552. return 1;
  553. }
  554. }
  555. void colo_compare_register_notifier(Notifier *notify)
  556. {
  557. notifier_list_add(&colo_compare_notifiers, notify);
  558. }
  559. void colo_compare_unregister_notifier(Notifier *notify)
  560. {
  561. notifier_remove(notify);
  562. }
  563. static int colo_old_packet_check_one_conn(Connection *conn,
  564. CompareState *s)
  565. {
  566. GList *result = NULL;
  567. result = g_queue_find_custom(&conn->primary_list,
  568. &s->compare_timeout,
  569. (GCompareFunc)colo_old_packet_check_one);
  570. if (result) {
  571. /* Do checkpoint will flush old packet */
  572. colo_compare_inconsistency_notify(s);
  573. return 0;
  574. }
  575. return 1;
  576. }
  577. /*
  578. * Look for old packets that the secondary hasn't matched,
  579. * if we have some then we have to checkpoint to wake
  580. * the secondary up.
  581. */
  582. static void colo_old_packet_check(void *opaque)
  583. {
  584. CompareState *s = opaque;
  585. /*
  586. * If we find one old packet, stop finding job and notify
  587. * COLO frame do checkpoint.
  588. */
  589. g_queue_find_custom(&s->conn_list, s,
  590. (GCompareFunc)colo_old_packet_check_one_conn);
  591. }
  592. static void colo_compare_packet(CompareState *s, Connection *conn,
  593. int (*HandlePacket)(Packet *spkt,
  594. Packet *ppkt))
  595. {
  596. Packet *pkt = NULL;
  597. GList *result = NULL;
  598. while (!g_queue_is_empty(&conn->primary_list) &&
  599. !g_queue_is_empty(&conn->secondary_list)) {
  600. pkt = g_queue_pop_head(&conn->primary_list);
  601. result = g_queue_find_custom(&conn->secondary_list,
  602. pkt, (GCompareFunc)HandlePacket);
  603. if (result) {
  604. colo_release_primary_pkt(s, pkt);
  605. g_queue_remove(&conn->secondary_list, result->data);
  606. } else {
  607. /*
  608. * If one packet arrive late, the secondary_list or
  609. * primary_list will be empty, so we can't compare it
  610. * until next comparison. If the packets in the list are
  611. * timeout, it will trigger a checkpoint request.
  612. */
  613. trace_colo_compare_main("packet different");
  614. g_queue_push_head(&conn->primary_list, pkt);
  615. colo_compare_inconsistency_notify(s);
  616. break;
  617. }
  618. }
  619. }
  620. /*
  621. * Called from the compare thread on the primary
  622. * for compare packet with secondary list of the
  623. * specified connection when a new packet was
  624. * queued to it.
  625. */
  626. static void colo_compare_connection(void *opaque, void *user_data)
  627. {
  628. CompareState *s = user_data;
  629. Connection *conn = opaque;
  630. switch (conn->ip_proto) {
  631. case IPPROTO_TCP:
  632. colo_compare_tcp(s, conn);
  633. break;
  634. case IPPROTO_UDP:
  635. colo_compare_packet(s, conn, colo_packet_compare_udp);
  636. break;
  637. case IPPROTO_ICMP:
  638. colo_compare_packet(s, conn, colo_packet_compare_icmp);
  639. break;
  640. default:
  641. colo_compare_packet(s, conn, colo_packet_compare_other);
  642. break;
  643. }
  644. }
  645. static void coroutine_fn _compare_chr_send(void *opaque)
  646. {
  647. SendCo *sendco = opaque;
  648. CompareState *s = sendco->s;
  649. int ret = 0;
  650. while (!g_queue_is_empty(&sendco->send_list)) {
  651. SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
  652. uint32_t len = htonl(entry->size);
  653. ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
  654. if (ret != sizeof(len)) {
  655. g_free(entry->buf);
  656. g_slice_free(SendEntry, entry);
  657. goto err;
  658. }
  659. if (!sendco->notify_remote_frame && s->vnet_hdr) {
  660. /*
  661. * We send vnet header len make other module(like filter-redirector)
  662. * know how to parse net packet correctly.
  663. */
  664. len = htonl(entry->vnet_hdr_len);
  665. ret = qemu_chr_fe_write_all(sendco->chr,
  666. (uint8_t *)&len,
  667. sizeof(len));
  668. if (ret != sizeof(len)) {
  669. g_free(entry->buf);
  670. g_slice_free(SendEntry, entry);
  671. goto err;
  672. }
  673. }
  674. ret = qemu_chr_fe_write_all(sendco->chr,
  675. (uint8_t *)entry->buf,
  676. entry->size);
  677. if (ret != entry->size) {
  678. g_free(entry->buf);
  679. g_slice_free(SendEntry, entry);
  680. goto err;
  681. }
  682. g_free(entry->buf);
  683. g_slice_free(SendEntry, entry);
  684. }
  685. sendco->ret = 0;
  686. goto out;
  687. err:
  688. while (!g_queue_is_empty(&sendco->send_list)) {
  689. SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
  690. g_free(entry->buf);
  691. g_slice_free(SendEntry, entry);
  692. }
  693. sendco->ret = ret < 0 ? ret : -EIO;
  694. out:
  695. sendco->co = NULL;
  696. sendco->done = true;
  697. aio_wait_kick();
  698. }
  699. static int compare_chr_send(CompareState *s,
  700. uint8_t *buf,
  701. uint32_t size,
  702. uint32_t vnet_hdr_len,
  703. bool notify_remote_frame,
  704. bool zero_copy)
  705. {
  706. SendCo *sendco;
  707. SendEntry *entry;
  708. if (notify_remote_frame) {
  709. sendco = &s->notify_sendco;
  710. } else {
  711. sendco = &s->out_sendco;
  712. }
  713. if (!size) {
  714. return 0;
  715. }
  716. entry = g_slice_new(SendEntry);
  717. entry->size = size;
  718. entry->vnet_hdr_len = vnet_hdr_len;
  719. if (zero_copy) {
  720. entry->buf = buf;
  721. } else {
  722. entry->buf = g_malloc(size);
  723. memcpy(entry->buf, buf, size);
  724. }
  725. g_queue_push_head(&sendco->send_list, entry);
  726. if (sendco->done) {
  727. sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
  728. sendco->done = false;
  729. qemu_coroutine_enter(sendco->co);
  730. if (sendco->done) {
  731. /* report early errors */
  732. return sendco->ret;
  733. }
  734. }
  735. /* assume success */
  736. return 0;
  737. }
  738. static int compare_chr_can_read(void *opaque)
  739. {
  740. return COMPARE_READ_LEN_MAX;
  741. }
  742. /*
  743. * Called from the main thread on the primary for packets
  744. * arriving over the socket from the primary.
  745. */
  746. static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
  747. {
  748. CompareState *s = COLO_COMPARE(opaque);
  749. int ret;
  750. ret = net_fill_rstate(&s->pri_rs, buf, size);
  751. if (ret == -1) {
  752. qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
  753. NULL, NULL, true);
  754. error_report("colo-compare primary_in error");
  755. }
  756. }
  757. /*
  758. * Called from the main thread on the primary for packets
  759. * arriving over the socket from the secondary.
  760. */
  761. static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
  762. {
  763. CompareState *s = COLO_COMPARE(opaque);
  764. int ret;
  765. ret = net_fill_rstate(&s->sec_rs, buf, size);
  766. if (ret == -1) {
  767. qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
  768. NULL, NULL, true);
  769. error_report("colo-compare secondary_in error");
  770. }
  771. }
  772. static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
  773. {
  774. CompareState *s = COLO_COMPARE(opaque);
  775. int ret;
  776. ret = net_fill_rstate(&s->notify_rs, buf, size);
  777. if (ret == -1) {
  778. qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
  779. NULL, NULL, true);
  780. error_report("colo-compare notify_dev error");
  781. }
  782. }
  783. /*
  784. * Check old packet regularly so it can watch for any packets
  785. * that the secondary hasn't produced equivalents of.
  786. */
  787. static void check_old_packet_regular(void *opaque)
  788. {
  789. CompareState *s = opaque;
  790. /* if have old packet we will notify checkpoint */
  791. colo_old_packet_check(s);
  792. timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
  793. s->expired_scan_cycle);
  794. }
  795. /* Public API, Used for COLO frame to notify compare event */
  796. void colo_notify_compares_event(void *opaque, int event, Error **errp)
  797. {
  798. CompareState *s;
  799. qemu_mutex_lock(&colo_compare_mutex);
  800. if (!colo_compare_active) {
  801. qemu_mutex_unlock(&colo_compare_mutex);
  802. return;
  803. }
  804. qemu_mutex_lock(&event_mtx);
  805. QTAILQ_FOREACH(s, &net_compares, next) {
  806. s->event = event;
  807. qemu_bh_schedule(s->event_bh);
  808. event_unhandled_count++;
  809. }
  810. /* Wait all compare threads to finish handling this event */
  811. while (event_unhandled_count > 0) {
  812. qemu_cond_wait(&event_complete_cond, &event_mtx);
  813. }
  814. qemu_mutex_unlock(&event_mtx);
  815. qemu_mutex_unlock(&colo_compare_mutex);
  816. }
  817. static void colo_compare_timer_init(CompareState *s)
  818. {
  819. AioContext *ctx = iothread_get_aio_context(s->iothread);
  820. s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
  821. SCALE_MS, check_old_packet_regular,
  822. s);
  823. timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
  824. s->expired_scan_cycle);
  825. }
  826. static void colo_compare_timer_del(CompareState *s)
  827. {
  828. if (s->packet_check_timer) {
  829. timer_del(s->packet_check_timer);
  830. timer_free(s->packet_check_timer);
  831. s->packet_check_timer = NULL;
  832. }
  833. }
  834. static void colo_flush_packets(void *opaque, void *user_data);
  835. static void colo_compare_handle_event(void *opaque)
  836. {
  837. CompareState *s = opaque;
  838. switch (s->event) {
  839. case COLO_EVENT_CHECKPOINT:
  840. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  841. break;
  842. case COLO_EVENT_FAILOVER:
  843. break;
  844. default:
  845. break;
  846. }
  847. qemu_mutex_lock(&event_mtx);
  848. assert(event_unhandled_count > 0);
  849. event_unhandled_count--;
  850. qemu_cond_broadcast(&event_complete_cond);
  851. qemu_mutex_unlock(&event_mtx);
  852. }
  853. static void colo_compare_iothread(CompareState *s)
  854. {
  855. AioContext *ctx = iothread_get_aio_context(s->iothread);
  856. object_ref(OBJECT(s->iothread));
  857. s->worker_context = iothread_get_g_main_context(s->iothread);
  858. qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
  859. compare_pri_chr_in, NULL, NULL,
  860. s, s->worker_context, true);
  861. qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
  862. compare_sec_chr_in, NULL, NULL,
  863. s, s->worker_context, true);
  864. if (s->notify_dev) {
  865. qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
  866. compare_notify_chr, NULL, NULL,
  867. s, s->worker_context, true);
  868. }
  869. colo_compare_timer_init(s);
  870. s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
  871. }
  872. static char *compare_get_pri_indev(Object *obj, Error **errp)
  873. {
  874. CompareState *s = COLO_COMPARE(obj);
  875. return g_strdup(s->pri_indev);
  876. }
  877. static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
  878. {
  879. CompareState *s = COLO_COMPARE(obj);
  880. g_free(s->pri_indev);
  881. s->pri_indev = g_strdup(value);
  882. }
  883. static char *compare_get_sec_indev(Object *obj, Error **errp)
  884. {
  885. CompareState *s = COLO_COMPARE(obj);
  886. return g_strdup(s->sec_indev);
  887. }
  888. static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
  889. {
  890. CompareState *s = COLO_COMPARE(obj);
  891. g_free(s->sec_indev);
  892. s->sec_indev = g_strdup(value);
  893. }
  894. static char *compare_get_outdev(Object *obj, Error **errp)
  895. {
  896. CompareState *s = COLO_COMPARE(obj);
  897. return g_strdup(s->outdev);
  898. }
  899. static void compare_set_outdev(Object *obj, const char *value, Error **errp)
  900. {
  901. CompareState *s = COLO_COMPARE(obj);
  902. g_free(s->outdev);
  903. s->outdev = g_strdup(value);
  904. }
  905. static bool compare_get_vnet_hdr(Object *obj, Error **errp)
  906. {
  907. CompareState *s = COLO_COMPARE(obj);
  908. return s->vnet_hdr;
  909. }
  910. static void compare_set_vnet_hdr(Object *obj,
  911. bool value,
  912. Error **errp)
  913. {
  914. CompareState *s = COLO_COMPARE(obj);
  915. s->vnet_hdr = value;
  916. }
  917. static char *compare_get_notify_dev(Object *obj, Error **errp)
  918. {
  919. CompareState *s = COLO_COMPARE(obj);
  920. return g_strdup(s->notify_dev);
  921. }
  922. static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
  923. {
  924. CompareState *s = COLO_COMPARE(obj);
  925. g_free(s->notify_dev);
  926. s->notify_dev = g_strdup(value);
  927. }
  928. static void compare_get_timeout(Object *obj, Visitor *v,
  929. const char *name, void *opaque,
  930. Error **errp)
  931. {
  932. CompareState *s = COLO_COMPARE(obj);
  933. uint32_t value = s->compare_timeout;
  934. visit_type_uint32(v, name, &value, errp);
  935. }
  936. static void compare_set_timeout(Object *obj, Visitor *v,
  937. const char *name, void *opaque,
  938. Error **errp)
  939. {
  940. CompareState *s = COLO_COMPARE(obj);
  941. uint32_t value;
  942. if (!visit_type_uint32(v, name, &value, errp)) {
  943. return;
  944. }
  945. if (!value) {
  946. error_setg(errp, "Property '%s.%s' requires a positive value",
  947. object_get_typename(obj), name);
  948. return;
  949. }
  950. s->compare_timeout = value;
  951. }
  952. static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
  953. const char *name, void *opaque,
  954. Error **errp)
  955. {
  956. CompareState *s = COLO_COMPARE(obj);
  957. uint32_t value = s->expired_scan_cycle;
  958. visit_type_uint32(v, name, &value, errp);
  959. }
  960. static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
  961. const char *name, void *opaque,
  962. Error **errp)
  963. {
  964. CompareState *s = COLO_COMPARE(obj);
  965. uint32_t value;
  966. if (!visit_type_uint32(v, name, &value, errp)) {
  967. return;
  968. }
  969. if (!value) {
  970. error_setg(errp, "Property '%s.%s' requires a positive value",
  971. object_get_typename(obj), name);
  972. return;
  973. }
  974. s->expired_scan_cycle = value;
  975. }
  976. static void get_max_queue_size(Object *obj, Visitor *v,
  977. const char *name, void *opaque,
  978. Error **errp)
  979. {
  980. uint32_t value = max_queue_size;
  981. visit_type_uint32(v, name, &value, errp);
  982. }
  983. static void set_max_queue_size(Object *obj, Visitor *v,
  984. const char *name, void *opaque,
  985. Error **errp)
  986. {
  987. Error *local_err = NULL;
  988. uint32_t value;
  989. visit_type_uint32(v, name, &value, &local_err);
  990. if (local_err) {
  991. goto out;
  992. }
  993. if (!value) {
  994. error_setg(&local_err, "Property '%s.%s' requires a positive value",
  995. object_get_typename(obj), name);
  996. goto out;
  997. }
  998. max_queue_size = value;
  999. out:
  1000. error_propagate(errp, local_err);
  1001. }
  1002. static void compare_pri_rs_finalize(SocketReadState *pri_rs)
  1003. {
  1004. CompareState *s = container_of(pri_rs, CompareState, pri_rs);
  1005. Connection *conn = NULL;
  1006. if (packet_enqueue(s, PRIMARY_IN, &conn)) {
  1007. trace_colo_compare_main("primary: unsupported packet in");
  1008. compare_chr_send(s,
  1009. pri_rs->buf,
  1010. pri_rs->packet_len,
  1011. pri_rs->vnet_hdr_len,
  1012. false,
  1013. false);
  1014. } else {
  1015. /* compare packet in the specified connection */
  1016. colo_compare_connection(conn, s);
  1017. }
  1018. }
  1019. static void compare_sec_rs_finalize(SocketReadState *sec_rs)
  1020. {
  1021. CompareState *s = container_of(sec_rs, CompareState, sec_rs);
  1022. Connection *conn = NULL;
  1023. if (packet_enqueue(s, SECONDARY_IN, &conn)) {
  1024. trace_colo_compare_main("secondary: unsupported packet in");
  1025. } else {
  1026. /* compare packet in the specified connection */
  1027. colo_compare_connection(conn, s);
  1028. }
  1029. }
  1030. static void compare_notify_rs_finalize(SocketReadState *notify_rs)
  1031. {
  1032. CompareState *s = container_of(notify_rs, CompareState, notify_rs);
  1033. const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
  1034. int ret;
  1035. if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
  1036. notify_rs->buf,
  1037. notify_rs->packet_len)) {
  1038. ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
  1039. if (ret < 0) {
  1040. error_report("Notify Xen COLO-frame INIT failed");
  1041. }
  1042. } else if (packet_matches_str("COLO_CHECKPOINT",
  1043. notify_rs->buf,
  1044. notify_rs->packet_len)) {
  1045. /* colo-compare do checkpoint, flush pri packet and remove sec packet */
  1046. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  1047. } else {
  1048. error_report("COLO compare got unsupported instruction");
  1049. }
  1050. }
  1051. /*
  1052. * Return 0 is success.
  1053. * Return 1 is failed.
  1054. */
  1055. static int find_and_check_chardev(Chardev **chr,
  1056. char *chr_name,
  1057. Error **errp)
  1058. {
  1059. *chr = qemu_chr_find(chr_name);
  1060. if (*chr == NULL) {
  1061. error_setg(errp, "Device '%s' not found",
  1062. chr_name);
  1063. return 1;
  1064. }
  1065. if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
  1066. error_setg(errp, "chardev \"%s\" is not reconnectable",
  1067. chr_name);
  1068. return 1;
  1069. }
  1070. if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
  1071. error_setg(errp, "chardev \"%s\" cannot switch context",
  1072. chr_name);
  1073. return 1;
  1074. }
  1075. return 0;
  1076. }
  1077. /*
  1078. * Called from the main thread on the primary
  1079. * to setup colo-compare.
  1080. */
  1081. static void colo_compare_complete(UserCreatable *uc, Error **errp)
  1082. {
  1083. CompareState *s = COLO_COMPARE(uc);
  1084. Chardev *chr;
  1085. if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
  1086. error_setg(errp, "colo compare needs 'primary_in' ,"
  1087. "'secondary_in','outdev','iothread' property set");
  1088. return;
  1089. } else if (!strcmp(s->pri_indev, s->outdev) ||
  1090. !strcmp(s->sec_indev, s->outdev) ||
  1091. !strcmp(s->pri_indev, s->sec_indev)) {
  1092. error_setg(errp, "'indev' and 'outdev' could not be same "
  1093. "for compare module");
  1094. return;
  1095. }
  1096. if (!s->compare_timeout) {
  1097. /* Set default value to 3000 MS */
  1098. s->compare_timeout = DEFAULT_TIME_OUT_MS;
  1099. }
  1100. if (!s->expired_scan_cycle) {
  1101. /* Set default value to 3000 MS */
  1102. s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
  1103. }
  1104. if (!max_queue_size) {
  1105. /* Set default queue size to 1024 */
  1106. max_queue_size = MAX_QUEUE_SIZE;
  1107. }
  1108. if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
  1109. !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
  1110. return;
  1111. }
  1112. if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
  1113. !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
  1114. return;
  1115. }
  1116. if (find_and_check_chardev(&chr, s->outdev, errp) ||
  1117. !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
  1118. return;
  1119. }
  1120. net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
  1121. net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
  1122. /* Try to enable remote notify chardev, currently just for Xen COLO */
  1123. if (s->notify_dev) {
  1124. if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
  1125. !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
  1126. return;
  1127. }
  1128. net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
  1129. s->vnet_hdr);
  1130. }
  1131. s->out_sendco.s = s;
  1132. s->out_sendco.chr = &s->chr_out;
  1133. s->out_sendco.notify_remote_frame = false;
  1134. s->out_sendco.done = true;
  1135. g_queue_init(&s->out_sendco.send_list);
  1136. if (s->notify_dev) {
  1137. s->notify_sendco.s = s;
  1138. s->notify_sendco.chr = &s->chr_notify_dev;
  1139. s->notify_sendco.notify_remote_frame = true;
  1140. s->notify_sendco.done = true;
  1141. g_queue_init(&s->notify_sendco.send_list);
  1142. }
  1143. g_queue_init(&s->conn_list);
  1144. s->connection_track_table = g_hash_table_new_full(connection_key_hash,
  1145. connection_key_equal,
  1146. g_free,
  1147. connection_destroy);
  1148. colo_compare_iothread(s);
  1149. qemu_mutex_lock(&colo_compare_mutex);
  1150. if (!colo_compare_active) {
  1151. qemu_mutex_init(&event_mtx);
  1152. qemu_cond_init(&event_complete_cond);
  1153. colo_compare_active = true;
  1154. }
  1155. QTAILQ_INSERT_TAIL(&net_compares, s, next);
  1156. qemu_mutex_unlock(&colo_compare_mutex);
  1157. return;
  1158. }
  1159. static void colo_flush_packets(void *opaque, void *user_data)
  1160. {
  1161. CompareState *s = user_data;
  1162. Connection *conn = opaque;
  1163. Packet *pkt = NULL;
  1164. while (!g_queue_is_empty(&conn->primary_list)) {
  1165. pkt = g_queue_pop_head(&conn->primary_list);
  1166. compare_chr_send(s,
  1167. pkt->data,
  1168. pkt->size,
  1169. pkt->vnet_hdr_len,
  1170. false,
  1171. true);
  1172. packet_destroy_partial(pkt, NULL);
  1173. }
  1174. while (!g_queue_is_empty(&conn->secondary_list)) {
  1175. pkt = g_queue_pop_head(&conn->secondary_list);
  1176. packet_destroy(pkt, NULL);
  1177. }
  1178. }
  1179. static void colo_compare_class_init(ObjectClass *oc, void *data)
  1180. {
  1181. UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
  1182. ucc->complete = colo_compare_complete;
  1183. }
  1184. static void colo_compare_init(Object *obj)
  1185. {
  1186. CompareState *s = COLO_COMPARE(obj);
  1187. object_property_add_str(obj, "primary_in",
  1188. compare_get_pri_indev, compare_set_pri_indev);
  1189. object_property_add_str(obj, "secondary_in",
  1190. compare_get_sec_indev, compare_set_sec_indev);
  1191. object_property_add_str(obj, "outdev",
  1192. compare_get_outdev, compare_set_outdev);
  1193. object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
  1194. (Object **)&s->iothread,
  1195. object_property_allow_set_link,
  1196. OBJ_PROP_LINK_STRONG);
  1197. /* This parameter just for Xen COLO */
  1198. object_property_add_str(obj, "notify_dev",
  1199. compare_get_notify_dev, compare_set_notify_dev);
  1200. object_property_add(obj, "compare_timeout", "uint32",
  1201. compare_get_timeout,
  1202. compare_set_timeout, NULL, NULL);
  1203. object_property_add(obj, "expired_scan_cycle", "uint32",
  1204. compare_get_expired_scan_cycle,
  1205. compare_set_expired_scan_cycle, NULL, NULL);
  1206. object_property_add(obj, "max_queue_size", "uint32",
  1207. get_max_queue_size,
  1208. set_max_queue_size, NULL, NULL);
  1209. s->vnet_hdr = false;
  1210. object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
  1211. compare_set_vnet_hdr);
  1212. }
  1213. static void colo_compare_finalize(Object *obj)
  1214. {
  1215. CompareState *s = COLO_COMPARE(obj);
  1216. CompareState *tmp = NULL;
  1217. qemu_mutex_lock(&colo_compare_mutex);
  1218. QTAILQ_FOREACH(tmp, &net_compares, next) {
  1219. if (tmp == s) {
  1220. QTAILQ_REMOVE(&net_compares, s, next);
  1221. break;
  1222. }
  1223. }
  1224. if (QTAILQ_EMPTY(&net_compares)) {
  1225. colo_compare_active = false;
  1226. qemu_mutex_destroy(&event_mtx);
  1227. qemu_cond_destroy(&event_complete_cond);
  1228. }
  1229. qemu_mutex_unlock(&colo_compare_mutex);
  1230. qemu_chr_fe_deinit(&s->chr_pri_in, false);
  1231. qemu_chr_fe_deinit(&s->chr_sec_in, false);
  1232. qemu_chr_fe_deinit(&s->chr_out, false);
  1233. if (s->notify_dev) {
  1234. qemu_chr_fe_deinit(&s->chr_notify_dev, false);
  1235. }
  1236. colo_compare_timer_del(s);
  1237. qemu_bh_delete(s->event_bh);
  1238. AioContext *ctx = iothread_get_aio_context(s->iothread);
  1239. aio_context_acquire(ctx);
  1240. AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
  1241. if (s->notify_dev) {
  1242. AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
  1243. }
  1244. aio_context_release(ctx);
  1245. /* Release all unhandled packets after compare thead exited */
  1246. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  1247. AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
  1248. g_queue_clear(&s->conn_list);
  1249. g_queue_clear(&s->out_sendco.send_list);
  1250. if (s->notify_dev) {
  1251. g_queue_clear(&s->notify_sendco.send_list);
  1252. }
  1253. if (s->connection_track_table) {
  1254. g_hash_table_destroy(s->connection_track_table);
  1255. }
  1256. object_unref(OBJECT(s->iothread));
  1257. g_free(s->pri_indev);
  1258. g_free(s->sec_indev);
  1259. g_free(s->outdev);
  1260. g_free(s->notify_dev);
  1261. }
  1262. static void __attribute__((__constructor__)) colo_compare_init_globals(void)
  1263. {
  1264. colo_compare_active = false;
  1265. qemu_mutex_init(&colo_compare_mutex);
  1266. }
  1267. static const TypeInfo colo_compare_info = {
  1268. .name = TYPE_COLO_COMPARE,
  1269. .parent = TYPE_OBJECT,
  1270. .instance_size = sizeof(CompareState),
  1271. .instance_init = colo_compare_init,
  1272. .instance_finalize = colo_compare_finalize,
  1273. .class_size = sizeof(CompareClass),
  1274. .class_init = colo_compare_class_init,
  1275. .interfaces = (InterfaceInfo[]) {
  1276. { TYPE_USER_CREATABLE },
  1277. { }
  1278. }
  1279. };
  /* Registers the colo-compare type description with the type system. */
  static void register_types(void)
  {
      type_register_static(&colo_compare_info);
  }

  /* Hooks register_types() into the type_init() registration mechanism. */
  type_init(register_types);