123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690 |
- # A simple FAT16 driver that is used to test the `vvfat` driver in QEMU.
- #
- # Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- from typing import Callable, List, Optional, Protocol, Set
- import string
- SECTOR_SIZE = 512
- DIRENTRY_SIZE = 32
- ALLOWED_FILE_CHARS = set(
- "!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase
- )
- class MBR:
- def __init__(self, data: bytes):
- assert len(data) == 512
- self.partition_table = []
- for i in range(4):
- partition = data[446 + i * 16 : 446 + (i + 1) * 16]
- self.partition_table.append(
- {
- "status": partition[0],
- "start_head": partition[1],
- "start_sector": partition[2] & 0x3F,
- "start_cylinder": ((partition[2] & 0xC0) << 2)
- | partition[3],
- "type": partition[4],
- "end_head": partition[5],
- "end_sector": partition[6] & 0x3F,
- "end_cylinder": ((partition[6] & 0xC0) << 2)
- | partition[7],
- "start_lba": int.from_bytes(partition[8:12], "little"),
- "size": int.from_bytes(partition[12:16], "little"),
- }
- )
- def __str__(self):
- return "\n".join(
- [
- f"{i}: {partition}"
- for i, partition in enumerate(self.partition_table)
- ]
- )
- class FatBootSector:
- # pylint: disable=too-many-instance-attributes
- def __init__(self, data: bytes):
- assert len(data) == 512
- self.bytes_per_sector = int.from_bytes(data[11:13], "little")
- self.sectors_per_cluster = data[13]
- self.reserved_sectors = int.from_bytes(data[14:16], "little")
- self.fat_count = data[16]
- self.root_entries = int.from_bytes(data[17:19], "little")
- total_sectors_16 = int.from_bytes(data[19:21], "little")
- self.media_descriptor = data[21]
- self.sectors_per_fat = int.from_bytes(data[22:24], "little")
- self.sectors_per_track = int.from_bytes(data[24:26], "little")
- self.heads = int.from_bytes(data[26:28], "little")
- self.hidden_sectors = int.from_bytes(data[28:32], "little")
- total_sectors_32 = int.from_bytes(data[32:36], "little")
- assert (
- total_sectors_16 == 0 or total_sectors_32 == 0
- ), "Both total sectors (16 and 32) fields are non-zero"
- self.total_sectors = total_sectors_16 or total_sectors_32
- self.drive_number = data[36]
- self.volume_id = int.from_bytes(data[39:43], "little")
- self.volume_label = data[43:54].decode("ascii").strip()
- self.fs_type = data[54:62].decode("ascii").strip()
- def root_dir_start(self):
- """
- Calculate the start sector of the root directory.
- """
- return self.reserved_sectors + self.fat_count * self.sectors_per_fat
- def root_dir_size(self):
- """
- Calculate the size of the root directory in sectors.
- """
- return (
- self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1
- ) // self.bytes_per_sector
- def data_sector_start(self):
- """
- Calculate the start sector of the data region.
- """
- return self.root_dir_start() + self.root_dir_size()
- def first_sector_of_cluster(self, cluster: int) -> int:
- """
- Calculate the first sector of the given cluster.
- """
- return (
- self.data_sector_start() + (cluster - 2) * self.sectors_per_cluster
- )
- def cluster_bytes(self):
- """
- Calculate the number of bytes in a cluster.
- """
- return self.bytes_per_sector * self.sectors_per_cluster
- def __str__(self):
- return (
- f"Bytes per sector: {self.bytes_per_sector}\n"
- f"Sectors per cluster: {self.sectors_per_cluster}\n"
- f"Reserved sectors: {self.reserved_sectors}\n"
- f"FAT count: {self.fat_count}\n"
- f"Root entries: {self.root_entries}\n"
- f"Total sectors: {self.total_sectors}\n"
- f"Media descriptor: {self.media_descriptor}\n"
- f"Sectors per FAT: {self.sectors_per_fat}\n"
- f"Sectors per track: {self.sectors_per_track}\n"
- f"Heads: {self.heads}\n"
- f"Hidden sectors: {self.hidden_sectors}\n"
- f"Drive number: {self.drive_number}\n"
- f"Volume ID: {self.volume_id}\n"
- f"Volume label: {self.volume_label}\n"
- f"FS type: {self.fs_type}\n"
- )
- class FatDirectoryEntry:
- # pylint: disable=too-many-instance-attributes
- def __init__(self, data: bytes, sector: int, offset: int):
- self.name = data[0:8].decode("ascii").strip()
- self.ext = data[8:11].decode("ascii").strip()
- self.attributes = data[11]
- self.reserved = data[12]
- self.create_time_tenth = data[13]
- self.create_time = int.from_bytes(data[14:16], "little")
- self.create_date = int.from_bytes(data[16:18], "little")
- self.last_access_date = int.from_bytes(data[18:20], "little")
- high_cluster = int.from_bytes(data[20:22], "little")
- self.last_mod_time = int.from_bytes(data[22:24], "little")
- self.last_mod_date = int.from_bytes(data[24:26], "little")
- low_cluster = int.from_bytes(data[26:28], "little")
- self.cluster = (high_cluster << 16) | low_cluster
- self.size_bytes = int.from_bytes(data[28:32], "little")
- # extra (to help write back to disk)
- self.sector = sector
- self.offset = offset
- def as_bytes(self) -> bytes:
- return (
- self.name.ljust(8, " ").encode("ascii")
- + self.ext.ljust(3, " ").encode("ascii")
- + self.attributes.to_bytes(1, "little")
- + self.reserved.to_bytes(1, "little")
- + self.create_time_tenth.to_bytes(1, "little")
- + self.create_time.to_bytes(2, "little")
- + self.create_date.to_bytes(2, "little")
- + self.last_access_date.to_bytes(2, "little")
- + (self.cluster >> 16).to_bytes(2, "little")
- + self.last_mod_time.to_bytes(2, "little")
- + self.last_mod_date.to_bytes(2, "little")
- + (self.cluster & 0xFFFF).to_bytes(2, "little")
- + self.size_bytes.to_bytes(4, "little")
- )
- def whole_name(self):
- if self.ext:
- return f"{self.name}.{self.ext}"
- else:
- return self.name
- def __str__(self):
- return (
- f"Name: {self.name}\n"
- f"Ext: {self.ext}\n"
- f"Attributes: {self.attributes}\n"
- f"Reserved: {self.reserved}\n"
- f"Create time tenth: {self.create_time_tenth}\n"
- f"Create time: {self.create_time}\n"
- f"Create date: {self.create_date}\n"
- f"Last access date: {self.last_access_date}\n"
- f"Last mod time: {self.last_mod_time}\n"
- f"Last mod date: {self.last_mod_date}\n"
- f"Cluster: {self.cluster}\n"
- f"Size: {self.size_bytes}\n"
- )
- def __repr__(self):
- # convert to dict
- return str(vars(self))
- class SectorReader(Protocol):
- def __call__(self, start_sector: int, num_sectors: int = 1) -> bytes: ...
- # pylint: disable=broad-exception-raised
- class Fat16:
- def __init__(
- self,
- start_sector: int,
- size: int,
- sector_reader: SectorReader,
- sector_writer: Callable[[int, bytes], None]
- ):
- self.start_sector = start_sector
- self.size_in_sectors = size
- self.sector_reader = sector_reader
- self.sector_writer = sector_writer
- self.boot_sector = FatBootSector(self.sector_reader(start_sector, 1))
- fat_size_in_sectors = (
- self.boot_sector.sectors_per_fat * self.boot_sector.fat_count
- )
- self.fats = self.read_sectors(
- self.boot_sector.reserved_sectors, fat_size_in_sectors
- )
- self.fats_dirty_sectors: Set[int] = set()
- def read_sectors(self, start_sector: int, num_sectors: int) -> bytes:
- return self.sector_reader(start_sector + self.start_sector,
- num_sectors)
- def write_sectors(self, start_sector: int, data: bytes) -> None:
- return self.sector_writer(start_sector + self.start_sector, data)
- def directory_from_bytes(
- self, data: bytes, start_sector: int
- ) -> List[FatDirectoryEntry]:
- """
- Convert `bytes` into a list of `FatDirectoryEntry` objects.
- Will ignore long file names.
- Will stop when it encounters a 0x00 byte.
- """
- entries = []
- for i in range(0, len(data), DIRENTRY_SIZE):
- entry = data[i : i + DIRENTRY_SIZE]
- current_sector = start_sector + (i // SECTOR_SIZE)
- current_offset = i % SECTOR_SIZE
- if entry[0] == 0:
- break
- if entry[0] == 0xE5:
- # Deleted file
- continue
- if entry[11] & 0xF == 0xF:
- # Long file name
- continue
- entries.append(
- FatDirectoryEntry(entry, current_sector, current_offset)
- )
- return entries
- def read_root_directory(self) -> List[FatDirectoryEntry]:
- root_dir = self.read_sectors(
- self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size()
- )
- return self.directory_from_bytes(
- root_dir, self.boot_sector.root_dir_start()
- )
- def read_fat_entry(self, cluster: int) -> int:
- """
- Read the FAT entry for the given cluster.
- """
- fat_offset = cluster * 2 # FAT16
- return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little")
- def write_fat_entry(self, cluster: int, value: int) -> None:
- """
- Write the FAT entry for the given cluster.
- """
- fat_offset = cluster * 2
- self.fats = (
- self.fats[:fat_offset]
- + value.to_bytes(2, "little")
- + self.fats[fat_offset + 2 :]
- )
- self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)
- def flush_fats(self) -> None:
- """
- Write the FATs back to the disk.
- """
- for sector in self.fats_dirty_sectors:
- data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE]
- sector = self.boot_sector.reserved_sectors + sector
- self.write_sectors(sector, data)
- self.fats_dirty_sectors = set()
- def next_cluster(self, cluster: int) -> Optional[int]:
- """
- Get the next cluster in the chain.
- If its `None`, then its the last cluster.
- The function will crash if the next cluster
- is `FREE` (unexpected) or invalid entry.
- """
- fat_entry = self.read_fat_entry(cluster)
- if fat_entry == 0:
- raise Exception("Unexpected: FREE cluster")
- if fat_entry == 1:
- raise Exception("Unexpected: RESERVED cluster")
- if fat_entry >= 0xFFF8:
- return None
- if fat_entry >= 0xFFF7:
- raise Exception("Invalid FAT entry")
- return fat_entry
- def next_free_cluster(self) -> int:
- """
- Find the next free cluster.
- """
- # simple linear search
- for i in range(2, 0xFFFF):
- if self.read_fat_entry(i) == 0:
- return i
- raise Exception("No free clusters")
- def next_free_cluster_non_continuous(self) -> int:
- """
- Find the next free cluster, but makes sure
- that the cluster before and after it are not allocated.
- """
- # simple linear search
- before = False
- for i in range(2, 0xFFFF):
- if self.read_fat_entry(i) == 0:
- if before and self.read_fat_entry(i + 1) == 0:
- return i
- else:
- before = True
- else:
- before = False
- raise Exception("No free clusters")
- def read_cluster(self, cluster: int) -> bytes:
- """
- Read the cluster at the given cluster.
- """
- return self.read_sectors(
- self.boot_sector.first_sector_of_cluster(cluster),
- self.boot_sector.sectors_per_cluster,
- )
- def write_cluster(self, cluster: int, data: bytes) -> None:
- """
- Write the cluster at the given cluster.
- """
- assert len(data) == self.boot_sector.cluster_bytes()
- self.write_sectors(
- self.boot_sector.first_sector_of_cluster(cluster),
- data,
- )
- def read_directory(
- self, cluster: Optional[int]
- ) -> List[FatDirectoryEntry]:
- """
- Read the directory at the given cluster.
- """
- entries = []
- while cluster is not None:
- data = self.read_cluster(cluster)
- entries.extend(
- self.directory_from_bytes(
- data, self.boot_sector.first_sector_of_cluster(cluster)
- )
- )
- cluster = self.next_cluster(cluster)
- return entries
- def add_direntry(
- self, cluster: Optional[int], name: str, ext: str, attributes: int
- ) -> FatDirectoryEntry:
- """
- Add a new directory entry to the given cluster.
- If the cluster is `None`, then it will be added to the root directory.
- """
- def find_free_entry(data: bytes) -> Optional[int]:
- for i in range(0, len(data), DIRENTRY_SIZE):
- entry = data[i : i + DIRENTRY_SIZE]
- if entry[0] == 0 or entry[0] == 0xE5:
- return i
- return None
- assert len(name) <= 8, "Name must be 8 characters or less"
- assert len(ext) <= 3, "Ext must be 3 characters or less"
- assert attributes % 0x15 != 0x15, "Invalid attributes"
- # initial dummy data
- new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
- new_entry.name = name.ljust(8, " ")
- new_entry.ext = ext.ljust(3, " ")
- new_entry.attributes = attributes
- new_entry.reserved = 0
- new_entry.create_time_tenth = 0
- new_entry.create_time = 0
- new_entry.create_date = 0
- new_entry.last_access_date = 0
- new_entry.last_mod_time = 0
- new_entry.last_mod_date = 0
- new_entry.cluster = self.next_free_cluster()
- new_entry.size_bytes = 0
- # mark as EOF
- self.write_fat_entry(new_entry.cluster, 0xFFFF)
- if cluster is None:
- for i in range(self.boot_sector.root_dir_size()):
- sector_data = self.read_sectors(
- self.boot_sector.root_dir_start() + i, 1
- )
- offset = find_free_entry(sector_data)
- if offset is not None:
- new_entry.sector = self.boot_sector.root_dir_start() + i
- new_entry.offset = offset
- self.update_direntry(new_entry)
- return new_entry
- else:
- while cluster is not None:
- data = self.read_cluster(cluster)
- offset = find_free_entry(data)
- if offset is not None:
- new_entry.sector = (
- self.boot_sector.first_sector_of_cluster(cluster)
- + (offset // SECTOR_SIZE))
- new_entry.offset = offset % SECTOR_SIZE
- self.update_direntry(new_entry)
- return new_entry
- cluster = self.next_cluster(cluster)
- raise Exception("No free directory entries")
- def update_direntry(self, entry: FatDirectoryEntry) -> None:
- """
- Write the directory entry back to the disk.
- """
- sector = self.read_sectors(entry.sector, 1)
- sector = (
- sector[: entry.offset]
- + entry.as_bytes()
- + sector[entry.offset + DIRENTRY_SIZE :]
- )
- self.write_sectors(entry.sector, sector)
- def find_direntry(self, path: str) -> Optional[FatDirectoryEntry]:
- """
- Find the directory entry for the given path.
- """
- assert path[0] == "/", "Path must start with /"
- path = path[1:] # remove the leading /
- parts = path.split("/")
- directory = self.read_root_directory()
- current_entry = None
- for i, part in enumerate(parts):
- is_last = i == len(parts) - 1
- for entry in directory:
- if entry.whole_name() == part:
- current_entry = entry
- break
- if current_entry is None:
- return None
- if is_last:
- return current_entry
- if current_entry.attributes & 0x10 == 0:
- raise Exception(
- f"{current_entry.whole_name()} is not a directory"
- )
- directory = self.read_directory(current_entry.cluster)
- assert False, "Exited loop with is_last == False"
- def read_file(self, entry: Optional[FatDirectoryEntry]) -> Optional[bytes]:
- """
- Read the content of the file at the given path.
- """
- if entry is None:
- return None
- if entry.attributes & 0x10 != 0:
- raise Exception(f"{entry.whole_name()} is a directory")
- data = b""
- cluster: Optional[int] = entry.cluster
- while cluster is not None and len(data) <= entry.size_bytes:
- data += self.read_cluster(cluster)
- cluster = self.next_cluster(cluster)
- return data[: entry.size_bytes]
- def truncate_file(
- self,
- entry: FatDirectoryEntry,
- new_size: int,
- allocate_non_continuous: bool = False,
- ) -> None:
- """
- Truncate the file at the given path to the new size.
- """
- if entry is None:
- raise Exception("entry is None")
- if entry.attributes & 0x10 != 0:
- raise Exception(f"{entry.whole_name()} is a directory")
- def clusters_from_size(size: int) -> int:
- return (
- size + self.boot_sector.cluster_bytes() - 1
- ) // self.boot_sector.cluster_bytes()
- # First, allocate new FATs if we need to
- required_clusters = clusters_from_size(new_size)
- current_clusters = clusters_from_size(entry.size_bytes)
- affected_clusters = set()
- # Keep at least one cluster, easier to manage this way
- if required_clusters == 0:
- required_clusters = 1
- if current_clusters == 0:
- current_clusters = 1
- cluster: Optional[int]
- if required_clusters > current_clusters:
- # Allocate new clusters
- cluster = entry.cluster
- to_add = required_clusters
- for _ in range(current_clusters - 1):
- to_add -= 1
- assert cluster is not None, "Cluster is None"
- affected_clusters.add(cluster)
- cluster = self.next_cluster(cluster)
- assert required_clusters > 0, "No new clusters to allocate"
- assert cluster is not None, "Cluster is None"
- assert (
- self.next_cluster(cluster) is None
- ), "Cluster is not the last cluster"
- # Allocate new clusters
- for _ in range(to_add - 1):
- if allocate_non_continuous:
- new_cluster = self.next_free_cluster_non_continuous()
- else:
- new_cluster = self.next_free_cluster()
- self.write_fat_entry(cluster, new_cluster)
- self.write_fat_entry(new_cluster, 0xFFFF)
- cluster = new_cluster
- elif required_clusters < current_clusters:
- # Truncate the file
- cluster = entry.cluster
- for _ in range(required_clusters - 1):
- assert cluster is not None, "Cluster is None"
- cluster = self.next_cluster(cluster)
- assert cluster is not None, "Cluster is None"
- next_cluster = self.next_cluster(cluster)
- # mark last as EOF
- self.write_fat_entry(cluster, 0xFFFF)
- # free the rest
- while next_cluster is not None:
- cluster = next_cluster
- next_cluster = self.next_cluster(next_cluster)
- self.write_fat_entry(cluster, 0)
- self.flush_fats()
- # verify number of clusters
- cluster = entry.cluster
- count = 0
- while cluster is not None:
- count += 1
- affected_clusters.add(cluster)
- cluster = self.next_cluster(cluster)
- assert (
- count == required_clusters
- ), f"Expected {required_clusters} clusters, got {count}"
- # update the size
- entry.size_bytes = new_size
- self.update_direntry(entry)
- # trigger every affected cluster
- for cluster in affected_clusters:
- first_sector = self.boot_sector.first_sector_of_cluster(cluster)
- first_sector_data = self.read_sectors(first_sector, 1)
- self.write_sectors(first_sector, first_sector_data)
- def write_file(self, entry: FatDirectoryEntry, data: bytes) -> None:
- """
- Write the content of the file at the given path.
- """
- if entry is None:
- raise Exception("entry is None")
- if entry.attributes & 0x10 != 0:
- raise Exception(f"{entry.whole_name()} is a directory")
- data_len = len(data)
- self.truncate_file(entry, data_len)
- cluster: Optional[int] = entry.cluster
- while cluster is not None:
- data_to_write = data[: self.boot_sector.cluster_bytes()]
- if len(data_to_write) < self.boot_sector.cluster_bytes():
- old_data = self.read_cluster(cluster)
- data_to_write += old_data[len(data_to_write) :]
- self.write_cluster(cluster, data_to_write)
- data = data[self.boot_sector.cluster_bytes() :]
- if len(data) == 0:
- break
- cluster = self.next_cluster(cluster)
- assert (
- len(data) == 0
- ), "Data was not written completely, clusters missing"
- def create_file(self, path: str) -> Optional[FatDirectoryEntry]:
- """
- Create a new file at the given path.
- """
- assert path[0] == "/", "Path must start with /"
- path = path[1:] # remove the leading /
- parts = path.split("/")
- directory_cluster = None
- directory = self.read_root_directory()
- parts, filename = parts[:-1], parts[-1]
- for _, part in enumerate(parts):
- current_entry = None
- for entry in directory:
- if entry.whole_name() == part:
- current_entry = entry
- break
- if current_entry is None:
- return None
- if current_entry.attributes & 0x10 == 0:
- raise Exception(
- f"{current_entry.whole_name()} is not a directory"
- )
- directory = self.read_directory(current_entry.cluster)
- directory_cluster = current_entry.cluster
- # add new entry to the directory
- filename, ext = filename.split(".")
- if len(ext) > 3:
- raise Exception("Ext must be 3 characters or less")
- if len(filename) > 8:
- raise Exception("Name must be 8 characters or less")
- for c in filename + ext:
- if c not in ALLOWED_FILE_CHARS:
- raise Exception("Invalid character in filename")
- return self.add_direntry(directory_cluster, filename, ext, 0)
|