extensions.py

from __future__ import annotations

import configparser
import dataclasses
import os
import threading
import re

from modules import shared, errors, cache, scripts
from modules.gitpython_hack import Repo
from modules.paths_internal import extensions_dir, extensions_builtin_dir, script_path  # noqa: F401

extensions: list[Extension] = []
extension_paths: dict[str, Extension] = {}
loaded_extensions: dict[str, Extension] = {}


os.makedirs(extensions_dir, exist_ok=True)


def active():
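    """returns the list of enabled extensions, taking the "disable all/extra extensions" settings into account"""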
    if shared.cmd_opts.disable_all_extensions or shared.opts.disable_all_extensions == "all":
        return []
    elif shared.cmd_opts.disable_extra_extensions or shared.opts.disable_all_extensions == "extra":
        return [x for x in extensions if x.enabled and x.is_builtin]
    else:
        return [x for x in extensions if x.enabled]


@dataclasses.dataclass
class CallbackOrderInfo:
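    """ordering instructions for a callback, parsed from a [callbacks/...] section of metadata.ini: run before/after the listed entries"""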
    name: str
    before: list
    after: list


class ExtensionMetadata:
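    """parsed metadata.ini of a single extension: canonical name, requirements and callback ordering.

    An illustrative example (hypothetical names, only the section/field names used by the code below):

        [Extension]
        Name = my-extension
        Requires = some-other-extension

        [callbacks/my-extension/my_callback]
        Before = another-extension
    """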
    filename = "metadata.ini"
    config: configparser.ConfigParser
    canonical_name: str
    requires: list

    def __init__(self, path, canonical_name):
        self.config = configparser.ConfigParser()

        filepath = os.path.join(path, self.filename)
        # `self.config.read()` will quietly swallow OSErrors (which FileNotFoundError is),
        # so no need to check whether the file exists beforehand.
        try:
            self.config.read(filepath)
        except Exception:
            errors.report(f"Error reading {self.filename} for extension {canonical_name}.", exc_info=True)

        self.canonical_name = self.config.get("Extension", "Name", fallback=canonical_name)
        self.canonical_name = self.canonical_name.lower().strip()

        self.requires = None

    def get_script_requirements(self, field, section, extra_section=None):
        """reads a list of requirements from the config; field is the name of the field in the ini file,
        like Requires or Before, and section is the name of the [section] in the ini file; additionally,
        reads more requirements from [extra_section] if specified."""

        x = self.config.get(section, field, fallback='')

        if extra_section:
            x = x + ', ' + self.config.get(extra_section, field, fallback='')

        listed_requirements = self.parse_list(x.lower())
        res = []

        for requirement in listed_requirements:
            loaded_requirements = (x for x in requirement.split("|") if x in loaded_extensions)
            relevant_requirement = next(loaded_requirements, requirement)
            res.append(relevant_requirement)

        return res

    def parse_list(self, text):
        """converts a line from config ("ext1 ext2, ext3 ") into a python list (["ext1", "ext2", "ext3"])"""

        if not text:
            return []

        # both "," and " " are accepted as separator
        return [x for x in re.split(r"[,\s]+", text.strip()) if x]

    def list_callback_order_instructions(self):
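        """yields a CallbackOrderInfo for every [callbacks/...] section that belongs to this extension"""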
        for section in self.config.sections():
            if not section.startswith("callbacks/"):
                continue

            callback_name = section[10:]

            if not callback_name.startswith(self.canonical_name):
                errors.report(f"Callback order section for extension {self.canonical_name} is referencing the wrong extension: {section}")
                continue

            before = self.parse_list(self.config.get(section, 'Before', fallback=''))
            after = self.parse_list(self.config.get(section, 'After', fallback=''))

            yield CallbackOrderInfo(callback_name, before, after)


class Extension:
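    """a single installed extension: its directory, enabled state, git information and metadata"""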
    lock = threading.Lock()
    cached_fields = ['remote', 'commit_date', 'branch', 'commit_hash', 'version']
    metadata: ExtensionMetadata

    def __init__(self, name, path, enabled=True, is_builtin=False, metadata=None):
        self.name = name
        self.path = path
        self.enabled = enabled
        self.status = ''
        self.can_update = False
        self.is_builtin = is_builtin
        self.commit_hash = ''
        self.commit_date = None
        self.version = ''
        self.branch = None
        self.remote = None
        self.have_info_from_repo = False
        self.metadata = metadata if metadata else ExtensionMetadata(self.path, name.lower())
        self.canonical_name = self.metadata.canonical_name

    def to_dict(self):
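        """returns the git-related fields as a dict, used as the payload for the extensions-git cache"""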
        return {x: getattr(self, x) for x in self.cached_fields}

    def from_dict(self, d):
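        """restores the git-related fields from a dict previously produced by to_dict()"""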
        for field in self.cached_fields:
            setattr(self, field, d[field])

    def read_info_from_repo(self):
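        """reads git info for this extension, using the extensions-git cache when possible; built-in extensions are skipped"""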
        if self.is_builtin or self.have_info_from_repo:
            return

        def read_from_repo():
            with self.lock:
                if self.have_info_from_repo:
                    return

                self.do_read_info_from_repo()

                return self.to_dict()

        try:
            d = cache.cached_data_for_file('extensions-git', self.name, os.path.join(self.path, ".git"), read_from_repo)
            self.from_dict(d)
        except FileNotFoundError:
            pass
        self.status = 'unknown' if self.status == '' else self.status

    def do_read_info_from_repo(self):
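        """reads remote URL, branch, commit hash/date and version directly from the extension's git repository"""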
        repo = None
        try:
            if os.path.exists(os.path.join(self.path, ".git")):
                repo = Repo(self.path)
        except Exception:
            errors.report(f"Error reading github repository info from {self.path}", exc_info=True)

        if repo is None or repo.bare:
            self.remote = None
        else:
            try:
                self.remote = next(repo.remote().urls, None)
                commit = repo.head.commit
                self.commit_date = commit.committed_date
                if repo.active_branch:
                    self.branch = repo.active_branch.name
                self.commit_hash = commit.hexsha
                self.version = self.commit_hash[:8]
            except Exception:
                errors.report(f"Failed reading extension data from Git repository ({self.name})", exc_info=True)
                self.remote = None

        self.have_info_from_repo = True

    def list_files(self, subdir, extension):
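        """returns ScriptFile entries for files with the given suffix (e.g. ".py") inside the given subdirectory
        of the extension; for example, list_files("scripts", ".py") would list the extension's script files"""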
        dirpath = os.path.join(self.path, subdir)
        if not os.path.isdir(dirpath):
            return []

        res = []
        for filename in sorted(os.listdir(dirpath)):
            res.append(scripts.ScriptFile(self.path, filename, os.path.join(dirpath, filename)))

        res = [x for x in res if os.path.splitext(x.path)[1].lower() == extension and os.path.isfile(x.path)]

        return res

    def check_updates(self):
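        """does a dry-run fetch against the extension's remote and updates can_update/status accordingly"""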
        repo = Repo(self.path)
        for fetch in repo.remote().fetch(dry_run=True):
            if self.branch and fetch.name != f'{repo.remote().name}/{self.branch}':
                continue
            if fetch.flags != fetch.HEAD_UPTODATE:
                self.can_update = True
                self.status = "new commits"
                return

        try:
            origin = repo.rev_parse('origin')
            if repo.head.commit != origin:
                self.can_update = True
                self.status = "behind HEAD"
                return
        except Exception:
            self.can_update = False
            self.status = "unknown (remote error)"
            return

        self.can_update = False
        self.status = "latest"

    def fetch_and_reset_hard(self, commit='origin'):
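        """fetches the remote and hard-resets the working tree to the given commit, discarding local changes"""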
        repo = Repo(self.path)
        # Fix for `error: Your local changes to the following files would be overwritten by merge`,
        # which happens because WSL2 Docker sets 755 file permissions instead of 644.
        repo.git.fetch(all=True)
        repo.git.reset(commit, hard=True)
        self.have_info_from_repo = False


def list_extensions():
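    """scans the built-in and user extension directories and fills the module-level extensions, extension_paths and loaded_extensions collections"""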
    extensions.clear()
    extension_paths.clear()
    loaded_extensions.clear()

    if shared.cmd_opts.disable_all_extensions:
        print("*** \"--disable-all-extensions\" arg was used, will not load any extensions ***")
    elif shared.opts.disable_all_extensions == "all":
        print("*** \"Disable all extensions\" option was set, will not load any extensions ***")
    elif shared.cmd_opts.disable_extra_extensions:
        print("*** \"--disable-extra-extensions\" arg was used, will only load built-in extensions ***")
    elif shared.opts.disable_all_extensions == "extra":
        print("*** \"Disable all extensions\" option was set, will only load built-in extensions ***")

    # scan through extensions directory and load metadata
    for dirname in [extensions_builtin_dir, extensions_dir]:
        if not os.path.isdir(dirname):
            continue

        for extension_dirname in sorted(os.listdir(dirname)):
            path = os.path.join(dirname, extension_dirname)
            if not os.path.isdir(path):
                continue

            canonical_name = extension_dirname
            metadata = ExtensionMetadata(path, canonical_name)

            # check for duplicated canonical names
            already_loaded_extension = loaded_extensions.get(metadata.canonical_name)
            if already_loaded_extension is not None:
                errors.report(f'Duplicate canonical name "{canonical_name}" found in extensions "{extension_dirname}" and "{already_loaded_extension.name}". The former will be discarded.', exc_info=False)
                continue

            is_builtin = dirname == extensions_builtin_dir
            extension = Extension(name=extension_dirname, path=path, enabled=extension_dirname not in shared.opts.disabled_extensions, is_builtin=is_builtin, metadata=metadata)
            extensions.append(extension)
            extension_paths[extension.path] = extension
            loaded_extensions[canonical_name] = extension

    for extension in extensions:
        extension.metadata.requires = extension.metadata.get_script_requirements("Requires", "Extension")

    # check for requirements
    for extension in extensions:
        if not extension.enabled:
            continue

        for req in extension.metadata.requires:
            required_extension = loaded_extensions.get(req)
            if required_extension is None:
                errors.report(f'Extension "{extension.name}" requires "{req}" which is not installed.', exc_info=False)
                continue

            if not required_extension.enabled:
                errors.report(f'Extension "{extension.name}" requires "{required_extension.name}" which is disabled.', exc_info=False)
                continue


def find_extension(filename):
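    """walks up the parent directories of the given file path and returns the Extension that contains it, or None if it belongs to no extension"""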
    parentdir = os.path.dirname(os.path.realpath(filename))

    while parentdir != filename:
        extension = extension_paths.get(parentdir)
        if extension is not None:
            return extension

        filename = parentdir
        parentdir = os.path.dirname(filename)

    return None