from __future__ import annotations

import configparser
import os
import threading
import re

from modules import shared, errors, cache, scripts
from modules.gitpython_hack import Repo
from modules.paths_internal import extensions_dir, extensions_builtin_dir, script_path  # noqa: F401

os.makedirs(extensions_dir, exist_ok=True)


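# Returns the list of extensions that should actually run, honoring both the
# --disable-all-extensions / --disable-extra-extensions command line args and the
# "disable_all_extensions" option ("all" disables everything, "extra" keeps only built-ins).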
def active():
    if shared.cmd_opts.disable_all_extensions or shared.opts.disable_all_extensions == "all":
        return []
    elif shared.cmd_opts.disable_extra_extensions or shared.opts.disable_all_extensions == "extra":
        return [x for x in extensions if x.enabled and x.is_builtin]
    else:
        return [x for x in extensions if x.enabled]


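# Parses an extension's optional metadata.ini. An illustrative sketch of the format,
# using only the section and field names referenced in this file (the Requires field
# is parsed in __init__; get_script_requirements can read other fields such as Before
# for callers):
#
#   [Extension]
#   Name = my-extension
#   Requires = some-other-extension, yet-another-extension
#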
class ExtensionMetadata:
    filename = "metadata.ini"
    config: configparser.ConfigParser
    canonical_name: str
    requires: list

    def __init__(self, path, canonical_name):
        self.config = configparser.ConfigParser()

        filepath = os.path.join(path, self.filename)
        # `self.config.read()` will quietly swallow OSErrors (which FileNotFoundError is),
        # so there is no need to check whether the file exists beforehand.
        try:
            self.config.read(filepath)
        except Exception:
            errors.report(f"Error reading {self.filename} for extension {canonical_name}.", exc_info=True)

        canonical_name = self.config.get("Extension", "Name", fallback=canonical_name)
        self.canonical_name = canonical_name.lower().strip()

        self.requires = self.get_script_requirements("Requires", "Extension")

    def get_script_requirements(self, field, section, extra_section=None):
        """reads a list of requirements from the config; field is the name of the field in the ini file,
        like Requires or Before, and section is the name of the [section] in the ini file; additionally,
        reads more requirements from [extra_section] if specified."""

        x = self.config.get(section, field, fallback='')

        if extra_section:
            x = x + ', ' + self.config.get(extra_section, field, fallback='')

        return self.parse_list(x.lower())

    def parse_list(self, text):
        """converts a line from config ("ext1 ext2, ext3  ") into a python list (["ext1", "ext2", "ext3"])"""

        if not text:
            return []

        # both "," and " " are accepted as separator
        return [x for x in re.split(r"[,\s]+", text.strip()) if x]


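# Represents a single installed extension. The git-derived fields listed in
# cached_fields (remote, commit_date, branch, commit_hash, version) are filled in
# lazily by read_info_from_repo() and persisted through modules.cache.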
class Extension:
    lock = threading.Lock()
    cached_fields = ['remote', 'commit_date', 'branch', 'commit_hash', 'version']
    metadata: ExtensionMetadata

    def __init__(self, name, path, enabled=True, is_builtin=False, metadata=None):
        self.name = name
        self.path = path
        self.enabled = enabled
        self.status = ''
        self.can_update = False
        self.is_builtin = is_builtin
        self.commit_hash = ''
        self.commit_date = None
        self.version = ''
        self.branch = None
        self.remote = None
        self.have_info_from_repo = False
        self.metadata = metadata if metadata else ExtensionMetadata(self.path, name.lower())
        self.canonical_name = self.metadata.canonical_name

    def to_dict(self):
        return {x: getattr(self, x) for x in self.cached_fields}

    def from_dict(self, d):
        for field in self.cached_fields:
            setattr(self, field, d[field])

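    # Fills in the git-related fields at most once per extension, reading them through
    # the 'extensions-git' file cache keyed on the .git directory; built-in extensions
    # are skipped entirely.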
    def read_info_from_repo(self):
        if self.is_builtin or self.have_info_from_repo:
            return

        def read_from_repo():
            with self.lock:
                if self.have_info_from_repo:
                    return

                self.do_read_info_from_repo()
                return self.to_dict()

        try:
            d = cache.cached_data_for_file('extensions-git', self.name, os.path.join(self.path, ".git"), read_from_repo)
            self.from_dict(d)
        except FileNotFoundError:
            pass
        self.status = 'unknown' if self.status == '' else self.status

    def do_read_info_from_repo(self):
        repo = None
        try:
            if os.path.exists(os.path.join(self.path, ".git")):
                repo = Repo(self.path)
        except Exception:
            errors.report(f"Error reading github repository info from {self.path}", exc_info=True)

        if repo is None or repo.bare:
            self.remote = None
        else:
            try:
                self.remote = next(repo.remote().urls, None)
                commit = repo.head.commit
                self.commit_date = commit.committed_date
                if repo.active_branch:
                    self.branch = repo.active_branch.name
                self.commit_hash = commit.hexsha
                self.version = self.commit_hash[:8]

            except Exception:
                errors.report(f"Failed reading extension data from Git repository ({self.name})", exc_info=True)
                self.remote = None

        self.have_info_from_repo = True

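    # Returns ScriptFile entries for every file directly inside <extension path>/<subdir>
    # whose file extension matches `extension` (such as ".py"), sorted by filename.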
    def list_files(self, subdir, extension):
        dirpath = os.path.join(self.path, subdir)
        if not os.path.isdir(dirpath):
            return []

        res = []
        for filename in sorted(os.listdir(dirpath)):
            res.append(scripts.ScriptFile(self.path, filename, os.path.join(dirpath, filename)))

        res = [x for x in res if os.path.splitext(x.path)[1].lower() == extension and os.path.isfile(x.path)]

        return res

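    # Performs a dry-run fetch to see whether the remote has new commits, then compares
    # HEAD against 'origin'; the outcome is stored in can_update and status rather than returned.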
    def check_updates(self):
        repo = Repo(self.path)
        for fetch in repo.remote().fetch(dry_run=True):
            if fetch.flags != fetch.HEAD_UPTODATE:
                self.can_update = True
                self.status = "new commits"
                return

        try:
            origin = repo.rev_parse('origin')
            if repo.head.commit != origin:
                self.can_update = True
                self.status = "behind HEAD"
                return
        except Exception:
            self.can_update = False
            self.status = "unknown (remote error)"
            return

        self.can_update = False
        self.status = "latest"

    def fetch_and_reset_hard(self, commit='origin'):
        repo = Repo(self.path)
        # Fix: `error: Your local changes to the following files would be overwritten by merge`,
        # which happens because WSL2 Docker sets 755 file permissions instead of 644.
        repo.git.fetch(all=True)
        repo.git.reset(commit, hard=True)
        self.have_info_from_repo = False


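# Rescans extensions_builtin_dir and extensions_dir, rebuilding the module-level
# `extensions` list; duplicate canonical names and missing or disabled requirements
# declared in metadata.ini are reported via errors.report.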
def list_extensions():
    extensions.clear()

    if shared.cmd_opts.disable_all_extensions:
        print("*** \"--disable-all-extensions\" arg was used, will not load any extensions ***")
    elif shared.opts.disable_all_extensions == "all":
        print("*** \"Disable all extensions\" option was set, will not load any extensions ***")
    elif shared.cmd_opts.disable_extra_extensions:
        print("*** \"--disable-extra-extensions\" arg was used, will only load built-in extensions ***")
    elif shared.opts.disable_all_extensions == "extra":
        print("*** \"Disable all extensions\" option was set, will only load built-in extensions ***")

    loaded_extensions = {}

    # scan through extensions directory and load metadata
    for dirname in [extensions_builtin_dir, extensions_dir]:
        if not os.path.isdir(dirname):
            continue

        for extension_dirname in sorted(os.listdir(dirname)):
            path = os.path.join(dirname, extension_dirname)
            if not os.path.isdir(path):
                continue

            canonical_name = extension_dirname
            metadata = ExtensionMetadata(path, canonical_name)

            # check for duplicated canonical names
            already_loaded_extension = loaded_extensions.get(metadata.canonical_name)
            if already_loaded_extension is not None:
                errors.report(f'Duplicate canonical name "{canonical_name}" found in extensions "{extension_dirname}" and "{already_loaded_extension.name}". Former will be discarded.', exc_info=False)
                continue

            is_builtin = dirname == extensions_builtin_dir
            extension = Extension(name=extension_dirname, path=path, enabled=extension_dirname not in shared.opts.disabled_extensions, is_builtin=is_builtin, metadata=metadata)
            extensions.append(extension)
            loaded_extensions[canonical_name] = extension

    # check for requirements
    for extension in extensions:
        if not extension.enabled:
            continue

        for req in extension.metadata.requires:
            required_extension = loaded_extensions.get(req)
            if required_extension is None:
                errors.report(f'Extension "{extension.name}" requires "{req}" which is not installed.', exc_info=False)
                continue

            if not required_extension.enabled:
                errors.report(f'Extension "{extension.name}" requires "{required_extension.name}" which is disabled.', exc_info=False)
                continue


extensions: list[Extension] = []
|