colo-compare.c 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492
  1. /*
  2. * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
  3. * (a.k.a. Fault Tolerance or Continuous Replication)
  4. *
  5. * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
  6. * Copyright (c) 2016 FUJITSU LIMITED
  7. * Copyright (c) 2016 Intel Corporation
  8. *
  9. * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
  10. *
  11. * This work is licensed under the terms of the GNU GPL, version 2 or
  12. * later. See the COPYING file in the top-level directory.
  13. */
  14. #include "qemu/osdep.h"
  15. #include "qemu/error-report.h"
  16. #include "trace.h"
  17. #include "qapi/error.h"
  18. #include "net/net.h"
  19. #include "net/eth.h"
  20. #include "qom/object_interfaces.h"
  21. #include "qemu/iov.h"
  22. #include "qom/object.h"
  23. #include "net/queue.h"
  24. #include "chardev/char-fe.h"
  25. #include "qemu/sockets.h"
  26. #include "colo.h"
  27. #include "system/iothread.h"
  28. #include "net/colo-compare.h"
  29. #include "migration/colo.h"
  30. #include "util.h"
  31. #include "block/aio-wait.h"
  32. #include "qemu/coroutine.h"
  33. #define TYPE_COLO_COMPARE "colo-compare"
  34. typedef struct CompareState CompareState;
  35. DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
  36. TYPE_COLO_COMPARE)
  37. static QTAILQ_HEAD(, CompareState) net_compares =
  38. QTAILQ_HEAD_INITIALIZER(net_compares);
  39. static NotifierList colo_compare_notifiers =
  40. NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
  41. #define COMPARE_READ_LEN_MAX NET_BUFSIZE
  42. #define MAX_QUEUE_SIZE 1024
  43. #define COLO_COMPARE_FREE_PRIMARY 0x01
  44. #define COLO_COMPARE_FREE_SECONDARY 0x02
  45. #define REGULAR_PACKET_CHECK_MS 1000
  46. #define DEFAULT_TIME_OUT_MS 3000
  47. /* #define DEBUG_COLO_PACKETS */
  48. static QemuMutex colo_compare_mutex;
  49. static bool colo_compare_active;
  50. static QemuMutex event_mtx;
  51. static QemuCond event_complete_cond;
  52. static int event_unhandled_count;
  53. static uint32_t max_queue_size;
/*
 *  + CompareState ++
 *  |               |
 *  +---------------+   +---------------+         +---------------+
 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
 *  +---------------+   +---------------+         +---------------+
 *  |               |     |           |             |          |
 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 */
  76. typedef struct SendCo {
  77. Coroutine *co;
  78. struct CompareState *s;
  79. CharBackend *chr;
  80. GQueue send_list;
  81. bool notify_remote_frame;
  82. bool done;
  83. int ret;
  84. } SendCo;
  85. typedef struct SendEntry {
  86. uint32_t size;
  87. uint32_t vnet_hdr_len;
  88. uint8_t *buf;
  89. } SendEntry;
  90. struct CompareState {
  91. Object parent;
  92. char *pri_indev;
  93. char *sec_indev;
  94. char *outdev;
  95. char *notify_dev;
  96. CharBackend chr_pri_in;
  97. CharBackend chr_sec_in;
  98. CharBackend chr_out;
  99. CharBackend chr_notify_dev;
  100. SocketReadState pri_rs;
  101. SocketReadState sec_rs;
  102. SocketReadState notify_rs;
  103. SendCo out_sendco;
  104. SendCo notify_sendco;
  105. bool vnet_hdr;
  106. uint64_t compare_timeout;
  107. uint32_t expired_scan_cycle;
  108. /*
  109. * Record the connection that through the NIC
  110. * Element type: Connection
  111. */
  112. GQueue conn_list;
  113. /* Record the connection without repetition */
  114. GHashTable *connection_track_table;
  115. IOThread *iothread;
  116. GMainContext *worker_context;
  117. QEMUTimer *packet_check_timer;
  118. QEMUBH *event_bh;
  119. enum colo_event event;
  120. QTAILQ_ENTRY(CompareState) next;
  121. };
  122. typedef struct CompareClass {
  123. ObjectClass parent_class;
  124. } CompareClass;
  125. enum {
  126. PRIMARY_IN = 0,
  127. SECONDARY_IN,
  128. };
  129. static const char *colo_mode[] = {
  130. [PRIMARY_IN] = "primary",
  131. [SECONDARY_IN] = "secondary",
  132. };
  133. static int compare_chr_send(CompareState *s,
  134. uint8_t *buf,
  135. uint32_t size,
  136. uint32_t vnet_hdr_len,
  137. bool notify_remote_frame,
  138. bool zero_copy);
  139. static bool packet_matches_str(const char *str,
  140. const uint8_t *buf,
  141. uint32_t packet_len)
  142. {
  143. if (packet_len != strlen(str)) {
  144. return false;
  145. }
  146. return !memcmp(str, buf, packet_len);
  147. }
  148. static void notify_remote_frame(CompareState *s)
  149. {
  150. char msg[] = "DO_CHECKPOINT";
  151. int ret = 0;
  152. ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
  153. if (ret < 0) {
  154. error_report("Notify Xen COLO-frame failed");
  155. }
  156. }
  157. static void colo_compare_inconsistency_notify(CompareState *s)
  158. {
  159. if (s->notify_dev) {
  160. notify_remote_frame(s);
  161. } else {
  162. notifier_list_notify(&colo_compare_notifiers,
  163. NULL);
  164. }
  165. }
  166. /* Use restricted to colo_insert_packet() */
  167. static gint seq_sorter(Packet *a, Packet *b, gpointer data)
  168. {
  169. return b->tcp_seq - a->tcp_seq;
  170. }
  171. static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
  172. {
  173. Packet *pkt = data;
  174. struct tcp_hdr *tcphd;
  175. tcphd = (struct tcp_hdr *)pkt->transport_header;
  176. pkt->tcp_seq = ntohl(tcphd->th_seq);
  177. pkt->tcp_ack = ntohl(tcphd->th_ack);
  178. /* Need to consider ACK will bigger than uint32_t MAX */
  179. *max_ack = pkt->tcp_ack - *max_ack > 0 ? pkt->tcp_ack : *max_ack;
  180. pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
  181. + (tcphd->th_off << 2);
  182. pkt->payload_size = pkt->size - pkt->header_size;
  183. pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
  184. pkt->flags = tcphd->th_flags;
  185. }
  186. /*
  187. * Return 1 on success, if return 0 means the
  188. * packet will be dropped
  189. */
  190. static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
  191. {
  192. if (g_queue_get_length(queue) <= max_queue_size) {
  193. if (pkt->ip->ip_p == IPPROTO_TCP) {
  194. fill_pkt_tcp_info(pkt, max_ack);
  195. g_queue_insert_sorted(queue,
  196. pkt,
  197. (GCompareDataFunc)seq_sorter,
  198. NULL);
  199. } else {
  200. g_queue_push_tail(queue, pkt);
  201. }
  202. return 1;
  203. }
  204. return 0;
  205. }
  206. /*
  207. * Return 0 on success, if return -1 means the pkt
  208. * is unsupported(arp and ipv6) and will be sent later
  209. */
  210. static int packet_enqueue(CompareState *s, int mode, Connection **con)
  211. {
  212. ConnectionKey key;
  213. Packet *pkt = NULL;
  214. Connection *conn;
  215. int ret;
  216. if (mode == PRIMARY_IN) {
  217. pkt = packet_new(s->pri_rs.buf,
  218. s->pri_rs.packet_len,
  219. s->pri_rs.vnet_hdr_len);
  220. } else {
  221. pkt = packet_new(s->sec_rs.buf,
  222. s->sec_rs.packet_len,
  223. s->sec_rs.vnet_hdr_len);
  224. }
  225. if (parse_packet_early(pkt)) {
  226. packet_destroy(pkt, NULL);
  227. pkt = NULL;
  228. return -1;
  229. }
  230. fill_connection_key(pkt, &key, false);
  231. conn = connection_get(s->connection_track_table,
  232. &key,
  233. &s->conn_list);
  234. if (!conn->processing) {
  235. g_queue_push_tail(&s->conn_list, conn);
  236. conn->processing = true;
  237. }
  238. if (mode == PRIMARY_IN) {
  239. ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
  240. } else {
  241. ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
  242. }
  243. if (!ret) {
  244. trace_colo_compare_drop_packet(colo_mode[mode],
  245. "queue size too big, drop packet");
  246. packet_destroy(pkt, NULL);
  247. pkt = NULL;
  248. }
  249. *con = conn;
  250. return 0;
  251. }
  252. static inline bool after(uint32_t seq1, uint32_t seq2)
  253. {
  254. return (int32_t)(seq1 - seq2) > 0;
  255. }
  256. static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
  257. {
  258. int ret;
  259. ret = compare_chr_send(s,
  260. pkt->data,
  261. pkt->size,
  262. pkt->vnet_hdr_len,
  263. false,
  264. true);
  265. if (ret < 0) {
  266. error_report("colo send primary packet failed");
  267. }
  268. trace_colo_compare_main("packet same and release packet");
  269. packet_destroy_partial(pkt, NULL);
  270. }
  271. /*
  272. * The IP packets sent by primary and secondary
  273. * will be compared in here
  274. * TODO support ip fragment, Out-Of-Order
  275. * return: 0 means packet same
  276. * > 0 || < 0 means packet different
  277. */
  278. static int colo_compare_packet_payload(Packet *ppkt,
  279. Packet *spkt,
  280. uint16_t poffset,
  281. uint16_t soffset,
  282. uint16_t len)
  283. {
  284. if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
  285. char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
  286. strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
  287. strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
  288. strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
  289. strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
  290. trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
  291. pri_ip_dst, spkt->size,
  292. sec_ip_src, sec_ip_dst);
  293. }
  294. return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
  295. }
  296. /*
  297. * return true means that the payload is consist and
  298. * need to make the next comparison, false means do
  299. * the checkpoint
  300. */
  301. static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
  302. int8_t *mark, uint32_t max_ack)
  303. {
  304. *mark = 0;
  305. if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
  306. if (!colo_compare_packet_payload(ppkt, spkt,
  307. ppkt->header_size, spkt->header_size,
  308. ppkt->payload_size)) {
  309. *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
  310. return true;
  311. }
  312. }
  313. /* one part of secondary packet payload still need to be compared */
  314. if (!after(ppkt->seq_end, spkt->seq_end)) {
  315. if (!colo_compare_packet_payload(ppkt, spkt,
  316. ppkt->header_size + ppkt->offset,
  317. spkt->header_size + spkt->offset,
  318. ppkt->payload_size - ppkt->offset)) {
  319. if (!after(ppkt->tcp_ack, max_ack)) {
  320. *mark = COLO_COMPARE_FREE_PRIMARY;
  321. spkt->offset += ppkt->payload_size - ppkt->offset;
  322. return true;
  323. } else {
  324. /* secondary guest hasn't ack the data, don't send
  325. * out this packet
  326. */
  327. return false;
  328. }
  329. }
  330. } else {
  331. /* primary packet is longer than secondary packet, compare
  332. * the same part and mark the primary packet offset
  333. */
  334. if (!colo_compare_packet_payload(ppkt, spkt,
  335. ppkt->header_size + ppkt->offset,
  336. spkt->header_size + spkt->offset,
  337. spkt->payload_size - spkt->offset)) {
  338. *mark = COLO_COMPARE_FREE_SECONDARY;
  339. ppkt->offset += spkt->payload_size - spkt->offset;
  340. return true;
  341. }
  342. }
  343. return false;
  344. }
  345. static void colo_compare_tcp(CompareState *s, Connection *conn)
  346. {
  347. Packet *ppkt = NULL, *spkt = NULL;
  348. int8_t mark;
  349. /*
  350. * If ppkt and spkt have the same payload, but ppkt's ACK
  351. * is greater than spkt's ACK, in this case we can not
  352. * send the ppkt because it will cause the secondary guest
  353. * to miss sending some data in the next. Therefore, we
  354. * record the maximum ACK in the current queue at both
  355. * primary side and secondary side. Only when the ack is
  356. * less than the smaller of the two maximum ack, then we
  357. * can ensure that the packet's payload is acknowledged by
  358. * primary and secondary.
  359. */
  360. uint32_t min_ack = MIN(conn->pack, conn->sack);
  361. pri:
  362. if (g_queue_is_empty(&conn->primary_list)) {
  363. return;
  364. }
  365. ppkt = g_queue_pop_tail(&conn->primary_list);
  366. sec:
  367. if (g_queue_is_empty(&conn->secondary_list)) {
  368. g_queue_push_tail(&conn->primary_list, ppkt);
  369. return;
  370. }
  371. spkt = g_queue_pop_tail(&conn->secondary_list);
  372. if (ppkt->tcp_seq == ppkt->seq_end) {
  373. colo_release_primary_pkt(s, ppkt);
  374. ppkt = NULL;
  375. }
  376. if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
  377. trace_colo_compare_main("pri: this packet has compared");
  378. colo_release_primary_pkt(s, ppkt);
  379. ppkt = NULL;
  380. }
  381. if (spkt->tcp_seq == spkt->seq_end) {
  382. packet_destroy(spkt, NULL);
  383. if (!ppkt) {
  384. goto pri;
  385. } else {
  386. goto sec;
  387. }
  388. } else {
  389. if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
  390. trace_colo_compare_main("sec: this packet has compared");
  391. packet_destroy(spkt, NULL);
  392. if (!ppkt) {
  393. goto pri;
  394. } else {
  395. goto sec;
  396. }
  397. }
  398. if (!ppkt) {
  399. g_queue_push_tail(&conn->secondary_list, spkt);
  400. goto pri;
  401. }
  402. }
  403. if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
  404. trace_colo_compare_tcp_info("pri",
  405. ppkt->tcp_seq, ppkt->tcp_ack,
  406. ppkt->header_size, ppkt->payload_size,
  407. ppkt->offset, ppkt->flags);
  408. trace_colo_compare_tcp_info("sec",
  409. spkt->tcp_seq, spkt->tcp_ack,
  410. spkt->header_size, spkt->payload_size,
  411. spkt->offset, spkt->flags);
  412. if (mark == COLO_COMPARE_FREE_PRIMARY) {
  413. conn->compare_seq = ppkt->seq_end;
  414. colo_release_primary_pkt(s, ppkt);
  415. g_queue_push_tail(&conn->secondary_list, spkt);
  416. goto pri;
  417. } else if (mark == COLO_COMPARE_FREE_SECONDARY) {
  418. conn->compare_seq = spkt->seq_end;
  419. packet_destroy(spkt, NULL);
  420. goto sec;
  421. } else if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
  422. conn->compare_seq = ppkt->seq_end;
  423. colo_release_primary_pkt(s, ppkt);
  424. packet_destroy(spkt, NULL);
  425. goto pri;
  426. }
  427. } else {
  428. g_queue_push_tail(&conn->primary_list, ppkt);
  429. g_queue_push_tail(&conn->secondary_list, spkt);
  430. #ifdef DEBUG_COLO_PACKETS
  431. qemu_hexdump(stderr, "colo-compare ppkt", ppkt->data, ppkt->size);
  432. qemu_hexdump(stderr, "colo-compare spkt", spkt->data, spkt->size);
  433. #endif
  434. colo_compare_inconsistency_notify(s);
  435. }
  436. }
  437. /*
  438. * Called from the compare thread on the primary
  439. * for compare udp packet
  440. */
  441. static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
  442. {
  443. uint16_t network_header_length = ppkt->ip->ip_hl << 2;
  444. uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
  445. trace_colo_compare_main("compare udp");
  446. /*
  447. * Because of ppkt and spkt are both in the same connection,
  448. * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
  449. * same with spkt. In addition, IP header's Identification is a random
  450. * field, we can handle it in IP fragmentation function later.
  451. * COLO just concern the response net packet payload from primary guest
  452. * and secondary guest are same or not, So we ignored all IP header include
  453. * other field like TOS,TTL,IP Checksum. we only need to compare
  454. * the ip payload here.
  455. */
  456. if (ppkt->size != spkt->size) {
  457. trace_colo_compare_main("UDP: payload size of packets are different");
  458. return -1;
  459. }
  460. if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
  461. ppkt->size - offset)) {
  462. trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
  463. trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
  464. #ifdef DEBUG_COLO_PACKETS
  465. qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
  466. qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
  467. #endif
  468. return -1;
  469. } else {
  470. return 0;
  471. }
  472. }
  473. /*
  474. * Called from the compare thread on the primary
  475. * for compare icmp packet
  476. */
  477. static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
  478. {
  479. uint16_t network_header_length = ppkt->ip->ip_hl << 2;
  480. uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
  481. trace_colo_compare_main("compare icmp");
  482. /*
  483. * Because of ppkt and spkt are both in the same connection,
  484. * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
  485. * same with spkt. In addition, IP header's Identification is a random
  486. * field, we can handle it in IP fragmentation function later.
  487. * COLO just concern the response net packet payload from primary guest
  488. * and secondary guest are same or not, So we ignored all IP header include
  489. * other field like TOS,TTL,IP Checksum. we only need to compare
  490. * the ip payload here.
  491. */
  492. if (ppkt->size != spkt->size) {
  493. trace_colo_compare_main("ICMP: payload size of packets are different");
  494. return -1;
  495. }
  496. if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
  497. ppkt->size - offset)) {
  498. trace_colo_compare_icmp_miscompare("primary pkt size",
  499. ppkt->size);
  500. trace_colo_compare_icmp_miscompare("Secondary pkt size",
  501. spkt->size);
  502. #ifdef DEBUG_COLO_PACKETS
  503. qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
  504. qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
  505. #endif
  506. return -1;
  507. } else {
  508. return 0;
  509. }
  510. }
  511. /*
  512. * Called from the compare thread on the primary
  513. * for compare other packet
  514. */
  515. static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
  516. {
  517. uint16_t offset = ppkt->vnet_hdr_len;
  518. trace_colo_compare_main("compare other");
  519. if (ppkt->size != spkt->size) {
  520. trace_colo_compare_main("Other: payload size of packets are different");
  521. return -1;
  522. }
  523. return colo_compare_packet_payload(ppkt, spkt, offset, offset,
  524. ppkt->size - offset);
  525. }
  526. static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
  527. {
  528. int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
  529. if ((now - pkt->creation_ms) > (*check_time)) {
  530. trace_colo_old_packet_check_found(pkt->creation_ms);
  531. return 0;
  532. } else {
  533. return 1;
  534. }
  535. }
  536. void colo_compare_register_notifier(Notifier *notify)
  537. {
  538. notifier_list_add(&colo_compare_notifiers, notify);
  539. }
  540. void colo_compare_unregister_notifier(Notifier *notify)
  541. {
  542. notifier_remove(notify);
  543. }
  544. static int colo_old_packet_check_one_conn(Connection *conn,
  545. CompareState *s)
  546. {
  547. if (!g_queue_is_empty(&conn->primary_list)) {
  548. if (g_queue_find_custom(&conn->primary_list,
  549. &s->compare_timeout,
  550. (GCompareFunc)colo_old_packet_check_one))
  551. goto out;
  552. }
  553. if (!g_queue_is_empty(&conn->secondary_list)) {
  554. if (g_queue_find_custom(&conn->secondary_list,
  555. &s->compare_timeout,
  556. (GCompareFunc)colo_old_packet_check_one))
  557. goto out;
  558. }
  559. return 1;
  560. out:
  561. /* Do checkpoint will flush old packet */
  562. colo_compare_inconsistency_notify(s);
  563. return 0;
  564. }
  565. /*
  566. * Look for old packets that the secondary hasn't matched,
  567. * if we have some then we have to checkpoint to wake
  568. * the secondary up.
  569. */
  570. static void colo_old_packet_check(void *opaque)
  571. {
  572. CompareState *s = opaque;
  573. /*
  574. * If we find one old packet, stop finding job and notify
  575. * COLO frame do checkpoint.
  576. */
  577. g_queue_find_custom(&s->conn_list, s,
  578. (GCompareFunc)colo_old_packet_check_one_conn);
  579. }
  580. static void colo_compare_packet(CompareState *s, Connection *conn,
  581. int (*HandlePacket)(Packet *spkt,
  582. Packet *ppkt))
  583. {
  584. Packet *pkt = NULL;
  585. GList *result = NULL;
  586. while (!g_queue_is_empty(&conn->primary_list) &&
  587. !g_queue_is_empty(&conn->secondary_list)) {
  588. pkt = g_queue_pop_tail(&conn->primary_list);
  589. result = g_queue_find_custom(&conn->secondary_list,
  590. pkt, (GCompareFunc)HandlePacket);
  591. if (result) {
  592. colo_release_primary_pkt(s, pkt);
  593. packet_destroy(result->data, NULL);
  594. g_queue_delete_link(&conn->secondary_list, result);
  595. } else {
  596. /*
  597. * If one packet arrive late, the secondary_list or
  598. * primary_list will be empty, so we can't compare it
  599. * until next comparison. If the packets in the list are
  600. * timeout, it will trigger a checkpoint request.
  601. */
  602. trace_colo_compare_main("packet different");
  603. g_queue_push_tail(&conn->primary_list, pkt);
  604. colo_compare_inconsistency_notify(s);
  605. break;
  606. }
  607. }
  608. }
  609. /*
  610. * Called from the compare thread on the primary
  611. * for compare packet with secondary list of the
  612. * specified connection when a new packet was
  613. * queued to it.
  614. */
  615. static void colo_compare_connection(void *opaque, void *user_data)
  616. {
  617. CompareState *s = user_data;
  618. Connection *conn = opaque;
  619. switch (conn->ip_proto) {
  620. case IPPROTO_TCP:
  621. colo_compare_tcp(s, conn);
  622. break;
  623. case IPPROTO_UDP:
  624. colo_compare_packet(s, conn, colo_packet_compare_udp);
  625. break;
  626. case IPPROTO_ICMP:
  627. colo_compare_packet(s, conn, colo_packet_compare_icmp);
  628. break;
  629. default:
  630. colo_compare_packet(s, conn, colo_packet_compare_other);
  631. break;
  632. }
  633. }
  634. static void coroutine_fn _compare_chr_send(void *opaque)
  635. {
  636. SendCo *sendco = opaque;
  637. CompareState *s = sendco->s;
  638. int ret = 0;
  639. while (!g_queue_is_empty(&sendco->send_list)) {
  640. SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
  641. uint32_t len = htonl(entry->size);
  642. ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
  643. if (ret != sizeof(len)) {
  644. g_free(entry->buf);
  645. g_slice_free(SendEntry, entry);
  646. goto err;
  647. }
  648. if (!sendco->notify_remote_frame && s->vnet_hdr) {
  649. /*
  650. * We send vnet header len make other module(like filter-redirector)
  651. * know how to parse net packet correctly.
  652. */
  653. len = htonl(entry->vnet_hdr_len);
  654. ret = qemu_chr_fe_write_all(sendco->chr,
  655. (uint8_t *)&len,
  656. sizeof(len));
  657. if (ret != sizeof(len)) {
  658. g_free(entry->buf);
  659. g_slice_free(SendEntry, entry);
  660. goto err;
  661. }
  662. }
  663. ret = qemu_chr_fe_write_all(sendco->chr,
  664. (uint8_t *)entry->buf,
  665. entry->size);
  666. if (ret != entry->size) {
  667. g_free(entry->buf);
  668. g_slice_free(SendEntry, entry);
  669. goto err;
  670. }
  671. g_free(entry->buf);
  672. g_slice_free(SendEntry, entry);
  673. }
  674. sendco->ret = 0;
  675. goto out;
  676. err:
  677. while (!g_queue_is_empty(&sendco->send_list)) {
  678. SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
  679. g_free(entry->buf);
  680. g_slice_free(SendEntry, entry);
  681. }
  682. sendco->ret = ret < 0 ? ret : -EIO;
  683. out:
  684. sendco->co = NULL;
  685. sendco->done = true;
  686. aio_wait_kick();
  687. }
  688. static int compare_chr_send(CompareState *s,
  689. uint8_t *buf,
  690. uint32_t size,
  691. uint32_t vnet_hdr_len,
  692. bool notify_remote_frame,
  693. bool zero_copy)
  694. {
  695. SendCo *sendco;
  696. SendEntry *entry;
  697. if (notify_remote_frame) {
  698. sendco = &s->notify_sendco;
  699. } else {
  700. sendco = &s->out_sendco;
  701. }
  702. if (!size) {
  703. return -1;
  704. }
  705. entry = g_slice_new(SendEntry);
  706. entry->size = size;
  707. entry->vnet_hdr_len = vnet_hdr_len;
  708. if (zero_copy) {
  709. entry->buf = buf;
  710. } else {
  711. entry->buf = g_malloc(size);
  712. memcpy(entry->buf, buf, size);
  713. }
  714. g_queue_push_tail(&sendco->send_list, entry);
  715. if (sendco->done) {
  716. sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
  717. sendco->done = false;
  718. qemu_coroutine_enter(sendco->co);
  719. if (sendco->done) {
  720. /* report early errors */
  721. return sendco->ret;
  722. }
  723. }
  724. /* assume success */
  725. return 0;
  726. }
  727. static int compare_chr_can_read(void *opaque)
  728. {
  729. return COMPARE_READ_LEN_MAX;
  730. }
  731. /*
  732. * Called from the main thread on the primary for packets
  733. * arriving over the socket from the primary.
  734. */
  735. static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
  736. {
  737. CompareState *s = COLO_COMPARE(opaque);
  738. int ret;
  739. ret = net_fill_rstate(&s->pri_rs, buf, size);
  740. if (ret == -1) {
  741. qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
  742. NULL, NULL, true);
  743. error_report("colo-compare primary_in error");
  744. }
  745. }
  746. /*
  747. * Called from the main thread on the primary for packets
  748. * arriving over the socket from the secondary.
  749. */
  750. static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
  751. {
  752. CompareState *s = COLO_COMPARE(opaque);
  753. int ret;
  754. ret = net_fill_rstate(&s->sec_rs, buf, size);
  755. if (ret == -1) {
  756. qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
  757. NULL, NULL, true);
  758. error_report("colo-compare secondary_in error");
  759. }
  760. }
  761. static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
  762. {
  763. CompareState *s = COLO_COMPARE(opaque);
  764. int ret;
  765. ret = net_fill_rstate(&s->notify_rs, buf, size);
  766. if (ret == -1) {
  767. qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
  768. NULL, NULL, true);
  769. error_report("colo-compare notify_dev error");
  770. }
  771. }
  772. /*
  773. * Check old packet regularly so it can watch for any packets
  774. * that the secondary hasn't produced equivalents of.
  775. */
  776. static void check_old_packet_regular(void *opaque)
  777. {
  778. CompareState *s = opaque;
  779. /* if have old packet we will notify checkpoint */
  780. colo_old_packet_check(s);
  781. timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
  782. s->expired_scan_cycle);
  783. }
  784. /* Public API, Used for COLO frame to notify compare event */
  785. void colo_notify_compares_event(void *opaque, int event, Error **errp)
  786. {
  787. CompareState *s;
  788. qemu_mutex_lock(&colo_compare_mutex);
  789. if (!colo_compare_active) {
  790. qemu_mutex_unlock(&colo_compare_mutex);
  791. return;
  792. }
  793. qemu_mutex_lock(&event_mtx);
  794. QTAILQ_FOREACH(s, &net_compares, next) {
  795. s->event = event;
  796. qemu_bh_schedule(s->event_bh);
  797. event_unhandled_count++;
  798. }
  799. /* Wait all compare threads to finish handling this event */
  800. while (event_unhandled_count > 0) {
  801. qemu_cond_wait(&event_complete_cond, &event_mtx);
  802. }
  803. qemu_mutex_unlock(&event_mtx);
  804. qemu_mutex_unlock(&colo_compare_mutex);
  805. }
  806. static void colo_compare_timer_init(CompareState *s)
  807. {
  808. AioContext *ctx = iothread_get_aio_context(s->iothread);
  809. s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_HOST,
  810. SCALE_MS, check_old_packet_regular,
  811. s);
  812. timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
  813. s->expired_scan_cycle);
  814. }
  815. static void colo_compare_timer_del(CompareState *s)
  816. {
  817. if (s->packet_check_timer) {
  818. timer_free(s->packet_check_timer);
  819. s->packet_check_timer = NULL;
  820. }
  821. }
  822. static void colo_flush_packets(void *opaque, void *user_data);
  823. static void colo_compare_handle_event(void *opaque)
  824. {
  825. CompareState *s = opaque;
  826. switch (s->event) {
  827. case COLO_EVENT_CHECKPOINT:
  828. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  829. break;
  830. case COLO_EVENT_FAILOVER:
  831. break;
  832. default:
  833. break;
  834. }
  835. qemu_mutex_lock(&event_mtx);
  836. assert(event_unhandled_count > 0);
  837. event_unhandled_count--;
  838. qemu_cond_broadcast(&event_complete_cond);
  839. qemu_mutex_unlock(&event_mtx);
  840. }
  841. static void colo_compare_iothread(CompareState *s)
  842. {
  843. AioContext *ctx = iothread_get_aio_context(s->iothread);
  844. object_ref(OBJECT(s->iothread));
  845. s->worker_context = iothread_get_g_main_context(s->iothread);
  846. qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
  847. compare_pri_chr_in, NULL, NULL,
  848. s, s->worker_context, true);
  849. qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
  850. compare_sec_chr_in, NULL, NULL,
  851. s, s->worker_context, true);
  852. if (s->notify_dev) {
  853. qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
  854. compare_notify_chr, NULL, NULL,
  855. s, s->worker_context, true);
  856. }
  857. colo_compare_timer_init(s);
  858. s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
  859. }
  860. static char *compare_get_pri_indev(Object *obj, Error **errp)
  861. {
  862. CompareState *s = COLO_COMPARE(obj);
  863. return g_strdup(s->pri_indev);
  864. }
  865. static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
  866. {
  867. CompareState *s = COLO_COMPARE(obj);
  868. g_free(s->pri_indev);
  869. s->pri_indev = g_strdup(value);
  870. }
  871. static char *compare_get_sec_indev(Object *obj, Error **errp)
  872. {
  873. CompareState *s = COLO_COMPARE(obj);
  874. return g_strdup(s->sec_indev);
  875. }
  876. static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
  877. {
  878. CompareState *s = COLO_COMPARE(obj);
  879. g_free(s->sec_indev);
  880. s->sec_indev = g_strdup(value);
  881. }
  882. static char *compare_get_outdev(Object *obj, Error **errp)
  883. {
  884. CompareState *s = COLO_COMPARE(obj);
  885. return g_strdup(s->outdev);
  886. }
  887. static void compare_set_outdev(Object *obj, const char *value, Error **errp)
  888. {
  889. CompareState *s = COLO_COMPARE(obj);
  890. g_free(s->outdev);
  891. s->outdev = g_strdup(value);
  892. }
  893. static bool compare_get_vnet_hdr(Object *obj, Error **errp)
  894. {
  895. CompareState *s = COLO_COMPARE(obj);
  896. return s->vnet_hdr;
  897. }
  898. static void compare_set_vnet_hdr(Object *obj,
  899. bool value,
  900. Error **errp)
  901. {
  902. CompareState *s = COLO_COMPARE(obj);
  903. s->vnet_hdr = value;
  904. }
  905. static char *compare_get_notify_dev(Object *obj, Error **errp)
  906. {
  907. CompareState *s = COLO_COMPARE(obj);
  908. return g_strdup(s->notify_dev);
  909. }
  910. static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
  911. {
  912. CompareState *s = COLO_COMPARE(obj);
  913. g_free(s->notify_dev);
  914. s->notify_dev = g_strdup(value);
  915. }
  916. static void compare_get_timeout(Object *obj, Visitor *v,
  917. const char *name, void *opaque,
  918. Error **errp)
  919. {
  920. CompareState *s = COLO_COMPARE(obj);
  921. uint64_t value = s->compare_timeout;
  922. visit_type_uint64(v, name, &value, errp);
  923. }
  924. static void compare_set_timeout(Object *obj, Visitor *v,
  925. const char *name, void *opaque,
  926. Error **errp)
  927. {
  928. CompareState *s = COLO_COMPARE(obj);
  929. uint32_t value;
  930. if (!visit_type_uint32(v, name, &value, errp)) {
  931. return;
  932. }
  933. if (!value) {
  934. error_setg(errp, "Property '%s.%s' requires a positive value",
  935. object_get_typename(obj), name);
  936. return;
  937. }
  938. s->compare_timeout = value;
  939. }
  940. static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
  941. const char *name, void *opaque,
  942. Error **errp)
  943. {
  944. CompareState *s = COLO_COMPARE(obj);
  945. uint32_t value = s->expired_scan_cycle;
  946. visit_type_uint32(v, name, &value, errp);
  947. }
  948. static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
  949. const char *name, void *opaque,
  950. Error **errp)
  951. {
  952. CompareState *s = COLO_COMPARE(obj);
  953. uint32_t value;
  954. if (!visit_type_uint32(v, name, &value, errp)) {
  955. return;
  956. }
  957. if (!value) {
  958. error_setg(errp, "Property '%s.%s' requires a positive value",
  959. object_get_typename(obj), name);
  960. return;
  961. }
  962. s->expired_scan_cycle = value;
  963. }
  964. static void get_max_queue_size(Object *obj, Visitor *v,
  965. const char *name, void *opaque,
  966. Error **errp)
  967. {
  968. uint32_t value = max_queue_size;
  969. visit_type_uint32(v, name, &value, errp);
  970. }
  971. static void set_max_queue_size(Object *obj, Visitor *v,
  972. const char *name, void *opaque,
  973. Error **errp)
  974. {
  975. uint64_t value;
  976. if (!visit_type_uint64(v, name, &value, errp)) {
  977. return;
  978. }
  979. if (!value) {
  980. error_setg(errp, "Property '%s.%s' requires a positive value",
  981. object_get_typename(obj), name);
  982. return;
  983. }
  984. max_queue_size = value;
  985. }
  986. static void compare_pri_rs_finalize(SocketReadState *pri_rs)
  987. {
  988. CompareState *s = container_of(pri_rs, CompareState, pri_rs);
  989. Connection *conn = NULL;
  990. if (packet_enqueue(s, PRIMARY_IN, &conn)) {
  991. trace_colo_compare_main("primary: unsupported packet in");
  992. compare_chr_send(s,
  993. pri_rs->buf,
  994. pri_rs->packet_len,
  995. pri_rs->vnet_hdr_len,
  996. false,
  997. false);
  998. } else {
  999. /* compare packet in the specified connection */
  1000. colo_compare_connection(conn, s);
  1001. }
  1002. }
  1003. static void compare_sec_rs_finalize(SocketReadState *sec_rs)
  1004. {
  1005. CompareState *s = container_of(sec_rs, CompareState, sec_rs);
  1006. Connection *conn = NULL;
  1007. if (packet_enqueue(s, SECONDARY_IN, &conn)) {
  1008. trace_colo_compare_main("secondary: unsupported packet in");
  1009. } else {
  1010. /* compare packet in the specified connection */
  1011. colo_compare_connection(conn, s);
  1012. }
  1013. }
  1014. static void compare_notify_rs_finalize(SocketReadState *notify_rs)
  1015. {
  1016. CompareState *s = container_of(notify_rs, CompareState, notify_rs);
  1017. const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
  1018. int ret;
  1019. if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
  1020. notify_rs->buf,
  1021. notify_rs->packet_len)) {
  1022. ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
  1023. if (ret < 0) {
  1024. error_report("Notify Xen COLO-frame INIT failed");
  1025. }
  1026. } else if (packet_matches_str("COLO_CHECKPOINT",
  1027. notify_rs->buf,
  1028. notify_rs->packet_len)) {
  1029. /* colo-compare do checkpoint, flush pri packet and remove sec packet */
  1030. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  1031. } else {
  1032. error_report("COLO compare got unsupported instruction");
  1033. }
  1034. }
  1035. /*
  1036. * Return 0 is success.
  1037. * Return 1 is failed.
  1038. */
  1039. static int find_and_check_chardev(Chardev **chr,
  1040. char *chr_name,
  1041. Error **errp)
  1042. {
  1043. *chr = qemu_chr_find(chr_name);
  1044. if (*chr == NULL) {
  1045. error_setg(errp, "Device '%s' not found",
  1046. chr_name);
  1047. return 1;
  1048. }
  1049. if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
  1050. error_setg(errp, "chardev \"%s\" is not reconnectable",
  1051. chr_name);
  1052. return 1;
  1053. }
  1054. if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
  1055. error_setg(errp, "chardev \"%s\" cannot switch context",
  1056. chr_name);
  1057. return 1;
  1058. }
  1059. return 0;
  1060. }
  1061. /*
  1062. * Called from the main thread on the primary
  1063. * to setup colo-compare.
  1064. */
  1065. static void colo_compare_complete(UserCreatable *uc, Error **errp)
  1066. {
  1067. CompareState *s = COLO_COMPARE(uc);
  1068. Chardev *chr;
  1069. if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
  1070. error_setg(errp, "colo compare needs 'primary_in' ,"
  1071. "'secondary_in','outdev','iothread' property set");
  1072. return;
  1073. } else if (!strcmp(s->pri_indev, s->outdev) ||
  1074. !strcmp(s->sec_indev, s->outdev) ||
  1075. !strcmp(s->pri_indev, s->sec_indev)) {
  1076. error_setg(errp, "'indev' and 'outdev' could not be same "
  1077. "for compare module");
  1078. return;
  1079. }
  1080. if (!s->compare_timeout) {
  1081. /* Set default value to 3000 MS */
  1082. s->compare_timeout = DEFAULT_TIME_OUT_MS;
  1083. }
  1084. if (!s->expired_scan_cycle) {
  1085. /* Set default value to 1000 MS */
  1086. s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
  1087. }
  1088. if (!max_queue_size) {
  1089. /* Set default queue size to 1024 */
  1090. max_queue_size = MAX_QUEUE_SIZE;
  1091. }
  1092. if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
  1093. !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
  1094. return;
  1095. }
  1096. if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
  1097. !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
  1098. return;
  1099. }
  1100. if (find_and_check_chardev(&chr, s->outdev, errp) ||
  1101. !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
  1102. return;
  1103. }
  1104. net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
  1105. net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
  1106. /* Try to enable remote notify chardev, currently just for Xen COLO */
  1107. if (s->notify_dev) {
  1108. if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
  1109. !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
  1110. return;
  1111. }
  1112. net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
  1113. s->vnet_hdr);
  1114. }
  1115. s->out_sendco.s = s;
  1116. s->out_sendco.chr = &s->chr_out;
  1117. s->out_sendco.notify_remote_frame = false;
  1118. s->out_sendco.done = true;
  1119. g_queue_init(&s->out_sendco.send_list);
  1120. if (s->notify_dev) {
  1121. s->notify_sendco.s = s;
  1122. s->notify_sendco.chr = &s->chr_notify_dev;
  1123. s->notify_sendco.notify_remote_frame = true;
  1124. s->notify_sendco.done = true;
  1125. g_queue_init(&s->notify_sendco.send_list);
  1126. }
  1127. g_queue_init(&s->conn_list);
  1128. s->connection_track_table = g_hash_table_new_full(connection_key_hash,
  1129. connection_key_equal,
  1130. g_free,
  1131. NULL);
  1132. colo_compare_iothread(s);
  1133. qemu_mutex_lock(&colo_compare_mutex);
  1134. if (!colo_compare_active) {
  1135. qemu_mutex_init(&event_mtx);
  1136. qemu_cond_init(&event_complete_cond);
  1137. colo_compare_active = true;
  1138. }
  1139. QTAILQ_INSERT_TAIL(&net_compares, s, next);
  1140. qemu_mutex_unlock(&colo_compare_mutex);
  1141. return;
  1142. }
  1143. static void colo_flush_packets(void *opaque, void *user_data)
  1144. {
  1145. CompareState *s = user_data;
  1146. Connection *conn = opaque;
  1147. Packet *pkt = NULL;
  1148. while (!g_queue_is_empty(&conn->primary_list)) {
  1149. pkt = g_queue_pop_tail(&conn->primary_list);
  1150. compare_chr_send(s,
  1151. pkt->data,
  1152. pkt->size,
  1153. pkt->vnet_hdr_len,
  1154. false,
  1155. true);
  1156. packet_destroy_partial(pkt, NULL);
  1157. }
  1158. while (!g_queue_is_empty(&conn->secondary_list)) {
  1159. pkt = g_queue_pop_tail(&conn->secondary_list);
  1160. packet_destroy(pkt, NULL);
  1161. }
  1162. }
  1163. static void colo_compare_class_init(ObjectClass *oc, void *data)
  1164. {
  1165. UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
  1166. ucc->complete = colo_compare_complete;
  1167. }
  1168. static void colo_compare_init(Object *obj)
  1169. {
  1170. CompareState *s = COLO_COMPARE(obj);
  1171. object_property_add_str(obj, "primary_in",
  1172. compare_get_pri_indev, compare_set_pri_indev);
  1173. object_property_add_str(obj, "secondary_in",
  1174. compare_get_sec_indev, compare_set_sec_indev);
  1175. object_property_add_str(obj, "outdev",
  1176. compare_get_outdev, compare_set_outdev);
  1177. object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
  1178. (Object **)&s->iothread,
  1179. object_property_allow_set_link,
  1180. OBJ_PROP_LINK_STRONG);
  1181. /* This parameter just for Xen COLO */
  1182. object_property_add_str(obj, "notify_dev",
  1183. compare_get_notify_dev, compare_set_notify_dev);
  1184. object_property_add(obj, "compare_timeout", "uint64",
  1185. compare_get_timeout,
  1186. compare_set_timeout, NULL, NULL);
  1187. object_property_add(obj, "expired_scan_cycle", "uint32",
  1188. compare_get_expired_scan_cycle,
  1189. compare_set_expired_scan_cycle, NULL, NULL);
  1190. object_property_add(obj, "max_queue_size", "uint32",
  1191. get_max_queue_size,
  1192. set_max_queue_size, NULL, NULL);
  1193. s->vnet_hdr = false;
  1194. object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
  1195. compare_set_vnet_hdr);
  1196. }
  1197. void colo_compare_cleanup(void)
  1198. {
  1199. CompareState *tmp = NULL;
  1200. CompareState *n = NULL;
  1201. QTAILQ_FOREACH_SAFE(tmp, &net_compares, next, n) {
  1202. object_unparent(OBJECT(tmp));
  1203. }
  1204. }
  1205. static void colo_compare_finalize(Object *obj)
  1206. {
  1207. CompareState *s = COLO_COMPARE(obj);
  1208. CompareState *tmp = NULL;
  1209. qemu_mutex_lock(&colo_compare_mutex);
  1210. QTAILQ_FOREACH(tmp, &net_compares, next) {
  1211. if (tmp == s) {
  1212. QTAILQ_REMOVE(&net_compares, s, next);
  1213. break;
  1214. }
  1215. }
  1216. if (QTAILQ_EMPTY(&net_compares)) {
  1217. colo_compare_active = false;
  1218. qemu_mutex_destroy(&event_mtx);
  1219. qemu_cond_destroy(&event_complete_cond);
  1220. }
  1221. qemu_mutex_unlock(&colo_compare_mutex);
  1222. qemu_chr_fe_deinit(&s->chr_pri_in, false);
  1223. qemu_chr_fe_deinit(&s->chr_sec_in, false);
  1224. qemu_chr_fe_deinit(&s->chr_out, false);
  1225. if (s->notify_dev) {
  1226. qemu_chr_fe_deinit(&s->chr_notify_dev, false);
  1227. }
  1228. colo_compare_timer_del(s);
  1229. qemu_bh_delete(s->event_bh);
  1230. AioContext *ctx = iothread_get_aio_context(s->iothread);
  1231. AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
  1232. if (s->notify_dev) {
  1233. AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
  1234. }
  1235. /* Release all unhandled packets after compare thead exited */
  1236. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  1237. AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
  1238. g_queue_clear(&s->conn_list);
  1239. g_queue_clear(&s->out_sendco.send_list);
  1240. if (s->notify_dev) {
  1241. g_queue_clear(&s->notify_sendco.send_list);
  1242. }
  1243. if (s->connection_track_table) {
  1244. g_hash_table_destroy(s->connection_track_table);
  1245. }
  1246. object_unref(OBJECT(s->iothread));
  1247. g_free(s->pri_indev);
  1248. g_free(s->sec_indev);
  1249. g_free(s->outdev);
  1250. g_free(s->notify_dev);
  1251. }
  1252. static void __attribute__((__constructor__)) colo_compare_init_globals(void)
  1253. {
  1254. colo_compare_active = false;
  1255. qemu_mutex_init(&colo_compare_mutex);
  1256. }
  1257. static const TypeInfo colo_compare_info = {
  1258. .name = TYPE_COLO_COMPARE,
  1259. .parent = TYPE_OBJECT,
  1260. .instance_size = sizeof(CompareState),
  1261. .instance_init = colo_compare_init,
  1262. .instance_finalize = colo_compare_finalize,
  1263. .class_size = sizeof(CompareClass),
  1264. .class_init = colo_compare_class_init,
  1265. .interfaces = (InterfaceInfo[]) {
  1266. { TYPE_USER_CREATABLE },
  1267. { }
  1268. }
  1269. };
  1270. static void register_types(void)
  1271. {
  1272. type_register_static(&colo_compare_info);
  1273. }
  1274. type_init(register_types);