nbd-fault-injector.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. #!/usr/bin/env python3
  2. # NBD server - fault injection utility
  3. #
  4. # Configuration file syntax:
  5. # [inject-error "disconnect-neg1"]
  6. # event=neg1
  7. # io=readwrite
  8. # when=before
  9. #
  10. # Note that Python's ConfigParser squashes together all sections with the same
  11. # name, so give each [inject-error] a unique name.
  12. #
  13. # inject-error options:
  14. # event - name of the trigger event
  15. # "neg1" - first part of negotiation struct
  16. # "export" - export struct
  17. # "neg2" - second part of negotiation struct
  18. # "request" - NBD request struct
  19. # "reply" - NBD reply struct
  20. # "data" - request/reply data
  21. # io - I/O direction that triggers this rule:
  22. # "read", "write", or "readwrite"
  23. # default: readwrite
  24. # when - after how many bytes to inject the fault
  25. # -1 - inject error after I/O
  26. # 0 - inject error before I/O
  27. # integer - inject error after integer bytes
  28. # "before" - alias for 0
  29. # "after" - alias for -1
  30. # default: before
  31. #
  32. # Currently the only error injection action is to terminate the server process.
  33. # This resets the TCP connection and thus forces the client to handle
  34. # unexpected connection termination.
  35. #
  36. # Other error injection actions could be added in the future.
  37. #
  38. # Copyright Red Hat, Inc. 2014
  39. #
  40. # Authors:
  41. # Stefan Hajnoczi <stefanha@redhat.com>
  42. #
  43. # This work is licensed under the terms of the GNU GPL, version 2 or later.
  44. # See the COPYING file in the top-level directory.
  45. import sys
  46. import socket
  47. import struct
  48. import collections
  49. import configparser
  50. FAKE_DISK_SIZE = 8 * 1024 * 1024 * 1024 # 8 GB
  51. # Protocol constants
  52. NBD_CMD_READ = 0
  53. NBD_CMD_WRITE = 1
  54. NBD_CMD_DISC = 2
  55. NBD_REQUEST_MAGIC = 0x25609513
  56. NBD_SIMPLE_REPLY_MAGIC = 0x67446698
  57. NBD_PASSWD = 0x4e42444d41474943
  58. NBD_OPTS_MAGIC = 0x49484156454F5054
  59. NBD_CLIENT_MAGIC = 0x0000420281861253
  60. NBD_OPT_EXPORT_NAME = 1 << 0
  61. # Protocol structs
  62. neg_classic_struct = struct.Struct('>QQQI124x')
  63. neg1_struct = struct.Struct('>QQH')
  64. export_tuple = collections.namedtuple('Export', 'reserved magic opt len')
  65. export_struct = struct.Struct('>IQII')
  66. neg2_struct = struct.Struct('>QH124x')
  67. request_tuple = collections.namedtuple('Request', 'magic type handle from_ len')
  68. request_struct = struct.Struct('>IIQQI')
  69. reply_struct = struct.Struct('>IIQ')
  70. def err(msg):
  71. sys.stderr.write(msg + '\n')
  72. sys.exit(1)
  73. def recvall(sock, bufsize):
  74. received = 0
  75. chunks = []
  76. while received < bufsize:
  77. chunk = sock.recv(bufsize - received)
  78. if len(chunk) == 0:
  79. raise Exception('unexpected disconnect')
  80. chunks.append(chunk)
  81. received += len(chunk)
  82. return b''.join(chunks)
  83. class Rule(object):
  84. def __init__(self, name, event, io, when):
  85. self.name = name
  86. self.event = event
  87. self.io = io
  88. self.when = when
  89. def match(self, event, io):
  90. if event != self.event:
  91. return False
  92. if io != self.io and self.io != 'readwrite':
  93. return False
  94. return True
  95. class FaultInjectionSocket(object):
  96. def __init__(self, sock, rules):
  97. self.sock = sock
  98. self.rules = rules
  99. def check(self, event, io, bufsize=None):
  100. for rule in self.rules:
  101. if rule.match(event, io):
  102. if rule.when == 0 or bufsize is None:
  103. print('Closing connection on rule match %s' % rule.name)
  104. self.sock.close()
  105. sys.stdout.flush()
  106. sys.exit(0)
  107. if rule.when != -1:
  108. return rule.when
  109. return bufsize
  110. def send(self, buf, event):
  111. bufsize = self.check(event, 'write', bufsize=len(buf))
  112. self.sock.sendall(buf[:bufsize])
  113. self.check(event, 'write')
  114. def recv(self, bufsize, event):
  115. bufsize = self.check(event, 'read', bufsize=bufsize)
  116. data = recvall(self.sock, bufsize)
  117. self.check(event, 'read')
  118. return data
  119. def close(self):
  120. self.sock.close()
  121. def negotiate_classic(conn):
  122. buf = neg_classic_struct.pack(NBD_PASSWD, NBD_CLIENT_MAGIC,
  123. FAKE_DISK_SIZE, 0)
  124. conn.send(buf, event='neg-classic')
  125. def negotiate_export(conn):
  126. # Send negotiation part 1
  127. buf = neg1_struct.pack(NBD_PASSWD, NBD_OPTS_MAGIC, 0)
  128. conn.send(buf, event='neg1')
  129. # Receive export option
  130. buf = conn.recv(export_struct.size, event='export')
  131. export = export_tuple._make(export_struct.unpack(buf))
  132. assert export.magic == NBD_OPTS_MAGIC
  133. assert export.opt == NBD_OPT_EXPORT_NAME
  134. name = conn.recv(export.len, event='export-name')
  135. # Send negotiation part 2
  136. buf = neg2_struct.pack(FAKE_DISK_SIZE, 0)
  137. conn.send(buf, event='neg2')
  138. def negotiate(conn, use_export):
  139. '''Negotiate export with client'''
  140. if use_export:
  141. negotiate_export(conn)
  142. else:
  143. negotiate_classic(conn)
  144. def read_request(conn):
  145. '''Parse NBD request from client'''
  146. buf = conn.recv(request_struct.size, event='request')
  147. req = request_tuple._make(request_struct.unpack(buf))
  148. assert req.magic == NBD_REQUEST_MAGIC
  149. return req
  150. def write_reply(conn, error, handle):
  151. buf = reply_struct.pack(NBD_SIMPLE_REPLY_MAGIC, error, handle)
  152. conn.send(buf, event='reply')
  153. def handle_connection(conn, use_export):
  154. negotiate(conn, use_export)
  155. while True:
  156. req = read_request(conn)
  157. if req.type == NBD_CMD_READ:
  158. write_reply(conn, 0, req.handle)
  159. conn.send(b'\0' * req.len, event='data')
  160. elif req.type == NBD_CMD_WRITE:
  161. _ = conn.recv(req.len, event='data')
  162. write_reply(conn, 0, req.handle)
  163. elif req.type == NBD_CMD_DISC:
  164. break
  165. else:
  166. print('unrecognized command type %#02x' % req.type)
  167. break
  168. conn.close()
  169. def run_server(sock, rules, use_export):
  170. while True:
  171. conn, _ = sock.accept()
  172. handle_connection(FaultInjectionSocket(conn, rules), use_export)
  173. def parse_inject_error(name, options):
  174. if 'event' not in options:
  175. err('missing \"event\" option in %s' % name)
  176. event = options['event']
  177. if event not in ('neg-classic', 'neg1', 'export', 'neg2', 'request', 'reply', 'data'):
  178. err('invalid \"event\" option value \"%s\" in %s' % (event, name))
  179. io = options.get('io', 'readwrite')
  180. if io not in ('read', 'write', 'readwrite'):
  181. err('invalid \"io\" option value \"%s\" in %s' % (io, name))
  182. when = options.get('when', 'before')
  183. try:
  184. when = int(when)
  185. except ValueError:
  186. if when == 'before':
  187. when = 0
  188. elif when == 'after':
  189. when = -1
  190. else:
  191. err('invalid \"when\" option value \"%s\" in %s' % (when, name))
  192. return Rule(name, event, io, when)
  193. def parse_config(config):
  194. rules = []
  195. for name in config.sections():
  196. if name.startswith('inject-error'):
  197. options = dict(config.items(name))
  198. rules.append(parse_inject_error(name, options))
  199. else:
  200. err('invalid config section name: %s' % name)
  201. return rules
  202. def load_rules(filename):
  203. config = configparser.RawConfigParser()
  204. with open(filename, 'rt') as f:
  205. config.read_file(f, filename)
  206. return parse_config(config)
  207. def open_socket(path):
  208. '''Open a TCP or UNIX domain listen socket'''
  209. if ':' in path:
  210. host, port = path.split(':', 1)
  211. sock = socket.socket()
  212. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  213. sock.bind((host, int(port)))
  214. # If given port was 0 the final port number is now available
  215. path = '%s:%d' % sock.getsockname()
  216. else:
  217. sock = socket.socket(socket.AF_UNIX)
  218. sock.bind(path)
  219. sock.listen(0)
  220. print('Listening on %s' % path)
  221. sys.stdout.flush() # another process may be waiting, show message now
  222. return sock
  223. def usage(args):
  224. sys.stderr.write('usage: %s [--classic-negotiation] <tcp-port>|<unix-path> <config-file>\n' % args[0])
  225. sys.stderr.write('Run an fault injector NBD server with rules defined in a config file.\n')
  226. sys.exit(1)
  227. def main(args):
  228. if len(args) != 3 and len(args) != 4:
  229. usage(args)
  230. use_export = True
  231. if args[1] == '--classic-negotiation':
  232. use_export = False
  233. elif len(args) == 4:
  234. usage(args)
  235. sock = open_socket(args[1 if use_export else 2])
  236. rules = load_rules(args[2 if use_export else 3])
  237. run_server(sock, rules, use_export)
  238. return 0
  239. if __name__ == '__main__':
  240. sys.exit(main(sys.argv))