colo-compare.c 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496
  1. /*
  2. * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
  3. * (a.k.a. Fault Tolerance or Continuous Replication)
  4. *
  5. * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
  6. * Copyright (c) 2016 FUJITSU LIMITED
  7. * Copyright (c) 2016 Intel Corporation
  8. *
  9. * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
  10. *
  11. * This work is licensed under the terms of the GNU GPL, version 2 or
  12. * later. See the COPYING file in the top-level directory.
  13. */
  14. #include "qemu/osdep.h"
  15. #include "qemu/error-report.h"
  16. #include "trace.h"
  17. #include "qapi/error.h"
  18. #include "net/net.h"
  19. #include "net/eth.h"
  20. #include "qom/object_interfaces.h"
  21. #include "qemu/iov.h"
  22. #include "qom/object.h"
  23. #include "net/queue.h"
  24. #include "chardev/char-fe.h"
  25. #include "qemu/sockets.h"
  26. #include "colo.h"
  27. #include "sysemu/iothread.h"
  28. #include "net/colo-compare.h"
  29. #include "migration/colo.h"
  30. #include "migration/migration.h"
  31. #include "util.h"
  32. #include "block/aio-wait.h"
  33. #include "qemu/coroutine.h"
#define TYPE_COLO_COMPARE "colo-compare"
typedef struct CompareState CompareState;
DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
                         TYPE_COLO_COMPARE)
/* Every colo-compare instance; walked by colo_notify_compares_event(). */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);
/*
 * Listeners notified by colo_compare_inconsistency_notify() when no
 * notify_dev is configured.
 */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
/* Read size reported to the chardev layer by compare_chr_can_read(). */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* NOTE(review): presumably the default for max_queue_size — confirm. */
#define MAX_QUEUE_SIZE 1024
/* Bits reported through colo_mark_tcp_pkt()'s *mark output. */
#define COLO_COMPARE_FREE_PRIMARY 0x01
#define COLO_COMPARE_FREE_SECONDARY 0x02
/*
 * NOTE(review): presumably the defaults (in ms) for expired_scan_cycle and
 * compare_timeout; they are assigned outside this view — confirm.
 */
#define REGULAR_PACKET_CHECK_MS 1000
#define DEFAULT_TIME_OUT_MS 3000
/* Uncomment to hexdump mismatching packets to stderr. */
/* #define DEBUG_COLO_PACKETS */
/* Guards colo_compare_active and the net_compares walk in event delivery. */
static QemuMutex colo_compare_mutex;
static bool colo_compare_active;
/* event_mtx protects event_unhandled_count; the cond signals decrements. */
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;
/* Per-list packet limit enforced by colo_insert_packet(). */
static uint32_t max_queue_size;
/*
 * Each Connection in conn_list keeps a primary and a secondary packet queue.
 *
 *  + CompareState ++
 *  |               |
 *  +---------------+   +---------------+         +---------------+
 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
 *  +---------------+   +---------------+         +---------------+
 *  |               |     |           |             |          |
 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 */
/*
 * One outgoing chardev stream: a queue of pending SendEntry items, drained
 * by a coroutine running _compare_chr_send().
 */
typedef struct SendCo {
    /* Coroutine currently draining send_list; NULL once it has finished. */
    Coroutine *co;
    /* Owning compare instance (its vnet_hdr setting is consulted). */
    struct CompareState *s;
    /* Chardev frontend written to. */
    CharBackend *chr;
    /* Pending SendEntry items. */
    GQueue send_list;
    /* True for the notify_dev stream: no vnet header length is sent. */
    bool notify_remote_frame;
    /* True when no drain coroutine is running; compare_chr_send() starts one. */
    bool done;
    /* Result of the last drain: 0 on success, negative errno on failure. */
    int ret;
} SendCo;
/* A single queued message; buf is g_free()d after it has been written. */
typedef struct SendEntry {
    uint32_t size;
    uint32_t vnet_hdr_len;
    uint8_t *buf;
} SendEntry;
/* Per-instance state of a colo-compare object. */
struct CompareState {
    Object parent;
    /*
     * Chardev IDs for primary input, secondary input, compare output and the
     * optional notify channel.
     * NOTE(review): presumably set from QOM properties outside this view.
     */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    char *notify_dev;
    /* Frontends for the chardevs named above. */
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    CharBackend chr_notify_dev;
    /* Stream reassembly state fed by net_fill_rstate() for each input. */
    SocketReadState pri_rs;
    SocketReadState sec_rs;
    SocketReadState notify_rs;
    /* Send queues: released packets (outdev) and notify_dev messages. */
    SendCo out_sendco;
    SendCo notify_sendco;
    /* When true, each output packet is preceded by its vnet header length. */
    bool vnet_hdr;
    /* Age in ms after which a queued packet forces a checkpoint. */
    uint64_t compare_timeout;
    /* Interval in ms between scans for such old packets. */
    uint32_t expired_scan_cycle;
    /*
     * Connections with queued packets, appended by packet_enqueue() the
     * first time a connection is seen as not yet processing.
     * Element type: Connection
     */
    GQueue conn_list;
    /* Table used by connection_get() to find or create a Connection by key. */
    GHashTable *connection_track_table;
    /* IOThread whose AioContext runs packet_check_timer. */
    IOThread *iothread;
    /* NOTE(review): not referenced in this view — purpose to confirm. */
    GMainContext *worker_context;
    /* Periodic old-packet scan (check_old_packet_regular()). */
    QEMUTimer *packet_check_timer;
    /* Scheduled by colo_notify_compares_event() to handle `event`. */
    QEMUBH *event_bh;
    enum colo_event event;
    /* Link in net_compares. */
    QTAILQ_ENTRY(CompareState) next;
};
/* QOM class; adds nothing to ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;
  126. enum {
  127. PRIMARY_IN = 0,
  128. SECONDARY_IN,
  129. };
  130. static const char *colo_mode[] = {
  131. [PRIMARY_IN] = "primary",
  132. [SECONDARY_IN] = "secondary",
  133. };
/*
 * Forward declaration. Queues @size bytes of @buf on the outdev stream, or on
 * the notify_dev stream when @notify_remote_frame is true. With @zero_copy
 * the caller's buffer itself is queued and later g_free()d by the sender.
 */
static int compare_chr_send(CompareState *s,
                            uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len,
                            bool notify_remote_frame,
                            bool zero_copy);
  140. static bool packet_matches_str(const char *str,
  141. const uint8_t *buf,
  142. uint32_t packet_len)
  143. {
  144. if (packet_len != strlen(str)) {
  145. return false;
  146. }
  147. return !memcmp(str, buf, packet_len);
  148. }
  149. static void notify_remote_frame(CompareState *s)
  150. {
  151. char msg[] = "DO_CHECKPOINT";
  152. int ret = 0;
  153. ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
  154. if (ret < 0) {
  155. error_report("Notify Xen COLO-frame failed");
  156. }
  157. }
  158. static void colo_compare_inconsistency_notify(CompareState *s)
  159. {
  160. if (s->notify_dev) {
  161. notify_remote_frame(s);
  162. } else {
  163. notifier_list_notify(&colo_compare_notifiers,
  164. migrate_get_current());
  165. }
  166. }
  167. /* Use restricted to colo_insert_packet() */
  168. static gint seq_sorter(Packet *a, Packet *b, gpointer data)
  169. {
  170. return b->tcp_seq - a->tcp_seq;
  171. }
  172. static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
  173. {
  174. Packet *pkt = data;
  175. struct tcp_hdr *tcphd;
  176. tcphd = (struct tcp_hdr *)pkt->transport_header;
  177. pkt->tcp_seq = ntohl(tcphd->th_seq);
  178. pkt->tcp_ack = ntohl(tcphd->th_ack);
  179. /* Need to consider ACK will bigger than uint32_t MAX */
  180. *max_ack = pkt->tcp_ack - *max_ack > 0 ? pkt->tcp_ack : *max_ack;
  181. pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
  182. + (tcphd->th_off << 2);
  183. pkt->payload_size = pkt->size - pkt->header_size;
  184. pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
  185. pkt->flags = tcphd->th_flags;
  186. }
  187. /*
  188. * Return 1 on success, if return 0 means the
  189. * packet will be dropped
  190. */
  191. static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
  192. {
  193. if (g_queue_get_length(queue) <= max_queue_size) {
  194. if (pkt->ip->ip_p == IPPROTO_TCP) {
  195. fill_pkt_tcp_info(pkt, max_ack);
  196. g_queue_insert_sorted(queue,
  197. pkt,
  198. (GCompareDataFunc)seq_sorter,
  199. NULL);
  200. } else {
  201. g_queue_push_tail(queue, pkt);
  202. }
  203. return 1;
  204. }
  205. return 0;
  206. }
  207. /*
  208. * Return 0 on success, if return -1 means the pkt
  209. * is unsupported(arp and ipv6) and will be sent later
  210. */
  211. static int packet_enqueue(CompareState *s, int mode, Connection **con)
  212. {
  213. ConnectionKey key;
  214. Packet *pkt = NULL;
  215. Connection *conn;
  216. int ret;
  217. if (mode == PRIMARY_IN) {
  218. pkt = packet_new(s->pri_rs.buf,
  219. s->pri_rs.packet_len,
  220. s->pri_rs.vnet_hdr_len);
  221. } else {
  222. pkt = packet_new(s->sec_rs.buf,
  223. s->sec_rs.packet_len,
  224. s->sec_rs.vnet_hdr_len);
  225. }
  226. if (parse_packet_early(pkt)) {
  227. packet_destroy(pkt, NULL);
  228. pkt = NULL;
  229. return -1;
  230. }
  231. fill_connection_key(pkt, &key, false);
  232. conn = connection_get(s->connection_track_table,
  233. &key,
  234. &s->conn_list);
  235. if (!conn->processing) {
  236. g_queue_push_tail(&s->conn_list, conn);
  237. conn->processing = true;
  238. }
  239. if (mode == PRIMARY_IN) {
  240. ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
  241. } else {
  242. ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
  243. }
  244. if (!ret) {
  245. trace_colo_compare_drop_packet(colo_mode[mode],
  246. "queue size too big, drop packet");
  247. packet_destroy(pkt, NULL);
  248. pkt = NULL;
  249. }
  250. *con = conn;
  251. return 0;
  252. }
  253. static inline bool after(uint32_t seq1, uint32_t seq2)
  254. {
  255. return (int32_t)(seq1 - seq2) > 0;
  256. }
  257. static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
  258. {
  259. int ret;
  260. ret = compare_chr_send(s,
  261. pkt->data,
  262. pkt->size,
  263. pkt->vnet_hdr_len,
  264. false,
  265. true);
  266. if (ret < 0) {
  267. error_report("colo send primary packet failed");
  268. }
  269. trace_colo_compare_main("packet same and release packet");
  270. packet_destroy_partial(pkt, NULL);
  271. }
  272. /*
  273. * The IP packets sent by primary and secondary
  274. * will be compared in here
  275. * TODO support ip fragment, Out-Of-Order
  276. * return: 0 means packet same
  277. * > 0 || < 0 means packet different
  278. */
  279. static int colo_compare_packet_payload(Packet *ppkt,
  280. Packet *spkt,
  281. uint16_t poffset,
  282. uint16_t soffset,
  283. uint16_t len)
  284. {
  285. if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
  286. char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
  287. strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
  288. strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
  289. strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
  290. strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
  291. trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
  292. pri_ip_dst, spkt->size,
  293. sec_ip_src, sec_ip_dst);
  294. }
  295. return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
  296. }
/*
 * Compare the TCP payloads of primary packet @ppkt and secondary packet
 * @spkt, both already decoded by fill_pkt_tcp_info().
 *
 * Returns true when the compared bytes are identical and the caller should
 * keep comparing. *mark then names the packet(s) that are now fully
 * consumed (COLO_COMPARE_FREE_PRIMARY and/or COLO_COMPARE_FREE_SECONDARY).
 * The other packet's ->offset is advanced past the compared bytes.
 *
 * Returns false, with *mark == 0, when the payloads differ. It also returns
 * false when the primary packet could be released but its ACK is after
 * @max_ack. Either way the caller requests a checkpoint.
 */
static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
                              int8_t *mark, uint32_t max_ack)
{
    *mark = 0;

    /*
     * Same sequence range: compare whole payloads. Note that this path does
     * not consult @max_ack.
     */
    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size, spkt->header_size,
                                         ppkt->payload_size)) {
            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
            return true;
        }
    }

    /*
     * Primary ends at or before secondary: compare the rest of the primary
     * payload against the secondary. Part of the secondary payload may
     * remain to be compared against the next primary packet.
     */
    if (!after(ppkt->seq_end, spkt->seq_end)) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size + ppkt->offset,
                                         spkt->header_size + spkt->offset,
                                         ppkt->payload_size - ppkt->offset)) {
            if (!after(ppkt->tcp_ack, max_ack)) {
                *mark = COLO_COMPARE_FREE_PRIMARY;
                spkt->offset += ppkt->payload_size - ppkt->offset;
                return true;
            } else {
                /*
                 * The secondary guest has not acknowledged this data yet,
                 * so do not send this packet out.
                 */
                return false;
            }
        }
    } else {
        /*
         * Primary is longer than secondary: compare the overlapping part and
         * advance the primary packet's offset past it.
         */
        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size + ppkt->offset,
                                         spkt->header_size + spkt->offset,
                                         spkt->payload_size - spkt->offset)) {
            *mark = COLO_COMPARE_FREE_SECONDARY;
            ppkt->offset += spkt->payload_size - spkt->offset;
            return true;
        }
    }

    return false;
}
/*
 * Compare the queued TCP packets of @conn as byte streams.
 * Both lists keep the lowest sequence number at the tail, so each round pops
 * one primary (label pri) and one secondary (label sec) packet. Pure ACKs
 * and data already covered by conn->compare_seq are released (primary) or
 * freed (secondary) without comparison. The rest goes through
 * colo_mark_tcp_pkt(), which decides which packet is fully consumed and
 * which one goes back for the next round. On a mismatch both packets are
 * requeued and a checkpoint is requested. Returns when either list runs
 * empty.
 */
static void colo_compare_tcp(CompareState *s, Connection *conn)
{
    Packet *ppkt = NULL, *spkt = NULL;
    int8_t mark;

    /*
     * If ppkt and spkt have the same payload, but ppkt's ACK
     * is greater than spkt's ACK, in this case we can not
     * send the ppkt because it will cause the secondary guest
     * to miss sending some data in the next. Therefore, we
     * record the maximum ACK in the current queue at both
     * primary side and secondary side. Only when the ack is
     * less than the smaller of the two maximum ack, then we
     * can ensure that the packet's payload is acknowledged by
     * primary and secondary.
     *
     * NOTE(review): conn->pack and conn->sack are uint32_t (they are passed
     * as uint32_t * to colo_insert_packet()). "conn->pack - conn->sack > 0"
     * is therefore an unsigned test that is true whenever they differ, so
     * min_ack always equals conn->sack rather than the wrap-aware minimum
     * described above. A fix (e.g. after(conn->pack, conn->sack)) should be
     * made together with the max-ACK update in fill_pkt_tcp_info().
     */
    uint32_t min_ack = conn->pack - conn->sack > 0 ?
                       conn->sack : conn->pack;

pri:
    if (g_queue_is_empty(&conn->primary_list)) {
        return;
    }
    ppkt = g_queue_pop_tail(&conn->primary_list);
sec:
    if (g_queue_is_empty(&conn->secondary_list)) {
        /* Nothing to compare against yet: put the primary packet back. */
        g_queue_push_tail(&conn->primary_list, ppkt);
        return;
    }
    spkt = g_queue_pop_tail(&conn->secondary_list);

    /* A primary packet without payload is released immediately. */
    if (ppkt->tcp_seq == ppkt->seq_end) {
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    /* Primary data that was already fully compared is released as well. */
    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
        trace_colo_compare_main("pri: this packet has compared");
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    if (spkt->tcp_seq == spkt->seq_end) {
        /* Secondary packet without payload: drop it and fetch the next. */
        packet_destroy(spkt, NULL);
        if (!ppkt) {
            goto pri;
        } else {
            goto sec;
        }
    } else {
        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
            /* Secondary data already compared: drop it. */
            trace_colo_compare_main("sec: this packet has compared");
            packet_destroy(spkt, NULL);
            if (!ppkt) {
                goto pri;
            } else {
                goto sec;
            }
        }
        if (!ppkt) {
            /* Keep the secondary packet for the next primary packet. */
            g_queue_push_tail(&conn->secondary_list, spkt);
            goto pri;
        }
    }

    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
        trace_colo_compare_tcp_info("pri",
                                    ppkt->tcp_seq, ppkt->tcp_ack,
                                    ppkt->header_size, ppkt->payload_size,
                                    ppkt->offset, ppkt->flags);

        trace_colo_compare_tcp_info("sec",
                                    spkt->tcp_seq, spkt->tcp_ack,
                                    spkt->header_size, spkt->payload_size,
                                    spkt->offset, spkt->flags);

        if (mark == COLO_COMPARE_FREE_PRIMARY) {
            /* Primary consumed; the secondary still has uncompared bytes. */
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            g_queue_push_tail(&conn->secondary_list, spkt);
            goto pri;
        } else if (mark == COLO_COMPARE_FREE_SECONDARY) {
            /* Secondary consumed; keep comparing the same primary packet. */
            conn->compare_seq = spkt->seq_end;
            packet_destroy(spkt, NULL);
            goto sec;
        } else if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            packet_destroy(spkt, NULL);
            goto pri;
        }
    } else {
        /* Mismatch or un-ACKed data: requeue both and request a checkpoint. */
        g_queue_push_tail(&conn->primary_list, ppkt);
        g_queue_push_tail(&conn->secondary_list, spkt);

#ifdef DEBUG_COLO_PACKETS
        qemu_hexdump(stderr, "colo-compare ppkt", ppkt->data, ppkt->size);
        qemu_hexdump(stderr, "colo-compare spkt", spkt->data, spkt->size);
#endif

        colo_compare_inconsistency_notify(s);
    }
}
  439. /*
  440. * Called from the compare thread on the primary
  441. * for compare udp packet
  442. */
  443. static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
  444. {
  445. uint16_t network_header_length = ppkt->ip->ip_hl << 2;
  446. uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
  447. trace_colo_compare_main("compare udp");
  448. /*
  449. * Because of ppkt and spkt are both in the same connection,
  450. * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
  451. * same with spkt. In addition, IP header's Identification is a random
  452. * field, we can handle it in IP fragmentation function later.
  453. * COLO just concern the response net packet payload from primary guest
  454. * and secondary guest are same or not, So we ignored all IP header include
  455. * other field like TOS,TTL,IP Checksum. we only need to compare
  456. * the ip payload here.
  457. */
  458. if (ppkt->size != spkt->size) {
  459. trace_colo_compare_main("UDP: payload size of packets are different");
  460. return -1;
  461. }
  462. if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
  463. ppkt->size - offset)) {
  464. trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
  465. trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
  466. #ifdef DEBUG_COLO_PACKETS
  467. qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
  468. qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
  469. #endif
  470. return -1;
  471. } else {
  472. return 0;
  473. }
  474. }
  475. /*
  476. * Called from the compare thread on the primary
  477. * for compare icmp packet
  478. */
  479. static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
  480. {
  481. uint16_t network_header_length = ppkt->ip->ip_hl << 2;
  482. uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
  483. trace_colo_compare_main("compare icmp");
  484. /*
  485. * Because of ppkt and spkt are both in the same connection,
  486. * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
  487. * same with spkt. In addition, IP header's Identification is a random
  488. * field, we can handle it in IP fragmentation function later.
  489. * COLO just concern the response net packet payload from primary guest
  490. * and secondary guest are same or not, So we ignored all IP header include
  491. * other field like TOS,TTL,IP Checksum. we only need to compare
  492. * the ip payload here.
  493. */
  494. if (ppkt->size != spkt->size) {
  495. trace_colo_compare_main("ICMP: payload size of packets are different");
  496. return -1;
  497. }
  498. if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
  499. ppkt->size - offset)) {
  500. trace_colo_compare_icmp_miscompare("primary pkt size",
  501. ppkt->size);
  502. trace_colo_compare_icmp_miscompare("Secondary pkt size",
  503. spkt->size);
  504. #ifdef DEBUG_COLO_PACKETS
  505. qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
  506. qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
  507. #endif
  508. return -1;
  509. } else {
  510. return 0;
  511. }
  512. }
  513. /*
  514. * Called from the compare thread on the primary
  515. * for compare other packet
  516. */
  517. static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
  518. {
  519. uint16_t offset = ppkt->vnet_hdr_len;
  520. trace_colo_compare_main("compare other");
  521. if (ppkt->size != spkt->size) {
  522. trace_colo_compare_main("Other: payload size of packets are different");
  523. return -1;
  524. }
  525. return colo_compare_packet_payload(ppkt, spkt, offset, offset,
  526. ppkt->size - offset);
  527. }
  528. static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
  529. {
  530. int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
  531. if ((now - pkt->creation_ms) > (*check_time)) {
  532. trace_colo_old_packet_check_found(pkt->creation_ms);
  533. return 0;
  534. } else {
  535. return 1;
  536. }
  537. }
  538. void colo_compare_register_notifier(Notifier *notify)
  539. {
  540. notifier_list_add(&colo_compare_notifiers, notify);
  541. }
  542. void colo_compare_unregister_notifier(Notifier *notify)
  543. {
  544. notifier_remove(notify);
  545. }
  546. static int colo_old_packet_check_one_conn(Connection *conn,
  547. CompareState *s)
  548. {
  549. if (!g_queue_is_empty(&conn->primary_list)) {
  550. if (g_queue_find_custom(&conn->primary_list,
  551. &s->compare_timeout,
  552. (GCompareFunc)colo_old_packet_check_one))
  553. goto out;
  554. }
  555. if (!g_queue_is_empty(&conn->secondary_list)) {
  556. if (g_queue_find_custom(&conn->secondary_list,
  557. &s->compare_timeout,
  558. (GCompareFunc)colo_old_packet_check_one))
  559. goto out;
  560. }
  561. return 1;
  562. out:
  563. /* Do checkpoint will flush old packet */
  564. colo_compare_inconsistency_notify(s);
  565. return 0;
  566. }
  567. /*
  568. * Look for old packets that the secondary hasn't matched,
  569. * if we have some then we have to checkpoint to wake
  570. * the secondary up.
  571. */
  572. static void colo_old_packet_check(void *opaque)
  573. {
  574. CompareState *s = opaque;
  575. /*
  576. * If we find one old packet, stop finding job and notify
  577. * COLO frame do checkpoint.
  578. */
  579. g_queue_find_custom(&s->conn_list, s,
  580. (GCompareFunc)colo_old_packet_check_one_conn);
  581. }
  582. static void colo_compare_packet(CompareState *s, Connection *conn,
  583. int (*HandlePacket)(Packet *spkt,
  584. Packet *ppkt))
  585. {
  586. Packet *pkt = NULL;
  587. GList *result = NULL;
  588. while (!g_queue_is_empty(&conn->primary_list) &&
  589. !g_queue_is_empty(&conn->secondary_list)) {
  590. pkt = g_queue_pop_tail(&conn->primary_list);
  591. result = g_queue_find_custom(&conn->secondary_list,
  592. pkt, (GCompareFunc)HandlePacket);
  593. if (result) {
  594. colo_release_primary_pkt(s, pkt);
  595. packet_destroy(result->data, NULL);
  596. g_queue_delete_link(&conn->secondary_list, result);
  597. } else {
  598. /*
  599. * If one packet arrive late, the secondary_list or
  600. * primary_list will be empty, so we can't compare it
  601. * until next comparison. If the packets in the list are
  602. * timeout, it will trigger a checkpoint request.
  603. */
  604. trace_colo_compare_main("packet different");
  605. g_queue_push_tail(&conn->primary_list, pkt);
  606. colo_compare_inconsistency_notify(s);
  607. break;
  608. }
  609. }
  610. }
  611. /*
  612. * Called from the compare thread on the primary
  613. * for compare packet with secondary list of the
  614. * specified connection when a new packet was
  615. * queued to it.
  616. */
  617. static void colo_compare_connection(void *opaque, void *user_data)
  618. {
  619. CompareState *s = user_data;
  620. Connection *conn = opaque;
  621. switch (conn->ip_proto) {
  622. case IPPROTO_TCP:
  623. colo_compare_tcp(s, conn);
  624. break;
  625. case IPPROTO_UDP:
  626. colo_compare_packet(s, conn, colo_packet_compare_udp);
  627. break;
  628. case IPPROTO_ICMP:
  629. colo_compare_packet(s, conn, colo_packet_compare_icmp);
  630. break;
  631. default:
  632. colo_compare_packet(s, conn, colo_packet_compare_other);
  633. break;
  634. }
  635. }
  636. static void coroutine_fn _compare_chr_send(void *opaque)
  637. {
  638. SendCo *sendco = opaque;
  639. CompareState *s = sendco->s;
  640. int ret = 0;
  641. while (!g_queue_is_empty(&sendco->send_list)) {
  642. SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
  643. uint32_t len = htonl(entry->size);
  644. ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
  645. if (ret != sizeof(len)) {
  646. g_free(entry->buf);
  647. g_slice_free(SendEntry, entry);
  648. goto err;
  649. }
  650. if (!sendco->notify_remote_frame && s->vnet_hdr) {
  651. /*
  652. * We send vnet header len make other module(like filter-redirector)
  653. * know how to parse net packet correctly.
  654. */
  655. len = htonl(entry->vnet_hdr_len);
  656. ret = qemu_chr_fe_write_all(sendco->chr,
  657. (uint8_t *)&len,
  658. sizeof(len));
  659. if (ret != sizeof(len)) {
  660. g_free(entry->buf);
  661. g_slice_free(SendEntry, entry);
  662. goto err;
  663. }
  664. }
  665. ret = qemu_chr_fe_write_all(sendco->chr,
  666. (uint8_t *)entry->buf,
  667. entry->size);
  668. if (ret != entry->size) {
  669. g_free(entry->buf);
  670. g_slice_free(SendEntry, entry);
  671. goto err;
  672. }
  673. g_free(entry->buf);
  674. g_slice_free(SendEntry, entry);
  675. }
  676. sendco->ret = 0;
  677. goto out;
  678. err:
  679. while (!g_queue_is_empty(&sendco->send_list)) {
  680. SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
  681. g_free(entry->buf);
  682. g_slice_free(SendEntry, entry);
  683. }
  684. sendco->ret = ret < 0 ? ret : -EIO;
  685. out:
  686. sendco->co = NULL;
  687. sendco->done = true;
  688. aio_wait_kick();
  689. }
  690. static int compare_chr_send(CompareState *s,
  691. uint8_t *buf,
  692. uint32_t size,
  693. uint32_t vnet_hdr_len,
  694. bool notify_remote_frame,
  695. bool zero_copy)
  696. {
  697. SendCo *sendco;
  698. SendEntry *entry;
  699. if (notify_remote_frame) {
  700. sendco = &s->notify_sendco;
  701. } else {
  702. sendco = &s->out_sendco;
  703. }
  704. if (!size) {
  705. return -1;
  706. }
  707. entry = g_slice_new(SendEntry);
  708. entry->size = size;
  709. entry->vnet_hdr_len = vnet_hdr_len;
  710. if (zero_copy) {
  711. entry->buf = buf;
  712. } else {
  713. entry->buf = g_malloc(size);
  714. memcpy(entry->buf, buf, size);
  715. }
  716. g_queue_push_tail(&sendco->send_list, entry);
  717. if (sendco->done) {
  718. sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
  719. sendco->done = false;
  720. qemu_coroutine_enter(sendco->co);
  721. if (sendco->done) {
  722. /* report early errors */
  723. return sendco->ret;
  724. }
  725. }
  726. /* assume success */
  727. return 0;
  728. }
  729. static int compare_chr_can_read(void *opaque)
  730. {
  731. return COMPARE_READ_LEN_MAX;
  732. }
  733. /*
  734. * Called from the main thread on the primary for packets
  735. * arriving over the socket from the primary.
  736. */
  737. static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
  738. {
  739. CompareState *s = COLO_COMPARE(opaque);
  740. int ret;
  741. ret = net_fill_rstate(&s->pri_rs, buf, size);
  742. if (ret == -1) {
  743. qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
  744. NULL, NULL, true);
  745. error_report("colo-compare primary_in error");
  746. }
  747. }
  748. /*
  749. * Called from the main thread on the primary for packets
  750. * arriving over the socket from the secondary.
  751. */
  752. static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
  753. {
  754. CompareState *s = COLO_COMPARE(opaque);
  755. int ret;
  756. ret = net_fill_rstate(&s->sec_rs, buf, size);
  757. if (ret == -1) {
  758. qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
  759. NULL, NULL, true);
  760. error_report("colo-compare secondary_in error");
  761. }
  762. }
  763. static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
  764. {
  765. CompareState *s = COLO_COMPARE(opaque);
  766. int ret;
  767. ret = net_fill_rstate(&s->notify_rs, buf, size);
  768. if (ret == -1) {
  769. qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
  770. NULL, NULL, true);
  771. error_report("colo-compare notify_dev error");
  772. }
  773. }
  774. /*
  775. * Check old packet regularly so it can watch for any packets
  776. * that the secondary hasn't produced equivalents of.
  777. */
  778. static void check_old_packet_regular(void *opaque)
  779. {
  780. CompareState *s = opaque;
  781. /* if have old packet we will notify checkpoint */
  782. colo_old_packet_check(s);
  783. timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
  784. s->expired_scan_cycle);
  785. }
  786. /* Public API, Used for COLO frame to notify compare event */
  787. void colo_notify_compares_event(void *opaque, int event, Error **errp)
  788. {
  789. CompareState *s;
  790. qemu_mutex_lock(&colo_compare_mutex);
  791. if (!colo_compare_active) {
  792. qemu_mutex_unlock(&colo_compare_mutex);
  793. return;
  794. }
  795. qemu_mutex_lock(&event_mtx);
  796. QTAILQ_FOREACH(s, &net_compares, next) {
  797. s->event = event;
  798. qemu_bh_schedule(s->event_bh);
  799. event_unhandled_count++;
  800. }
  801. /* Wait all compare threads to finish handling this event */
  802. while (event_unhandled_count > 0) {
  803. qemu_cond_wait(&event_complete_cond, &event_mtx);
  804. }
  805. qemu_mutex_unlock(&event_mtx);
  806. qemu_mutex_unlock(&colo_compare_mutex);
  807. }
  808. static void colo_compare_timer_init(CompareState *s)
  809. {
  810. AioContext *ctx = iothread_get_aio_context(s->iothread);
  811. s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_HOST,
  812. SCALE_MS, check_old_packet_regular,
  813. s);
  814. timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
  815. s->expired_scan_cycle);
  816. }
  817. static void colo_compare_timer_del(CompareState *s)
  818. {
  819. if (s->packet_check_timer) {
  820. timer_free(s->packet_check_timer);
  821. s->packet_check_timer = NULL;
  822. }
  823. }
  824. static void colo_flush_packets(void *opaque, void *user_data);
  825. static void colo_compare_handle_event(void *opaque)
  826. {
  827. CompareState *s = opaque;
  828. switch (s->event) {
  829. case COLO_EVENT_CHECKPOINT:
  830. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  831. break;
  832. case COLO_EVENT_FAILOVER:
  833. break;
  834. default:
  835. break;
  836. }
  837. qemu_mutex_lock(&event_mtx);
  838. assert(event_unhandled_count > 0);
  839. event_unhandled_count--;
  840. qemu_cond_broadcast(&event_complete_cond);
  841. qemu_mutex_unlock(&event_mtx);
  842. }
  843. static void colo_compare_iothread(CompareState *s)
  844. {
  845. AioContext *ctx = iothread_get_aio_context(s->iothread);
  846. object_ref(OBJECT(s->iothread));
  847. s->worker_context = iothread_get_g_main_context(s->iothread);
  848. qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
  849. compare_pri_chr_in, NULL, NULL,
  850. s, s->worker_context, true);
  851. qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
  852. compare_sec_chr_in, NULL, NULL,
  853. s, s->worker_context, true);
  854. if (s->notify_dev) {
  855. qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
  856. compare_notify_chr, NULL, NULL,
  857. s, s->worker_context, true);
  858. }
  859. colo_compare_timer_init(s);
  860. s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
  861. }
  862. static char *compare_get_pri_indev(Object *obj, Error **errp)
  863. {
  864. CompareState *s = COLO_COMPARE(obj);
  865. return g_strdup(s->pri_indev);
  866. }
  867. static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
  868. {
  869. CompareState *s = COLO_COMPARE(obj);
  870. g_free(s->pri_indev);
  871. s->pri_indev = g_strdup(value);
  872. }
  873. static char *compare_get_sec_indev(Object *obj, Error **errp)
  874. {
  875. CompareState *s = COLO_COMPARE(obj);
  876. return g_strdup(s->sec_indev);
  877. }
  878. static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
  879. {
  880. CompareState *s = COLO_COMPARE(obj);
  881. g_free(s->sec_indev);
  882. s->sec_indev = g_strdup(value);
  883. }
  884. static char *compare_get_outdev(Object *obj, Error **errp)
  885. {
  886. CompareState *s = COLO_COMPARE(obj);
  887. return g_strdup(s->outdev);
  888. }
  889. static void compare_set_outdev(Object *obj, const char *value, Error **errp)
  890. {
  891. CompareState *s = COLO_COMPARE(obj);
  892. g_free(s->outdev);
  893. s->outdev = g_strdup(value);
  894. }
  895. static bool compare_get_vnet_hdr(Object *obj, Error **errp)
  896. {
  897. CompareState *s = COLO_COMPARE(obj);
  898. return s->vnet_hdr;
  899. }
  900. static void compare_set_vnet_hdr(Object *obj,
  901. bool value,
  902. Error **errp)
  903. {
  904. CompareState *s = COLO_COMPARE(obj);
  905. s->vnet_hdr = value;
  906. }
  907. static char *compare_get_notify_dev(Object *obj, Error **errp)
  908. {
  909. CompareState *s = COLO_COMPARE(obj);
  910. return g_strdup(s->notify_dev);
  911. }
  912. static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
  913. {
  914. CompareState *s = COLO_COMPARE(obj);
  915. g_free(s->notify_dev);
  916. s->notify_dev = g_strdup(value);
  917. }
  918. static void compare_get_timeout(Object *obj, Visitor *v,
  919. const char *name, void *opaque,
  920. Error **errp)
  921. {
  922. CompareState *s = COLO_COMPARE(obj);
  923. uint64_t value = s->compare_timeout;
  924. visit_type_uint64(v, name, &value, errp);
  925. }
  926. static void compare_set_timeout(Object *obj, Visitor *v,
  927. const char *name, void *opaque,
  928. Error **errp)
  929. {
  930. CompareState *s = COLO_COMPARE(obj);
  931. uint32_t value;
  932. if (!visit_type_uint32(v, name, &value, errp)) {
  933. return;
  934. }
  935. if (!value) {
  936. error_setg(errp, "Property '%s.%s' requires a positive value",
  937. object_get_typename(obj), name);
  938. return;
  939. }
  940. s->compare_timeout = value;
  941. }
  942. static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
  943. const char *name, void *opaque,
  944. Error **errp)
  945. {
  946. CompareState *s = COLO_COMPARE(obj);
  947. uint32_t value = s->expired_scan_cycle;
  948. visit_type_uint32(v, name, &value, errp);
  949. }
  950. static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
  951. const char *name, void *opaque,
  952. Error **errp)
  953. {
  954. CompareState *s = COLO_COMPARE(obj);
  955. uint32_t value;
  956. if (!visit_type_uint32(v, name, &value, errp)) {
  957. return;
  958. }
  959. if (!value) {
  960. error_setg(errp, "Property '%s.%s' requires a positive value",
  961. object_get_typename(obj), name);
  962. return;
  963. }
  964. s->expired_scan_cycle = value;
  965. }
  966. static void get_max_queue_size(Object *obj, Visitor *v,
  967. const char *name, void *opaque,
  968. Error **errp)
  969. {
  970. uint32_t value = max_queue_size;
  971. visit_type_uint32(v, name, &value, errp);
  972. }
  973. static void set_max_queue_size(Object *obj, Visitor *v,
  974. const char *name, void *opaque,
  975. Error **errp)
  976. {
  977. uint64_t value;
  978. if (!visit_type_uint64(v, name, &value, errp)) {
  979. return;
  980. }
  981. if (!value) {
  982. error_setg(errp, "Property '%s.%s' requires a positive value",
  983. object_get_typename(obj), name);
  984. return;
  985. }
  986. max_queue_size = value;
  987. }
  988. static void compare_pri_rs_finalize(SocketReadState *pri_rs)
  989. {
  990. CompareState *s = container_of(pri_rs, CompareState, pri_rs);
  991. Connection *conn = NULL;
  992. if (packet_enqueue(s, PRIMARY_IN, &conn)) {
  993. trace_colo_compare_main("primary: unsupported packet in");
  994. compare_chr_send(s,
  995. pri_rs->buf,
  996. pri_rs->packet_len,
  997. pri_rs->vnet_hdr_len,
  998. false,
  999. false);
  1000. } else {
  1001. /* compare packet in the specified connection */
  1002. colo_compare_connection(conn, s);
  1003. }
  1004. }
  1005. static void compare_sec_rs_finalize(SocketReadState *sec_rs)
  1006. {
  1007. CompareState *s = container_of(sec_rs, CompareState, sec_rs);
  1008. Connection *conn = NULL;
  1009. if (packet_enqueue(s, SECONDARY_IN, &conn)) {
  1010. trace_colo_compare_main("secondary: unsupported packet in");
  1011. } else {
  1012. /* compare packet in the specified connection */
  1013. colo_compare_connection(conn, s);
  1014. }
  1015. }
  1016. static void compare_notify_rs_finalize(SocketReadState *notify_rs)
  1017. {
  1018. CompareState *s = container_of(notify_rs, CompareState, notify_rs);
  1019. const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
  1020. int ret;
  1021. if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
  1022. notify_rs->buf,
  1023. notify_rs->packet_len)) {
  1024. ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
  1025. if (ret < 0) {
  1026. error_report("Notify Xen COLO-frame INIT failed");
  1027. }
  1028. } else if (packet_matches_str("COLO_CHECKPOINT",
  1029. notify_rs->buf,
  1030. notify_rs->packet_len)) {
  1031. /* colo-compare do checkpoint, flush pri packet and remove sec packet */
  1032. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  1033. } else {
  1034. error_report("COLO compare got unsupported instruction");
  1035. }
  1036. }
  1037. /*
  1038. * Return 0 is success.
  1039. * Return 1 is failed.
  1040. */
  1041. static int find_and_check_chardev(Chardev **chr,
  1042. char *chr_name,
  1043. Error **errp)
  1044. {
  1045. *chr = qemu_chr_find(chr_name);
  1046. if (*chr == NULL) {
  1047. error_setg(errp, "Device '%s' not found",
  1048. chr_name);
  1049. return 1;
  1050. }
  1051. if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
  1052. error_setg(errp, "chardev \"%s\" is not reconnectable",
  1053. chr_name);
  1054. return 1;
  1055. }
  1056. if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
  1057. error_setg(errp, "chardev \"%s\" cannot switch context",
  1058. chr_name);
  1059. return 1;
  1060. }
  1061. return 0;
  1062. }
/*
 * UserCreatable "complete" hook (installed by colo_compare_class_init()).
 * It runs after the properties have been set and performs these steps:
 *   - validate the mandatory chardev/iothread properties;
 *   - substitute defaults for tunables left at zero;
 *   - bind the chardev frontends;
 *   - initialise the packet reassemblers and the send state;
 *   - move the runtime work onto the iothread;
 *   - publish the instance on net_compares.
 *
 * On failure @errp is set and the function returns early.  Whatever was
 * already initialised at that point is presumably left for
 * colo_compare_finalize() to release.
 */
static void colo_compare_complete(UserCreatable *uc, Error **errp)
{
    CompareState *s = COLO_COMPARE(uc);
    Chardev *chr;

    /* All three chardevs and the iothread are mandatory, and distinct. */
    if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
        error_setg(errp, "colo compare needs 'primary_in' ,"
                   "'secondary_in','outdev','iothread' property set");
        return;
    } else if (!strcmp(s->pri_indev, s->outdev) ||
               !strcmp(s->sec_indev, s->outdev) ||
               !strcmp(s->pri_indev, s->sec_indev)) {
        error_setg(errp, "'indev' and 'outdev' could not be same "
                   "for compare module");
        return;
    }

    /* The property setters reject 0, so 0 here means "never set". */
    if (!s->compare_timeout) {
        /* Set default value to 3000 MS */
        s->compare_timeout = DEFAULT_TIME_OUT_MS;
    }
    if (!s->expired_scan_cycle) {
        /* Set default value to 1000 MS */
        s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
    }
    /* max_queue_size is file-scope: this default is shared by all instances. */
    if (!max_queue_size) {
        /* Set default queue size to 1024 */
        max_queue_size = MAX_QUEUE_SIZE;
    }

    /* Look up, validate and bind the chardev frontends. */
    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
        return;
    }

    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
        return;
    }

    if (find_and_check_chardev(&chr, s->outdev, errp) ||
        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
        return;
    }

    /* Reassemblers turn the chardev byte streams into whole packets. */
    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);

    /* Try to enable remote notify chardev, currently just for Xen COLO */
    if (s->notify_dev) {
        if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
            !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
            return;
        }

        net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
                           s->vnet_hdr);
    }

    /*
     * Send state for outdev.  done starts true; colo_compare_finalize()
     * waits while it is false, so true presumably means "no send in
     * progress".
     */
    s->out_sendco.s = s;
    s->out_sendco.chr = &s->chr_out;
    s->out_sendco.notify_remote_frame = false;
    s->out_sendco.done = true;
    g_queue_init(&s->out_sendco.send_list);

    /* Same for the optional notify channel (notify_remote_frame = true). */
    if (s->notify_dev) {
        s->notify_sendco.s = s;
        s->notify_sendco.chr = &s->chr_notify_dev;
        s->notify_sendco.notify_remote_frame = true;
        s->notify_sendco.done = true;
        g_queue_init(&s->notify_sendco.send_list);
    }

    g_queue_init(&s->conn_list);

    /* The table frees its keys with g_free(); values are not freed by it. */
    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
                                                      connection_key_equal,
                                                      g_free,
                                                      NULL);

    /* From here on, handlers, timer and event BH run on s->iothread. */
    colo_compare_iothread(s);

    /*
     * The first instance to complete creates the shared event lock and
     * condition used by colo_notify_compares_event().
     */
    qemu_mutex_lock(&colo_compare_mutex);
    if (!colo_compare_active) {
        qemu_mutex_init(&event_mtx);
        qemu_cond_init(&event_complete_cond);
        colo_compare_active = true;
    }
    QTAILQ_INSERT_TAIL(&net_compares, s, next);
    qemu_mutex_unlock(&colo_compare_mutex);

    return;
}
  1145. static void colo_flush_packets(void *opaque, void *user_data)
  1146. {
  1147. CompareState *s = user_data;
  1148. Connection *conn = opaque;
  1149. Packet *pkt = NULL;
  1150. while (!g_queue_is_empty(&conn->primary_list)) {
  1151. pkt = g_queue_pop_tail(&conn->primary_list);
  1152. compare_chr_send(s,
  1153. pkt->data,
  1154. pkt->size,
  1155. pkt->vnet_hdr_len,
  1156. false,
  1157. true);
  1158. packet_destroy_partial(pkt, NULL);
  1159. }
  1160. while (!g_queue_is_empty(&conn->secondary_list)) {
  1161. pkt = g_queue_pop_tail(&conn->secondary_list);
  1162. packet_destroy(pkt, NULL);
  1163. }
  1164. }
  1165. static void colo_compare_class_init(ObjectClass *oc, void *data)
  1166. {
  1167. UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
  1168. ucc->complete = colo_compare_complete;
  1169. }
  1170. static void colo_compare_init(Object *obj)
  1171. {
  1172. CompareState *s = COLO_COMPARE(obj);
  1173. object_property_add_str(obj, "primary_in",
  1174. compare_get_pri_indev, compare_set_pri_indev);
  1175. object_property_add_str(obj, "secondary_in",
  1176. compare_get_sec_indev, compare_set_sec_indev);
  1177. object_property_add_str(obj, "outdev",
  1178. compare_get_outdev, compare_set_outdev);
  1179. object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
  1180. (Object **)&s->iothread,
  1181. object_property_allow_set_link,
  1182. OBJ_PROP_LINK_STRONG);
  1183. /* This parameter just for Xen COLO */
  1184. object_property_add_str(obj, "notify_dev",
  1185. compare_get_notify_dev, compare_set_notify_dev);
  1186. object_property_add(obj, "compare_timeout", "uint64",
  1187. compare_get_timeout,
  1188. compare_set_timeout, NULL, NULL);
  1189. object_property_add(obj, "expired_scan_cycle", "uint32",
  1190. compare_get_expired_scan_cycle,
  1191. compare_set_expired_scan_cycle, NULL, NULL);
  1192. object_property_add(obj, "max_queue_size", "uint32",
  1193. get_max_queue_size,
  1194. set_max_queue_size, NULL, NULL);
  1195. s->vnet_hdr = false;
  1196. object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
  1197. compare_set_vnet_hdr);
  1198. }
  1199. void colo_compare_cleanup(void)
  1200. {
  1201. CompareState *tmp = NULL;
  1202. CompareState *n = NULL;
  1203. QTAILQ_FOREACH_SAFE(tmp, &net_compares, next, n) {
  1204. object_unparent(OBJECT(tmp));
  1205. }
  1206. }
  1207. static void colo_compare_finalize(Object *obj)
  1208. {
  1209. CompareState *s = COLO_COMPARE(obj);
  1210. CompareState *tmp = NULL;
  1211. qemu_mutex_lock(&colo_compare_mutex);
  1212. QTAILQ_FOREACH(tmp, &net_compares, next) {
  1213. if (tmp == s) {
  1214. QTAILQ_REMOVE(&net_compares, s, next);
  1215. break;
  1216. }
  1217. }
  1218. if (QTAILQ_EMPTY(&net_compares)) {
  1219. colo_compare_active = false;
  1220. qemu_mutex_destroy(&event_mtx);
  1221. qemu_cond_destroy(&event_complete_cond);
  1222. }
  1223. qemu_mutex_unlock(&colo_compare_mutex);
  1224. qemu_chr_fe_deinit(&s->chr_pri_in, false);
  1225. qemu_chr_fe_deinit(&s->chr_sec_in, false);
  1226. qemu_chr_fe_deinit(&s->chr_out, false);
  1227. if (s->notify_dev) {
  1228. qemu_chr_fe_deinit(&s->chr_notify_dev, false);
  1229. }
  1230. colo_compare_timer_del(s);
  1231. qemu_bh_delete(s->event_bh);
  1232. AioContext *ctx = iothread_get_aio_context(s->iothread);
  1233. aio_context_acquire(ctx);
  1234. AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
  1235. if (s->notify_dev) {
  1236. AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
  1237. }
  1238. aio_context_release(ctx);
  1239. /* Release all unhandled packets after compare thead exited */
  1240. g_queue_foreach(&s->conn_list, colo_flush_packets, s);
  1241. AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
  1242. g_queue_clear(&s->conn_list);
  1243. g_queue_clear(&s->out_sendco.send_list);
  1244. if (s->notify_dev) {
  1245. g_queue_clear(&s->notify_sendco.send_list);
  1246. }
  1247. if (s->connection_track_table) {
  1248. g_hash_table_destroy(s->connection_track_table);
  1249. }
  1250. object_unref(OBJECT(s->iothread));
  1251. g_free(s->pri_indev);
  1252. g_free(s->sec_indev);
  1253. g_free(s->outdev);
  1254. g_free(s->notify_dev);
  1255. }
  1256. static void __attribute__((__constructor__)) colo_compare_init_globals(void)
  1257. {
  1258. colo_compare_active = false;
  1259. qemu_mutex_init(&colo_compare_mutex);
  1260. }
  1261. static const TypeInfo colo_compare_info = {
  1262. .name = TYPE_COLO_COMPARE,
  1263. .parent = TYPE_OBJECT,
  1264. .instance_size = sizeof(CompareState),
  1265. .instance_init = colo_compare_init,
  1266. .instance_finalize = colo_compare_finalize,
  1267. .class_size = sizeof(CompareClass),
  1268. .class_init = colo_compare_class_init,
  1269. .interfaces = (InterfaceInfo[]) {
  1270. { TYPE_USER_CREATABLE },
  1271. { }
  1272. }
  1273. };
  1274. static void register_types(void)
  1275. {
  1276. type_register_static(&colo_compare_info);
  1277. }
  1278. type_init(register_types);