# qcow2_format.py
# Library for manipulations with qcow2 image
#
# Copyright (c) 2020 Virtuozzo International GmbH.
# Copyright (C) 2012 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import json
import string
import struct
  22. class ComplexEncoder(json.JSONEncoder):
  23. def default(self, obj):
  24. if hasattr(obj, 'to_json'):
  25. return obj.to_json()
  26. else:
  27. return json.JSONEncoder.default(self, obj)
  28. class Qcow2Field:
  29. def __init__(self, value):
  30. self.value = value
  31. def __str__(self):
  32. return str(self.value)
  33. class Flags64(Qcow2Field):
  34. def __str__(self):
  35. bits = []
  36. for bit in range(64):
  37. if self.value & (1 << bit):
  38. bits.append(bit)
  39. return str(bits)
  40. class BitmapFlags(Qcow2Field):
  41. flags = {
  42. 0x1: 'in-use',
  43. 0x2: 'auto'
  44. }
  45. def __str__(self):
  46. bits = []
  47. for bit in range(64):
  48. flag = self.value & (1 << bit)
  49. if flag:
  50. bits.append(self.flags.get(flag, f'bit-{bit}'))
  51. return f'{self.value:#x} ({bits})'
  52. class Enum(Qcow2Field):
  53. def __str__(self):
  54. return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
  55. class Qcow2StructMeta(type):
  56. # Mapping from c types to python struct format
  57. ctypes = {
  58. 'u8': 'B',
  59. 'u16': 'H',
  60. 'u32': 'I',
  61. 'u64': 'Q'
  62. }
  63. def __init__(self, name, bases, attrs):
  64. if 'fields' in attrs:
  65. self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)
  66. class Qcow2Struct(metaclass=Qcow2StructMeta):
  67. """Qcow2Struct: base class for qcow2 data structures
  68. Successors should define fields class variable, which is: list of tuples,
  69. each of three elements:
  70. - c-type (one of 'u8', 'u16', 'u32', 'u64')
  71. - format (format_spec to use with .format() when dump or 'mask' to dump
  72. bitmasks)
  73. - field name
  74. """
  75. def __init__(self, fd=None, offset=None, data=None):
  76. """
  77. Two variants:
  78. 1. Specify data. fd and offset must be None.
  79. 2. Specify fd and offset, data must be None. offset may be omitted
  80. in this case, than current position of fd is used.
  81. """
  82. if data is None:
  83. assert fd is not None
  84. buf_size = struct.calcsize(self.fmt)
  85. if offset is not None:
  86. fd.seek(offset)
  87. data = fd.read(buf_size)
  88. else:
  89. assert fd is None and offset is None
  90. values = struct.unpack(self.fmt, data)
  91. self.__dict__ = dict((field[2], values[i])
  92. for i, field in enumerate(self.fields))
  93. def dump(self, is_json=False):
  94. if is_json:
  95. print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder))
  96. return
  97. for f in self.fields:
  98. value = self.__dict__[f[2]]
  99. if isinstance(f[1], str):
  100. value_str = f[1].format(value)
  101. else:
  102. value_str = str(f[1](value))
  103. print('{:<25} {}'.format(f[2], value_str))
  104. def to_json(self):
  105. return dict((f[2], self.__dict__[f[2]]) for f in self.fields)
  106. class Qcow2BitmapExt(Qcow2Struct):
  107. fields = (
  108. ('u32', '{}', 'nb_bitmaps'),
  109. ('u32', '{}', 'reserved32'),
  110. ('u64', '{:#x}', 'bitmap_directory_size'),
  111. ('u64', '{:#x}', 'bitmap_directory_offset')
  112. )
  113. def __init__(self, fd, cluster_size):
  114. super().__init__(fd=fd)
  115. tail = struct.calcsize(self.fmt) % 8
  116. if tail:
  117. fd.seek(8 - tail, 1)
  118. position = fd.tell()
  119. self.cluster_size = cluster_size
  120. self.read_bitmap_directory(fd)
  121. fd.seek(position)
  122. def read_bitmap_directory(self, fd):
  123. fd.seek(self.bitmap_directory_offset)
  124. self.bitmap_directory = \
  125. [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size)
  126. for _ in range(self.nb_bitmaps)]
  127. def dump(self):
  128. super().dump()
  129. for entry in self.bitmap_directory:
  130. print()
  131. entry.dump()
  132. def to_json(self):
  133. fields_dict = super().to_json()
  134. fields_dict['bitmap_directory'] = self.bitmap_directory
  135. return fields_dict
  136. class Qcow2BitmapDirEntry(Qcow2Struct):
  137. fields = (
  138. ('u64', '{:#x}', 'bitmap_table_offset'),
  139. ('u32', '{}', 'bitmap_table_size'),
  140. ('u32', BitmapFlags, 'flags'),
  141. ('u8', '{}', 'type'),
  142. ('u8', '{}', 'granularity_bits'),
  143. ('u16', '{}', 'name_size'),
  144. ('u32', '{}', 'extra_data_size')
  145. )
  146. def __init__(self, fd, cluster_size):
  147. super().__init__(fd=fd)
  148. self.cluster_size = cluster_size
  149. # Seek relative to the current position in the file
  150. fd.seek(self.extra_data_size, 1)
  151. bitmap_name = fd.read(self.name_size)
  152. self.name = bitmap_name.decode('ascii')
  153. # Move position to the end of the entry in the directory
  154. entry_raw_size = self.bitmap_dir_entry_raw_size()
  155. padding = ((entry_raw_size + 7) & ~7) - entry_raw_size
  156. fd.seek(padding, 1)
  157. self.bitmap_table = Qcow2BitmapTable(fd=fd,
  158. offset=self.bitmap_table_offset,
  159. nb_entries=self.bitmap_table_size,
  160. cluster_size=self.cluster_size)
  161. def bitmap_dir_entry_raw_size(self):
  162. return struct.calcsize(self.fmt) + self.name_size + \
  163. self.extra_data_size
  164. def dump(self):
  165. print(f'{"Bitmap name":<25} {self.name}')
  166. super(Qcow2BitmapDirEntry, self).dump()
  167. self.bitmap_table.dump()
  168. def to_json(self):
  169. # Put the name ahead of the dict
  170. return {
  171. 'name': self.name,
  172. **super().to_json(),
  173. 'bitmap_table': self.bitmap_table
  174. }
  175. class Qcow2BitmapTableEntry(Qcow2Struct):
  176. fields = (
  177. ('u64', '{}', 'entry'),
  178. )
  179. BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe
  180. BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00
  181. BME_TABLE_ENTRY_FLAG_ALL_ONES = 1
  182. def __init__(self, fd):
  183. super().__init__(fd=fd)
  184. self.reserved = self.entry & self.BME_TABLE_ENTRY_RESERVED_MASK
  185. self.offset = self.entry & self.BME_TABLE_ENTRY_OFFSET_MASK
  186. if self.offset:
  187. if self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
  188. self.type = 'invalid'
  189. else:
  190. self.type = 'serialized'
  191. elif self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
  192. self.type = 'all-ones'
  193. else:
  194. self.type = 'all-zeroes'
  195. def to_json(self):
  196. return {'type': self.type, 'offset': self.offset,
  197. 'reserved': self.reserved}
  198. class Qcow2BitmapTable:
  199. def __init__(self, fd, offset, nb_entries, cluster_size):
  200. self.cluster_size = cluster_size
  201. position = fd.tell()
  202. fd.seek(offset)
  203. self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)]
  204. fd.seek(position)
  205. def dump(self):
  206. bitmap_table = enumerate(self.entries)
  207. print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
  208. for i, entry in bitmap_table:
  209. if entry.type == 'serialized':
  210. size = self.cluster_size
  211. else:
  212. size = 0
  213. print(f'{i:<14} {entry.type:<15} {size:<12} {entry.offset}')
  214. def to_json(self):
  215. return self.entries
  216. QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
  217. class QcowHeaderExtension(Qcow2Struct):
  218. class Magic(Enum):
  219. mapping = {
  220. 0xe2792aca: 'Backing format',
  221. 0x6803f857: 'Feature table',
  222. 0x0537be77: 'Crypto header',
  223. QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
  224. 0x44415441: 'Data file'
  225. }
  226. def to_json(self):
  227. return self.mapping.get(self.value, "<unknown>")
  228. fields = (
  229. ('u32', Magic, 'magic'),
  230. ('u32', '{}', 'length')
  231. # length bytes of data follows
  232. # then padding to next multiply of 8
  233. )
  234. def __init__(self, magic=None, length=None, data=None, fd=None,
  235. cluster_size=None):
  236. """
  237. Support both loading from fd and creation from user data.
  238. For fd-based creation current position in a file will be used to read
  239. the data.
  240. The cluster_size value may be obtained by dependent structures.
  241. This should be somehow refactored and functionality should be moved to
  242. superclass (to allow creation of any qcow2 struct), but then, fields
  243. of variable length (data here) should be supported in base class
  244. somehow. Note also, that we probably want to parse different
  245. extensions. Should they be subclasses of this class, or how to do it
  246. better? Should it be something like QAPI union with discriminator field
  247. (magic here). So, it's a TODO. We'll see how to properly refactor this
  248. when we have more qcow2 structures.
  249. """
  250. if fd is None:
  251. assert all(v is not None for v in (magic, length, data))
  252. self.magic = magic
  253. self.length = length
  254. if length % 8 != 0:
  255. padding = 8 - (length % 8)
  256. data += b'\0' * padding
  257. self.data = data
  258. else:
  259. assert all(v is None for v in (magic, length, data))
  260. super().__init__(fd=fd)
  261. if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
  262. self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size)
  263. self.data = None
  264. else:
  265. padded = (self.length + 7) & ~7
  266. self.data = fd.read(padded)
  267. assert self.data is not None
  268. self.obj = None
  269. if self.data is not None:
  270. data_str = self.data[:self.length]
  271. if all(c in string.printable.encode(
  272. 'ascii') for c in data_str):
  273. data_str = f"'{ data_str.decode('ascii') }'"
  274. else:
  275. data_str = '<binary>'
  276. self.data_str = data_str
  277. def dump(self):
  278. super().dump()
  279. if self.obj is None:
  280. print(f'{"data":<25} {self.data_str}')
  281. else:
  282. self.obj.dump()
  283. def to_json(self):
  284. # Put the name ahead of the dict
  285. res = {'name': self.Magic(self.magic), **super().to_json()}
  286. if self.obj is not None:
  287. res['data'] = self.obj
  288. else:
  289. res['data_str'] = self.data_str
  290. return res
  291. @classmethod
  292. def create(cls, magic, data):
  293. return QcowHeaderExtension(magic, len(data), data)
  294. class QcowHeader(Qcow2Struct):
  295. fields = (
  296. # Version 2 header fields
  297. ('u32', '{:#x}', 'magic'),
  298. ('u32', '{}', 'version'),
  299. ('u64', '{:#x}', 'backing_file_offset'),
  300. ('u32', '{:#x}', 'backing_file_size'),
  301. ('u32', '{}', 'cluster_bits'),
  302. ('u64', '{}', 'size'),
  303. ('u32', '{}', 'crypt_method'),
  304. ('u32', '{}', 'l1_size'),
  305. ('u64', '{:#x}', 'l1_table_offset'),
  306. ('u64', '{:#x}', 'refcount_table_offset'),
  307. ('u32', '{}', 'refcount_table_clusters'),
  308. ('u32', '{}', 'nb_snapshots'),
  309. ('u64', '{:#x}', 'snapshot_offset'),
  310. # Version 3 header fields
  311. ('u64', Flags64, 'incompatible_features'),
  312. ('u64', Flags64, 'compatible_features'),
  313. ('u64', Flags64, 'autoclear_features'),
  314. ('u32', '{}', 'refcount_order'),
  315. ('u32', '{}', 'header_length'),
  316. )
  317. def __init__(self, fd):
  318. super().__init__(fd=fd, offset=0)
  319. self.set_defaults()
  320. self.cluster_size = 1 << self.cluster_bits
  321. fd.seek(self.header_length)
  322. self.load_extensions(fd)
  323. if self.backing_file_offset:
  324. fd.seek(self.backing_file_offset)
  325. self.backing_file = fd.read(self.backing_file_size)
  326. else:
  327. self.backing_file = None
  328. def set_defaults(self):
  329. if self.version == 2:
  330. self.incompatible_features = 0
  331. self.compatible_features = 0
  332. self.autoclear_features = 0
  333. self.refcount_order = 4
  334. self.header_length = 72
  335. def load_extensions(self, fd):
  336. self.extensions = []
  337. if self.backing_file_offset != 0:
  338. end = min(self.cluster_size, self.backing_file_offset)
  339. else:
  340. end = self.cluster_size
  341. while fd.tell() < end:
  342. ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size)
  343. if ext.magic == 0:
  344. break
  345. else:
  346. self.extensions.append(ext)
  347. def update_extensions(self, fd):
  348. fd.seek(self.header_length)
  349. extensions = self.extensions
  350. extensions.append(QcowHeaderExtension(0, 0, b''))
  351. for ex in extensions:
  352. buf = struct.pack('>II', ex.magic, ex.length)
  353. fd.write(buf)
  354. fd.write(ex.data)
  355. if self.backing_file is not None:
  356. self.backing_file_offset = fd.tell()
  357. fd.write(self.backing_file)
  358. if fd.tell() > self.cluster_size:
  359. raise Exception('I think I just broke the image...')
  360. def update(self, fd):
  361. header_bytes = self.header_length
  362. self.update_extensions(fd)
  363. fd.seek(0)
  364. header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
  365. buf = struct.pack(QcowHeader.fmt, *header)
  366. buf = buf[0:header_bytes-1]
  367. fd.write(buf)
  368. def dump_extensions(self, is_json=False):
  369. if is_json:
  370. print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder))
  371. return
  372. for ex in self.extensions:
  373. print('Header extension:')
  374. ex.dump()
  375. print()