# fat16.py
  1. # A simple FAT16 driver that is used to test the `vvfat` driver in QEMU.
  2. #
  3. # Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
  4. #
  5. # This program is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation; either version 2 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. from typing import Callable, List, Optional, Protocol, Set
  18. import string
# Size of one disk sector in bytes; this driver assumes 512-byte sectors.
SECTOR_SIZE = 512
# Size of one on-disk directory entry in bytes (FAT short-name entry).
DIRENTRY_SIZE = 32
# Characters permitted in 8.3 short file names (uppercase letters, digits,
# and the punctuation allowed by the FAT specification).
ALLOWED_FILE_CHARS = set(
    "!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase
)
  24. class MBR:
  25. def __init__(self, data: bytes):
  26. assert len(data) == 512
  27. self.partition_table = []
  28. for i in range(4):
  29. partition = data[446 + i * 16 : 446 + (i + 1) * 16]
  30. self.partition_table.append(
  31. {
  32. "status": partition[0],
  33. "start_head": partition[1],
  34. "start_sector": partition[2] & 0x3F,
  35. "start_cylinder": ((partition[2] & 0xC0) << 2)
  36. | partition[3],
  37. "type": partition[4],
  38. "end_head": partition[5],
  39. "end_sector": partition[6] & 0x3F,
  40. "end_cylinder": ((partition[6] & 0xC0) << 2)
  41. | partition[7],
  42. "start_lba": int.from_bytes(partition[8:12], "little"),
  43. "size": int.from_bytes(partition[12:16], "little"),
  44. }
  45. )
  46. def __str__(self):
  47. return "\n".join(
  48. [
  49. f"{i}: {partition}"
  50. for i, partition in enumerate(self.partition_table)
  51. ]
  52. )
  53. class FatBootSector:
  54. # pylint: disable=too-many-instance-attributes
  55. def __init__(self, data: bytes):
  56. assert len(data) == 512
  57. self.bytes_per_sector = int.from_bytes(data[11:13], "little")
  58. self.sectors_per_cluster = data[13]
  59. self.reserved_sectors = int.from_bytes(data[14:16], "little")
  60. self.fat_count = data[16]
  61. self.root_entries = int.from_bytes(data[17:19], "little")
  62. total_sectors_16 = int.from_bytes(data[19:21], "little")
  63. self.media_descriptor = data[21]
  64. self.sectors_per_fat = int.from_bytes(data[22:24], "little")
  65. self.sectors_per_track = int.from_bytes(data[24:26], "little")
  66. self.heads = int.from_bytes(data[26:28], "little")
  67. self.hidden_sectors = int.from_bytes(data[28:32], "little")
  68. total_sectors_32 = int.from_bytes(data[32:36], "little")
  69. assert (
  70. total_sectors_16 == 0 or total_sectors_32 == 0
  71. ), "Both total sectors (16 and 32) fields are non-zero"
  72. self.total_sectors = total_sectors_16 or total_sectors_32
  73. self.drive_number = data[36]
  74. self.volume_id = int.from_bytes(data[39:43], "little")
  75. self.volume_label = data[43:54].decode("ascii").strip()
  76. self.fs_type = data[54:62].decode("ascii").strip()
  77. def root_dir_start(self):
  78. """
  79. Calculate the start sector of the root directory.
  80. """
  81. return self.reserved_sectors + self.fat_count * self.sectors_per_fat
  82. def root_dir_size(self):
  83. """
  84. Calculate the size of the root directory in sectors.
  85. """
  86. return (
  87. self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1
  88. ) // self.bytes_per_sector
  89. def data_sector_start(self):
  90. """
  91. Calculate the start sector of the data region.
  92. """
  93. return self.root_dir_start() + self.root_dir_size()
  94. def first_sector_of_cluster(self, cluster: int) -> int:
  95. """
  96. Calculate the first sector of the given cluster.
  97. """
  98. return (
  99. self.data_sector_start() + (cluster - 2) * self.sectors_per_cluster
  100. )
  101. def cluster_bytes(self):
  102. """
  103. Calculate the number of bytes in a cluster.
  104. """
  105. return self.bytes_per_sector * self.sectors_per_cluster
  106. def __str__(self):
  107. return (
  108. f"Bytes per sector: {self.bytes_per_sector}\n"
  109. f"Sectors per cluster: {self.sectors_per_cluster}\n"
  110. f"Reserved sectors: {self.reserved_sectors}\n"
  111. f"FAT count: {self.fat_count}\n"
  112. f"Root entries: {self.root_entries}\n"
  113. f"Total sectors: {self.total_sectors}\n"
  114. f"Media descriptor: {self.media_descriptor}\n"
  115. f"Sectors per FAT: {self.sectors_per_fat}\n"
  116. f"Sectors per track: {self.sectors_per_track}\n"
  117. f"Heads: {self.heads}\n"
  118. f"Hidden sectors: {self.hidden_sectors}\n"
  119. f"Drive number: {self.drive_number}\n"
  120. f"Volume ID: {self.volume_id}\n"
  121. f"Volume label: {self.volume_label}\n"
  122. f"FS type: {self.fs_type}\n"
  123. )
  124. class FatDirectoryEntry:
  125. # pylint: disable=too-many-instance-attributes
  126. def __init__(self, data: bytes, sector: int, offset: int):
  127. self.name = data[0:8].decode("ascii").strip()
  128. self.ext = data[8:11].decode("ascii").strip()
  129. self.attributes = data[11]
  130. self.reserved = data[12]
  131. self.create_time_tenth = data[13]
  132. self.create_time = int.from_bytes(data[14:16], "little")
  133. self.create_date = int.from_bytes(data[16:18], "little")
  134. self.last_access_date = int.from_bytes(data[18:20], "little")
  135. high_cluster = int.from_bytes(data[20:22], "little")
  136. self.last_mod_time = int.from_bytes(data[22:24], "little")
  137. self.last_mod_date = int.from_bytes(data[24:26], "little")
  138. low_cluster = int.from_bytes(data[26:28], "little")
  139. self.cluster = (high_cluster << 16) | low_cluster
  140. self.size_bytes = int.from_bytes(data[28:32], "little")
  141. # extra (to help write back to disk)
  142. self.sector = sector
  143. self.offset = offset
  144. def as_bytes(self) -> bytes:
  145. return (
  146. self.name.ljust(8, " ").encode("ascii")
  147. + self.ext.ljust(3, " ").encode("ascii")
  148. + self.attributes.to_bytes(1, "little")
  149. + self.reserved.to_bytes(1, "little")
  150. + self.create_time_tenth.to_bytes(1, "little")
  151. + self.create_time.to_bytes(2, "little")
  152. + self.create_date.to_bytes(2, "little")
  153. + self.last_access_date.to_bytes(2, "little")
  154. + (self.cluster >> 16).to_bytes(2, "little")
  155. + self.last_mod_time.to_bytes(2, "little")
  156. + self.last_mod_date.to_bytes(2, "little")
  157. + (self.cluster & 0xFFFF).to_bytes(2, "little")
  158. + self.size_bytes.to_bytes(4, "little")
  159. )
  160. def whole_name(self):
  161. if self.ext:
  162. return f"{self.name}.{self.ext}"
  163. else:
  164. return self.name
  165. def __str__(self):
  166. return (
  167. f"Name: {self.name}\n"
  168. f"Ext: {self.ext}\n"
  169. f"Attributes: {self.attributes}\n"
  170. f"Reserved: {self.reserved}\n"
  171. f"Create time tenth: {self.create_time_tenth}\n"
  172. f"Create time: {self.create_time}\n"
  173. f"Create date: {self.create_date}\n"
  174. f"Last access date: {self.last_access_date}\n"
  175. f"Last mod time: {self.last_mod_time}\n"
  176. f"Last mod date: {self.last_mod_date}\n"
  177. f"Cluster: {self.cluster}\n"
  178. f"Size: {self.size_bytes}\n"
  179. )
  180. def __repr__(self):
  181. # convert to dict
  182. return str(vars(self))
class SectorReader(Protocol):
    """
    Callable that reads `num_sectors` sectors starting at the absolute
    sector index `start_sector` and returns their raw bytes.
    """
    def __call__(self, start_sector: int, num_sectors: int = 1) -> bytes: ...
  185. # pylint: disable=broad-exception-raised
class Fat16:
    """
    Minimal FAT16 filesystem driver.

    `start_sector`/`size` locate the partition inside the image; all sector
    numbers accepted or produced by the other methods are relative to the
    partition start and translated by read_sectors()/write_sectors().
    The FATs are cached in memory (`self.fats`) and written back lazily via
    flush_fats().
    """

    def __init__(
        self,
        start_sector: int,
        size: int,
        sector_reader: SectorReader,
        sector_writer: Callable[[int, bytes], None]
    ):
        self.start_sector = start_sector
        self.size_in_sectors = size
        self.sector_reader = sector_reader
        self.sector_writer = sector_writer

        self.boot_sector = FatBootSector(self.sector_reader(start_sector, 1))

        fat_size_in_sectors = (
            self.boot_sector.sectors_per_fat * self.boot_sector.fat_count
        )
        # In-memory copy of the whole FAT region (all copies back to back),
        # modified by write_fat_entry() and flushed by flush_fats().
        self.fats = self.read_sectors(
            self.boot_sector.reserved_sectors, fat_size_in_sectors
        )
        # FAT-region-relative sector indexes that still need flushing.
        self.fats_dirty_sectors: Set[int] = set()

    def read_sectors(self, start_sector: int, num_sectors: int) -> bytes:
        # Translate a partition-relative sector into an absolute one.
        return self.sector_reader(start_sector + self.start_sector,
                                  num_sectors)

    def write_sectors(self, start_sector: int, data: bytes) -> None:
        # Translate a partition-relative sector into an absolute one.
        return self.sector_writer(start_sector + self.start_sector, data)

    def directory_from_bytes(
        self, data: bytes, start_sector: int
    ) -> List[FatDirectoryEntry]:
        """
        Convert `bytes` into a list of `FatDirectoryEntry` objects.
        Will ignore long file names.
        Will stop when it encounters a 0x00 byte.

        `start_sector` is the sector where `data` begins; it is used to
        record each entry's on-disk location for later write-back.
        """
        entries = []
        for i in range(0, len(data), DIRENTRY_SIZE):
            entry = data[i : i + DIRENTRY_SIZE]

            current_sector = start_sector + (i // SECTOR_SIZE)
            current_offset = i % SECTOR_SIZE

            if entry[0] == 0:
                # 0x00 in the first byte marks the end of the directory.
                break

            if entry[0] == 0xE5:
                # Deleted file
                continue

            if entry[11] & 0xF == 0xF:
                # Long file name
                continue

            entries.append(
                FatDirectoryEntry(entry, current_sector, current_offset)
            )
        return entries

    def read_root_directory(self) -> List[FatDirectoryEntry]:
        """Read and parse the fixed-size root directory region."""
        root_dir = self.read_sectors(
            self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size()
        )
        return self.directory_from_bytes(
            root_dir, self.boot_sector.root_dir_start()
        )

    def read_fat_entry(self, cluster: int) -> int:
        """
        Read the FAT entry for the given cluster.
        """
        fat_offset = cluster * 2  # FAT16
        return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little")

    def write_fat_entry(self, cluster: int, value: int) -> None:
        """
        Write the FAT entry for the given cluster.

        Only updates the in-memory copy; flush_fats() writes it to disk.
        """
        # NOTE(review): this only touches the first FAT copy inside
        # `self.fats`; mirror copies are left stale — presumably acceptable
        # for the vvfat tests, but confirm if strict mirroring matters.
        fat_offset = cluster * 2
        self.fats = (
            self.fats[:fat_offset]
            + value.to_bytes(2, "little")
            + self.fats[fat_offset + 2 :]
        )
        self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)

    def flush_fats(self) -> None:
        """
        Write the FATs back to the disk.

        Only sectors recorded in `fats_dirty_sectors` are written.
        """
        for sector in self.fats_dirty_sectors:
            data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE]
            # Rebase from FAT-region-relative to partition-relative.
            sector = self.boot_sector.reserved_sectors + sector
            self.write_sectors(sector, data)
        self.fats_dirty_sectors = set()

    def next_cluster(self, cluster: int) -> Optional[int]:
        """
        Get the next cluster in the chain.
        If its `None`, then its the last cluster.
        The function will crash if the next cluster
        is `FREE` (unexpected) or invalid entry.
        """
        fat_entry = self.read_fat_entry(cluster)
        if fat_entry == 0:
            raise Exception("Unexpected: FREE cluster")
        if fat_entry == 1:
            raise Exception("Unexpected: RESERVED cluster")
        if fat_entry >= 0xFFF8:
            # 0xFFF8..0xFFFF: end-of-chain markers.
            return None
        if fat_entry >= 0xFFF7:
            # 0xFFF7: bad-cluster marker.
            raise Exception("Invalid FAT entry")
        return fat_entry

    def next_free_cluster(self) -> int:
        """
        Find the next free cluster.
        """
        # simple linear search
        for i in range(2, 0xFFFF):
            if self.read_fat_entry(i) == 0:
                return i
        raise Exception("No free clusters")

    def next_free_cluster_non_continuous(self) -> int:
        """
        Find the next free cluster, but makes sure
        that the cluster before and after it are not allocated.
        """
        # simple linear search
        before = False
        for i in range(2, 0xFFFF):
            if self.read_fat_entry(i) == 0:
                # `before` is True when cluster i-1 was also free.
                if before and self.read_fat_entry(i + 1) == 0:
                    return i
                else:
                    before = True
            else:
                before = False

        raise Exception("No free clusters")

    def read_cluster(self, cluster: int) -> bytes:
        """
        Read the cluster at the given cluster.
        """
        return self.read_sectors(
            self.boot_sector.first_sector_of_cluster(cluster),
            self.boot_sector.sectors_per_cluster,
        )

    def write_cluster(self, cluster: int, data: bytes) -> None:
        """
        Write the cluster at the given cluster.

        `data` must be exactly one cluster long.
        """
        assert len(data) == self.boot_sector.cluster_bytes()
        self.write_sectors(
            self.boot_sector.first_sector_of_cluster(cluster),
            data,
        )

    def read_directory(
        self, cluster: Optional[int]
    ) -> List[FatDirectoryEntry]:
        """
        Read the directory at the given cluster.

        Follows the whole cluster chain and concatenates the entries.
        """
        entries = []
        while cluster is not None:
            data = self.read_cluster(cluster)
            entries.extend(
                self.directory_from_bytes(
                    data, self.boot_sector.first_sector_of_cluster(cluster)
                )
            )
            cluster = self.next_cluster(cluster)
        return entries

    def add_direntry(
        self, cluster: Optional[int], name: str, ext: str, attributes: int
    ) -> FatDirectoryEntry:
        """
        Add a new directory entry to the given cluster.
        If the cluster is `None`, then it will be added to the root directory.

        A first data cluster is allocated for the entry and marked EOF.
        Raises if the directory has no free slot.
        """

        def find_free_entry(data: bytes) -> Optional[int]:
            # Byte offset of the first unused (0x00) or deleted (0xE5)
            # entry in `data`, or None when the buffer is full.
            for i in range(0, len(data), DIRENTRY_SIZE):
                entry = data[i : i + DIRENTRY_SIZE]
                if entry[0] == 0 or entry[0] == 0xE5:
                    return i
            return None

        assert len(name) <= 8, "Name must be 8 characters or less"
        assert len(ext) <= 3, "Ext must be 3 characters or less"
        # NOTE(review): `attributes % 0x15` is always < 0x15, so this
        # assertion can never fire — `&` was probably intended; confirm.
        assert attributes % 0x15 != 0x15, "Invalid attributes"

        # initial dummy data
        new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
        new_entry.name = name.ljust(8, " ")
        new_entry.ext = ext.ljust(3, " ")
        new_entry.attributes = attributes
        new_entry.reserved = 0
        new_entry.create_time_tenth = 0
        new_entry.create_time = 0
        new_entry.create_date = 0
        new_entry.last_access_date = 0
        new_entry.last_mod_time = 0
        new_entry.last_mod_date = 0
        new_entry.cluster = self.next_free_cluster()
        new_entry.size_bytes = 0

        # mark as EOF
        self.write_fat_entry(new_entry.cluster, 0xFFFF)

        if cluster is None:
            # Fixed-size root directory: scan it sector by sector.
            for i in range(self.boot_sector.root_dir_size()):
                sector_data = self.read_sectors(
                    self.boot_sector.root_dir_start() + i, 1
                )
                offset = find_free_entry(sector_data)
                if offset is not None:
                    new_entry.sector = self.boot_sector.root_dir_start() + i
                    new_entry.offset = offset
                    self.update_direntry(new_entry)
                    return new_entry
        else:
            # Subdirectory: walk its cluster chain looking for a free slot.
            while cluster is not None:
                data = self.read_cluster(cluster)
                offset = find_free_entry(data)
                if offset is not None:
                    new_entry.sector = (
                        self.boot_sector.first_sector_of_cluster(cluster)
                        + (offset // SECTOR_SIZE))
                    new_entry.offset = offset % SECTOR_SIZE
                    self.update_direntry(new_entry)
                    return new_entry
                cluster = self.next_cluster(cluster)

        raise Exception("No free directory entries")

    def update_direntry(self, entry: FatDirectoryEntry) -> None:
        """
        Write the directory entry back to the disk.
        """
        sector = self.read_sectors(entry.sector, 1)
        # Splice the serialized entry into its 32-byte slot.
        sector = (
            sector[: entry.offset]
            + entry.as_bytes()
            + sector[entry.offset + DIRENTRY_SIZE :]
        )
        self.write_sectors(entry.sector, sector)

    def find_direntry(self, path: str) -> Optional[FatDirectoryEntry]:
        """
        Find the directory entry for the given path.

        Returns None when a path component does not exist; raises when an
        intermediate component is not a directory.
        """
        assert path[0] == "/", "Path must start with /"

        path = path[1:]  # remove the leading /
        parts = path.split("/")
        directory = self.read_root_directory()

        # NOTE(review): `current_entry` is not reset for each component
        # (unlike create_file); a missing later component may return a
        # stale entry instead of None — looks like a bug, confirm.
        current_entry = None

        for i, part in enumerate(parts):
            is_last = i == len(parts) - 1

            for entry in directory:
                if entry.whole_name() == part:
                    current_entry = entry
                    break
            if current_entry is None:
                return None

            if is_last:
                return current_entry

            if current_entry.attributes & 0x10 == 0:
                raise Exception(
                    f"{current_entry.whole_name()} is not a directory"
                )

            directory = self.read_directory(current_entry.cluster)

        assert False, "Exited loop with is_last == False"

    def read_file(self, entry: Optional[FatDirectoryEntry]) -> Optional[bytes]:
        """
        Read the content of the file at the given path.

        Returns None for a None entry; raises if `entry` is a directory.
        """
        if entry is None:
            return None
        if entry.attributes & 0x10 != 0:
            raise Exception(f"{entry.whole_name()} is a directory")

        data = b""
        cluster: Optional[int] = entry.cluster
        while cluster is not None and len(data) <= entry.size_bytes:
            data += self.read_cluster(cluster)
            cluster = self.next_cluster(cluster)
        # Chains hold whole clusters; trim to the file's exact size.
        return data[: entry.size_bytes]

    def truncate_file(
        self,
        entry: FatDirectoryEntry,
        new_size: int,
        allocate_non_continuous: bool = False,
    ) -> None:
        """
        Truncate the file at the given path to the new size.

        Grows or shrinks the cluster chain as needed (a file always keeps
        at least one cluster), flushes the FATs, updates the directory
        entry, and rewrites the first sector of every affected cluster.
        When `allocate_non_continuous` is True, newly added clusters are
        chosen so that their neighbors are free.
        """
        if entry is None:
            raise Exception("entry is None")
        if entry.attributes & 0x10 != 0:
            raise Exception(f"{entry.whole_name()} is a directory")

        def clusters_from_size(size: int) -> int:
            # Clusters needed to hold `size` bytes, rounded up.
            return (
                size + self.boot_sector.cluster_bytes() - 1
            ) // self.boot_sector.cluster_bytes()

        # First, allocate new FATs if we need to
        required_clusters = clusters_from_size(new_size)
        current_clusters = clusters_from_size(entry.size_bytes)

        affected_clusters = set()

        # Keep at least one cluster, easier to manage this way
        if required_clusters == 0:
            required_clusters = 1
        if current_clusters == 0:
            current_clusters = 1

        cluster: Optional[int]

        if required_clusters > current_clusters:
            # Allocate new clusters
            cluster = entry.cluster
            to_add = required_clusters
            # Walk to the last cluster of the existing chain, counting
            # down how many clusters still need to be added.
            for _ in range(current_clusters - 1):
                to_add -= 1
                assert cluster is not None, "Cluster is None"
                affected_clusters.add(cluster)
                cluster = self.next_cluster(cluster)
            assert required_clusters > 0, "No new clusters to allocate"
            assert cluster is not None, "Cluster is None"
            assert (
                self.next_cluster(cluster) is None
            ), "Cluster is not the last cluster"

            # Allocate new clusters
            for _ in range(to_add - 1):
                if allocate_non_continuous:
                    new_cluster = self.next_free_cluster_non_continuous()
                else:
                    new_cluster = self.next_free_cluster()
                # Link the new cluster in and mark it as the new chain end.
                self.write_fat_entry(cluster, new_cluster)
                self.write_fat_entry(new_cluster, 0xFFFF)
                cluster = new_cluster

        elif required_clusters < current_clusters:
            # Truncate the file
            cluster = entry.cluster
            # Walk to what will become the last cluster of the chain.
            for _ in range(required_clusters - 1):
                assert cluster is not None, "Cluster is None"
                cluster = self.next_cluster(cluster)
            assert cluster is not None, "Cluster is None"

            next_cluster = self.next_cluster(cluster)

            # mark last as EOF
            self.write_fat_entry(cluster, 0xFFFF)
            # free the rest
            while next_cluster is not None:
                cluster = next_cluster
                next_cluster = self.next_cluster(next_cluster)
                self.write_fat_entry(cluster, 0)

        self.flush_fats()

        # verify number of clusters
        cluster = entry.cluster
        count = 0
        while cluster is not None:
            count += 1
            affected_clusters.add(cluster)
            cluster = self.next_cluster(cluster)
        assert (
            count == required_clusters
        ), f"Expected {required_clusters} clusters, got {count}"

        # update the size
        entry.size_bytes = new_size
        self.update_direntry(entry)

        # trigger every affected cluster
        # (read and rewrite the first sector unchanged — presumably so the
        # driver under test notices the updated mapping; confirm intent.)
        for cluster in affected_clusters:
            first_sector = self.boot_sector.first_sector_of_cluster(cluster)
            first_sector_data = self.read_sectors(first_sector, 1)
            self.write_sectors(first_sector, first_sector_data)

    def write_file(self, entry: FatDirectoryEntry, data: bytes) -> None:
        """
        Write the content of the file at the given path.

        Resizes the file to len(data) first, then writes cluster by
        cluster; the tail of a partially filled last cluster is preserved.
        """
        if entry is None:
            raise Exception("entry is None")
        if entry.attributes & 0x10 != 0:
            raise Exception(f"{entry.whole_name()} is a directory")

        data_len = len(data)

        self.truncate_file(entry, data_len)

        cluster: Optional[int] = entry.cluster
        while cluster is not None:
            data_to_write = data[: self.boot_sector.cluster_bytes()]
            if len(data_to_write) < self.boot_sector.cluster_bytes():
                # Partial cluster: keep the old bytes past the new end.
                old_data = self.read_cluster(cluster)
                data_to_write += old_data[len(data_to_write) :]

            self.write_cluster(cluster, data_to_write)
            data = data[self.boot_sector.cluster_bytes() :]
            if len(data) == 0:
                break
            cluster = self.next_cluster(cluster)

        assert (
            len(data) == 0
        ), "Data was not written completely, clusters missing"

    def create_file(self, path: str) -> Optional[FatDirectoryEntry]:
        """
        Create a new file at the given path.

        Returns the new entry, or None when an intermediate directory does
        not exist. The file name must be a valid 8.3 name made of
        ALLOWED_FILE_CHARS.
        """
        assert path[0] == "/", "Path must start with /"

        path = path[1:]  # remove the leading /

        parts = path.split("/")

        directory_cluster = None
        directory = self.read_root_directory()

        parts, filename = parts[:-1], parts[-1]

        # Resolve the intermediate directories down to the parent.
        for _, part in enumerate(parts):
            current_entry = None
            for entry in directory:
                if entry.whole_name() == part:
                    current_entry = entry
                    break
            if current_entry is None:
                return None

            if current_entry.attributes & 0x10 == 0:
                raise Exception(
                    f"{current_entry.whole_name()} is not a directory"
                )

            directory = self.read_directory(current_entry.cluster)
            directory_cluster = current_entry.cluster

        # add new entry to the directory

        filename, ext = filename.split(".")

        if len(ext) > 3:
            raise Exception("Ext must be 3 characters or less")
        if len(filename) > 8:
            raise Exception("Name must be 8 characters or less")

        for c in filename + ext:
            if c not in ALLOWED_FILE_CHARS:
                raise Exception("Invalid character in filename")

        return self.add_direntry(directory_cluster, filename, ext, 0)