123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758 |
- import os
- import re
- import sys
- import inspect
- from collections import namedtuple
- from dataclasses import dataclass
- import gradio as gr
- from modules import shared, paths, script_callbacks, extensions, script_loading, scripts_postprocessing, errors, timer
- AlwaysVisible = object()
- class PostprocessImageArgs:
- def __init__(self, image):
- self.image = image
- class PostprocessBatchListArgs:
- def __init__(self, images):
- self.images = images
- @dataclass
- class OnComponent:
- component: gr.blocks.Block
- class Script:
- name = None
- """script's internal name derived from title"""
- section = None
- """name of UI section that the script's controls will be placed into"""
- filename = None
- args_from = None
- args_to = None
- alwayson = False
- is_txt2img = False
- is_img2img = False
- tabname = None
- group = None
- """A gr.Group component that has all script's UI inside it."""
- create_group = True
- """If False, for alwayson scripts, a group component will not be created."""
- infotext_fields = None
- """if set in ui(), this is a list of pairs of gradio component + text; the text will be used when
- parsing infotext to set the value for the component; see ui.py's txt2img_paste_fields for an example
- """
- paste_field_names = None
- """if set in ui(), this is a list of names of infotext fields; the fields will be sent through the
- various "Send to <X>" buttons when clicked
- """
- api_info = None
- """Generated value of type modules.api.models.ScriptInfo with information about the script for API"""
- on_before_component_elem_id = None
- """list of callbacks to be called before a component with an elem_id is created"""
- on_after_component_elem_id = None
- """list of callbacks to be called after a component with an elem_id is created"""
- setup_for_ui_only = False
- """If true, the script setup will only be run in Gradio UI, not in API"""
- def title(self):
- """this function should return the title of the script. This is what will be displayed in the dropdown menu."""
- raise NotImplementedError()
- def ui(self, is_img2img):
- """this function should create gradio UI elements. See https://gradio.app/docs/#components
- The return value should be an array of all components that are used in processing.
- Values of those returned components will be passed to run() and process() functions.
- """
- pass
- def show(self, is_img2img):
- """
- is_img2img is True if this function is called for the img2img interface, and Fasle otherwise
- This function should return:
- - False if the script should not be shown in UI at all
- - True if the script should be shown in UI if it's selected in the scripts dropdown
- - script.AlwaysVisible if the script should be shown in UI at all times
- """
- return True
- def run(self, p, *args):
- """
- This function is called if the script has been selected in the script dropdown.
- It must do all processing and return the Processed object with results, same as
- one returned by processing.process_images.
- Usually the processing is done by calling the processing.process_images function.
- args contains all values returned by components from ui()
- """
- pass
- def setup(self, p, *args):
- """For AlwaysVisible scripts, this function is called when the processing object is set up, before any processing starts.
- args contains all values returned by components from ui().
- """
- pass
- def before_process(self, p, *args):
- """
- This function is called very early during processing begins for AlwaysVisible scripts.
- You can modify the processing object (p) here, inject hooks, etc.
- args contains all values returned by components from ui()
- """
- pass
- def process(self, p, *args):
- """
- This function is called before processing begins for AlwaysVisible scripts.
- You can modify the processing object (p) here, inject hooks, etc.
- args contains all values returned by components from ui()
- """
- pass
- def before_process_batch(self, p, *args, **kwargs):
- """
- Called before extra networks are parsed from the prompt, so you can add
- new extra network keywords to the prompt with this callback.
- **kwargs will have those items:
- - batch_number - index of current batch, from 0 to number of batches-1
- - prompts - list of prompts for current batch; you can change contents of this list but changing the number of entries will likely break things
- - seeds - list of seeds for current batch
- - subseeds - list of subseeds for current batch
- """
- pass
- def after_extra_networks_activate(self, p, *args, **kwargs):
- """
- Called after extra networks activation, before conds calculation
- allow modification of the network after extra networks activation been applied
- won't be call if p.disable_extra_networks
- **kwargs will have those items:
- - batch_number - index of current batch, from 0 to number of batches-1
- - prompts - list of prompts for current batch; you can change contents of this list but changing the number of entries will likely break things
- - seeds - list of seeds for current batch
- - subseeds - list of subseeds for current batch
- - extra_network_data - list of ExtraNetworkParams for current stage
- """
- pass
- def process_batch(self, p, *args, **kwargs):
- """
- Same as process(), but called for every batch.
- **kwargs will have those items:
- - batch_number - index of current batch, from 0 to number of batches-1
- - prompts - list of prompts for current batch; you can change contents of this list but changing the number of entries will likely break things
- - seeds - list of seeds for current batch
- - subseeds - list of subseeds for current batch
- """
- pass
- def postprocess_batch(self, p, *args, **kwargs):
- """
- Same as process_batch(), but called for every batch after it has been generated.
- **kwargs will have same items as process_batch, and also:
- - batch_number - index of current batch, from 0 to number of batches-1
- - images - torch tensor with all generated images, with values ranging from 0 to 1;
- """
- pass
- def postprocess_batch_list(self, p, pp: PostprocessBatchListArgs, *args, **kwargs):
- """
- Same as postprocess_batch(), but receives batch images as a list of 3D tensors instead of a 4D tensor.
- This is useful when you want to update the entire batch instead of individual images.
- You can modify the postprocessing object (pp) to update the images in the batch, remove images, add images, etc.
- If the number of images is different from the batch size when returning,
- then the script has the responsibility to also update the following attributes in the processing object (p):
- - p.prompts
- - p.negative_prompts
- - p.seeds
- - p.subseeds
- **kwargs will have same items as process_batch, and also:
- - batch_number - index of current batch, from 0 to number of batches-1
- """
- pass
- def postprocess_image(self, p, pp: PostprocessImageArgs, *args):
- """
- Called for every image after it has been generated.
- """
- pass
- def postprocess(self, p, processed, *args):
- """
- This function is called after processing ends for AlwaysVisible scripts.
- args contains all values returned by components from ui()
- """
- pass
- def before_component(self, component, **kwargs):
- """
- Called before a component is created.
- Use elem_id/label fields of kwargs to figure out which component it is.
- This can be useful to inject your own components somewhere in the middle of vanilla UI.
- You can return created components in the ui() function to add them to the list of arguments for your processing functions
- """
- pass
- def after_component(self, component, **kwargs):
- """
- Called after a component is created. Same as above.
- """
- pass
- def on_before_component(self, callback, *, elem_id):
- """
- Calls callback before a component is created. The callback function is called with a single argument of type OnComponent.
- May be called in show() or ui() - but it may be too late in latter as some components may already be created.
- This function is an alternative to before_component in that it also cllows to run before a component is created, but
- it doesn't require to be called for every created component - just for the one you need.
- """
- if self.on_before_component_elem_id is None:
- self.on_before_component_elem_id = []
- self.on_before_component_elem_id.append((elem_id, callback))
- def on_after_component(self, callback, *, elem_id):
- """
- Calls callback after a component is created. The callback function is called with a single argument of type OnComponent.
- """
- if self.on_after_component_elem_id is None:
- self.on_after_component_elem_id = []
- self.on_after_component_elem_id.append((elem_id, callback))
- def describe(self):
- """unused"""
- return ""
- def elem_id(self, item_id):
- """helper function to generate id for a HTML element, constructs final id out of script name, tab and user-supplied item_id"""
- need_tabname = self.show(True) == self.show(False)
- tabkind = 'img2img' if self.is_img2img else 'txt2img'
- tabname = f"{tabkind}_" if need_tabname else ""
- title = re.sub(r'[^a-z_0-9]', '', re.sub(r'\s', '_', self.title().lower()))
- return f'script_{tabname}{title}_{item_id}'
- def before_hr(self, p, *args):
- """
- This function is called before hires fix start.
- """
- pass
- class ScriptBuiltinUI(Script):
- setup_for_ui_only = True
- def elem_id(self, item_id):
- """helper function to generate id for a HTML element, constructs final id out of tab and user-supplied item_id"""
- need_tabname = self.show(True) == self.show(False)
- tabname = ('img2img' if self.is_img2img else 'txt2img') + "_" if need_tabname else ""
- return f'{tabname}{item_id}'
- current_basedir = paths.script_path
- def basedir():
- """returns the base directory for the current script. For scripts in the main scripts directory,
- this is the main directory (where webui.py resides), and for scripts in extensions directory
- (ie extensions/aesthetic/script/aesthetic.py), this is extension's directory (extensions/aesthetic)
- """
- return current_basedir
- ScriptFile = namedtuple("ScriptFile", ["basedir", "filename", "path"])
- scripts_data = []
- postprocessing_scripts_data = []
- ScriptClassData = namedtuple("ScriptClassData", ["script_class", "path", "basedir", "module"])
- def list_scripts(scriptdirname, extension, *, include_extensions=True):
- scripts_list = []
- basedir = os.path.join(paths.script_path, scriptdirname)
- if os.path.exists(basedir):
- for filename in sorted(os.listdir(basedir)):
- scripts_list.append(ScriptFile(paths.script_path, filename, os.path.join(basedir, filename)))
- if include_extensions:
- for ext in extensions.active():
- scripts_list += ext.list_files(scriptdirname, extension)
- scripts_list = [x for x in scripts_list if os.path.splitext(x.path)[1].lower() == extension and os.path.isfile(x.path)]
- return scripts_list
- def list_files_with_name(filename):
- res = []
- dirs = [paths.script_path] + [ext.path for ext in extensions.active()]
- for dirpath in dirs:
- if not os.path.isdir(dirpath):
- continue
- path = os.path.join(dirpath, filename)
- if os.path.isfile(path):
- res.append(path)
- return res
- def load_scripts():
- global current_basedir
- scripts_data.clear()
- postprocessing_scripts_data.clear()
- script_callbacks.clear_callbacks()
- scripts_list = list_scripts("scripts", ".py") + list_scripts("modules/processing_scripts", ".py", include_extensions=False)
- syspath = sys.path
- def register_scripts_from_module(module):
- for script_class in module.__dict__.values():
- if not inspect.isclass(script_class):
- continue
- if issubclass(script_class, Script):
- scripts_data.append(ScriptClassData(script_class, scriptfile.path, scriptfile.basedir, module))
- elif issubclass(script_class, scripts_postprocessing.ScriptPostprocessing):
- postprocessing_scripts_data.append(ScriptClassData(script_class, scriptfile.path, scriptfile.basedir, module))
- def orderby(basedir):
- # 1st webui, 2nd extensions-builtin, 3rd extensions
- priority = {os.path.join(paths.script_path, "extensions-builtin"):1, paths.script_path:0}
- for key in priority:
- if basedir.startswith(key):
- return priority[key]
- return 9999
- for scriptfile in sorted(scripts_list, key=lambda x: [orderby(x.basedir), x]):
- try:
- if scriptfile.basedir != paths.script_path:
- sys.path = [scriptfile.basedir] + sys.path
- current_basedir = scriptfile.basedir
- script_module = script_loading.load_module(scriptfile.path)
- register_scripts_from_module(script_module)
- except Exception:
- errors.report(f"Error loading script: {scriptfile.filename}", exc_info=True)
- finally:
- sys.path = syspath
- current_basedir = paths.script_path
- timer.startup_timer.record(scriptfile.filename)
- global scripts_txt2img, scripts_img2img, scripts_postproc
- scripts_txt2img = ScriptRunner()
- scripts_img2img = ScriptRunner()
- scripts_postproc = scripts_postprocessing.ScriptPostprocessingRunner()
- def wrap_call(func, filename, funcname, *args, default=None, **kwargs):
- try:
- return func(*args, **kwargs)
- except Exception:
- errors.report(f"Error calling: {filename}/{funcname}", exc_info=True)
- return default
- class ScriptRunner:
- def __init__(self):
- self.scripts = []
- self.selectable_scripts = []
- self.alwayson_scripts = []
- self.titles = []
- self.title_map = {}
- self.infotext_fields = []
- self.paste_field_names = []
- self.inputs = [None]
- self.on_before_component_elem_id = {}
- """dict of callbacks to be called before an element is created; key=elem_id, value=list of callbacks"""
- self.on_after_component_elem_id = {}
- """dict of callbacks to be called after an element is created; key=elem_id, value=list of callbacks"""
- def initialize_scripts(self, is_img2img):
- from modules import scripts_auto_postprocessing
- self.scripts.clear()
- self.alwayson_scripts.clear()
- self.selectable_scripts.clear()
- auto_processing_scripts = scripts_auto_postprocessing.create_auto_preprocessing_script_data()
- for script_data in auto_processing_scripts + scripts_data:
- script = script_data.script_class()
- script.filename = script_data.path
- script.is_txt2img = not is_img2img
- script.is_img2img = is_img2img
- script.tabname = "img2img" if is_img2img else "txt2img"
- visibility = script.show(script.is_img2img)
- if visibility == AlwaysVisible:
- self.scripts.append(script)
- self.alwayson_scripts.append(script)
- script.alwayson = True
- elif visibility:
- self.scripts.append(script)
- self.selectable_scripts.append(script)
- self.apply_on_before_component_callbacks()
- def apply_on_before_component_callbacks(self):
- for script in self.scripts:
- on_before = script.on_before_component_elem_id or []
- on_after = script.on_after_component_elem_id or []
- for elem_id, callback in on_before:
- if elem_id not in self.on_before_component_elem_id:
- self.on_before_component_elem_id[elem_id] = []
- self.on_before_component_elem_id[elem_id].append((callback, script))
- for elem_id, callback in on_after:
- if elem_id not in self.on_after_component_elem_id:
- self.on_after_component_elem_id[elem_id] = []
- self.on_after_component_elem_id[elem_id].append((callback, script))
- on_before.clear()
- on_after.clear()
- def create_script_ui(self, script):
- import modules.api.models as api_models
- script.args_from = len(self.inputs)
- script.args_to = len(self.inputs)
- controls = wrap_call(script.ui, script.filename, "ui", script.is_img2img)
- if controls is None:
- return
- script.name = wrap_call(script.title, script.filename, "title", default=script.filename).lower()
- api_args = []
- for control in controls:
- control.custom_script_source = os.path.basename(script.filename)
- arg_info = api_models.ScriptArg(label=control.label or "")
- for field in ("value", "minimum", "maximum", "step", "choices"):
- v = getattr(control, field, None)
- if v is not None:
- setattr(arg_info, field, v)
- api_args.append(arg_info)
- script.api_info = api_models.ScriptInfo(
- name=script.name,
- is_img2img=script.is_img2img,
- is_alwayson=script.alwayson,
- args=api_args,
- )
- if script.infotext_fields is not None:
- self.infotext_fields += script.infotext_fields
- if script.paste_field_names is not None:
- self.paste_field_names += script.paste_field_names
- self.inputs += controls
- script.args_to = len(self.inputs)
- def setup_ui_for_section(self, section, scriptlist=None):
- if scriptlist is None:
- scriptlist = self.alwayson_scripts
- for script in scriptlist:
- if script.alwayson and script.section != section:
- continue
- if script.create_group:
- with gr.Group(visible=script.alwayson) as group:
- self.create_script_ui(script)
- script.group = group
- else:
- self.create_script_ui(script)
- def prepare_ui(self):
- self.inputs = [None]
- def setup_ui(self):
- all_titles = [wrap_call(script.title, script.filename, "title") or script.filename for script in self.scripts]
- self.title_map = {title.lower(): script for title, script in zip(all_titles, self.scripts)}
- self.titles = [wrap_call(script.title, script.filename, "title") or f"{script.filename} [error]" for script in self.selectable_scripts]
- self.setup_ui_for_section(None)
- dropdown = gr.Dropdown(label="Script", elem_id="script_list", choices=["None"] + self.titles, value="None", type="index")
- self.inputs[0] = dropdown
- self.setup_ui_for_section(None, self.selectable_scripts)
- def select_script(script_index):
- selected_script = self.selectable_scripts[script_index - 1] if script_index>0 else None
- return [gr.update(visible=selected_script == s) for s in self.selectable_scripts]
- def init_field(title):
- """called when an initial value is set from ui-config.json to show script's UI components"""
- if title == 'None':
- return
- script_index = self.titles.index(title)
- self.selectable_scripts[script_index].group.visible = True
- dropdown.init_field = init_field
- dropdown.change(
- fn=select_script,
- inputs=[dropdown],
- outputs=[script.group for script in self.selectable_scripts]
- )
- self.script_load_ctr = 0
- def onload_script_visibility(params):
- title = params.get('Script', None)
- if title:
- title_index = self.titles.index(title)
- visibility = title_index == self.script_load_ctr
- self.script_load_ctr = (self.script_load_ctr + 1) % len(self.titles)
- return gr.update(visible=visibility)
- else:
- return gr.update(visible=False)
- self.infotext_fields.append((dropdown, lambda x: gr.update(value=x.get('Script', 'None'))))
- self.infotext_fields.extend([(script.group, onload_script_visibility) for script in self.selectable_scripts])
- self.apply_on_before_component_callbacks()
- return self.inputs
- def run(self, p, *args):
- script_index = args[0]
- if script_index == 0:
- return None
- script = self.selectable_scripts[script_index-1]
- if script is None:
- return None
- script_args = args[script.args_from:script.args_to]
- processed = script.run(p, *script_args)
- shared.total_tqdm.clear()
- return processed
- def before_process(self, p):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.before_process(p, *script_args)
- except Exception:
- errors.report(f"Error running before_process: {script.filename}", exc_info=True)
- def process(self, p):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.process(p, *script_args)
- except Exception:
- errors.report(f"Error running process: {script.filename}", exc_info=True)
- def before_process_batch(self, p, **kwargs):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.before_process_batch(p, *script_args, **kwargs)
- except Exception:
- errors.report(f"Error running before_process_batch: {script.filename}", exc_info=True)
- def after_extra_networks_activate(self, p, **kwargs):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.after_extra_networks_activate(p, *script_args, **kwargs)
- except Exception:
- errors.report(f"Error running after_extra_networks_activate: {script.filename}", exc_info=True)
- def process_batch(self, p, **kwargs):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.process_batch(p, *script_args, **kwargs)
- except Exception:
- errors.report(f"Error running process_batch: {script.filename}", exc_info=True)
- def postprocess(self, p, processed):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.postprocess(p, processed, *script_args)
- except Exception:
- errors.report(f"Error running postprocess: {script.filename}", exc_info=True)
- def postprocess_batch(self, p, images, **kwargs):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.postprocess_batch(p, *script_args, images=images, **kwargs)
- except Exception:
- errors.report(f"Error running postprocess_batch: {script.filename}", exc_info=True)
- def postprocess_batch_list(self, p, pp: PostprocessBatchListArgs, **kwargs):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.postprocess_batch_list(p, pp, *script_args, **kwargs)
- except Exception:
- errors.report(f"Error running postprocess_batch_list: {script.filename}", exc_info=True)
- def postprocess_image(self, p, pp: PostprocessImageArgs):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.postprocess_image(p, pp, *script_args)
- except Exception:
- errors.report(f"Error running postprocess_image: {script.filename}", exc_info=True)
- def before_component(self, component, **kwargs):
- for callback, script in self.on_before_component_elem_id.get(kwargs.get("elem_id"), []):
- try:
- callback(OnComponent(component=component))
- except Exception:
- errors.report(f"Error running on_before_component: {script.filename}", exc_info=True)
- for script in self.scripts:
- try:
- script.before_component(component, **kwargs)
- except Exception:
- errors.report(f"Error running before_component: {script.filename}", exc_info=True)
- def after_component(self, component, **kwargs):
- for callback, script in self.on_after_component_elem_id.get(component.elem_id, []):
- try:
- callback(OnComponent(component=component))
- except Exception:
- errors.report(f"Error running on_after_component: {script.filename}", exc_info=True)
- for script in self.scripts:
- try:
- script.after_component(component, **kwargs)
- except Exception:
- errors.report(f"Error running after_component: {script.filename}", exc_info=True)
- def script(self, title):
- return self.title_map.get(title.lower())
- def reload_sources(self, cache):
- for si, script in list(enumerate(self.scripts)):
- args_from = script.args_from
- args_to = script.args_to
- filename = script.filename
- module = cache.get(filename, None)
- if module is None:
- module = script_loading.load_module(script.filename)
- cache[filename] = module
- for script_class in module.__dict__.values():
- if type(script_class) == type and issubclass(script_class, Script):
- self.scripts[si] = script_class()
- self.scripts[si].filename = filename
- self.scripts[si].args_from = args_from
- self.scripts[si].args_to = args_to
- def before_hr(self, p):
- for script in self.alwayson_scripts:
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.before_hr(p, *script_args)
- except Exception:
- errors.report(f"Error running before_hr: {script.filename}", exc_info=True)
- def setup_scrips(self, p, *, is_ui=True):
- for script in self.alwayson_scripts:
- if not is_ui and script.setup_for_ui_only:
- continue
- try:
- script_args = p.script_args[script.args_from:script.args_to]
- script.setup(p, *script_args)
- except Exception:
- errors.report(f"Error running setup: {script.filename}", exc_info=True)
- scripts_txt2img: ScriptRunner = None
- scripts_img2img: ScriptRunner = None
- scripts_postproc: scripts_postprocessing.ScriptPostprocessingRunner = None
- scripts_current: ScriptRunner = None
- def reload_script_body_only():
- cache = {}
- scripts_txt2img.reload_sources(cache)
- scripts_img2img.reload_sources(cache)
- reload_scripts = load_scripts # compatibility alias
|