12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982 |
- # Copyright 2017 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- import ast
- import collections
- from io import StringIO
- import logging
- import sys
- import tokenize
- import gclient_utils
- from third_party import schema
- # TODO: Should fix these warnings.
- # pylint: disable=line-too-long
# git_dependencies migration states. Used within the DEPS file to indicate
# the current migration state.
# These three strings are the only values the top-level schema accepts for
# the optional 'git_dependencies' entry.
DEPS = 'DEPS'
SYNC = 'SYNC'
SUBMODULES = 'SUBMODULES'
class ConstantString(object):
    """Thin wrapper around a string value.

    Instances compare equal to (and hash like) the wrapped value, so they
    can be mixed freely with plain strings in dicts and comparisons.
    Formatting an instance always yields the wrapped value verbatim.
    """

    def __init__(self, value):
        self.value = value

    def __format__(self, format_spec):
        # Whatever format spec the caller supplied is ignored on purpose:
        # the wrapped value is always emitted unchanged.
        del format_spec
        return self.value

    def __repr__(self):
        return ''.join(("Str('", self.value, "')"))

    def __eq__(self, other):
        # Unwrap the right-hand side when it is another ConstantString so
        # that the comparison is always between the underlying values.
        rhs = other.value if isinstance(other, ConstantString) else other
        return self.value == rhs

    def __hash__(self):
        # Must agree with __eq__: equal to the plain value, so hash like it.
        return hash(self.value)
- class _NodeDict(collections.abc.MutableMapping):
- """Dict-like type that also stores information on AST nodes and tokens."""
- def __init__(self, data=None, tokens=None):
- self.data = collections.OrderedDict(data or [])
- self.tokens = tokens
- def __str__(self):
- return str({k: v[0] for k, v in self.data.items()})
- def __repr__(self):
- return self.__str__()
- def __getitem__(self, key):
- return self.data[key][0]
- def __setitem__(self, key, value):
- self.data[key] = (value, None)
- def __delitem__(self, key):
- del self.data[key]
- def __iter__(self):
- return iter(self.data)
- def __len__(self):
- return len(self.data)
- def MoveTokens(self, origin, delta):
- if self.tokens:
- new_tokens = {}
- for pos, token in self.tokens.items():
- if pos[0] >= origin:
- pos = (pos[0] + delta, pos[1])
- token = token[:2] + (pos, ) + token[3:]
- new_tokens[pos] = token
- for value, node in self.data.values():
- if node.lineno >= origin:
- node.lineno += delta
- if isinstance(value, _NodeDict):
- value.MoveTokens(origin, delta)
- def GetNode(self, key):
- return self.data[key][1]
- def SetNode(self, key, value, node):
- self.data[key] = (value, node)
- def _NodeDictSchema(dict_schema):
- """Validate dict_schema after converting _NodeDict to a regular dict."""
- def validate(d):
- schema.Schema(dict_schema).validate(dict(d))
- return True
- return validate
# See https://github.com/keleshev/schema for docs on how to configure schema.
#
# Validator for a deps mapping. Every key is a string and every value is
# either None, a string, or one of three dict forms: a git-style entry with
# a 'url', a CIPD entry with 'packages', or a GCS entry with 'bucket' and
# 'objects'.
# NOTE(review): the `default=` values on 'dep_type' are presumably never
# injected, since _NodeDictSchema validates a copy and returns True -
# confirm against the schema library.
_GCLIENT_DEPS_SCHEMA = _NodeDictSchema({
    schema.Optional(str):
    schema.Or(
        None,
        str,
        _NodeDictSchema({
            # Repo and revision to check out under the path
            # (same as if no dict was used).
            'url':
            schema.Or(None, str),
            # Optional condition string. The dep will only be processed
            # if the condition evaluates to True.
            schema.Optional('condition'):
            str,
            schema.Optional('dep_type', default='git'):
            str,
        }),
        # CIPD package.
        _NodeDictSchema({
            'packages': [_NodeDictSchema({
                'package': str,
                'version': str,
            })],
            schema.Optional('condition'):
            str,
            schema.Optional('dep_type', default='cipd'):
            str,
        }),
        # GCS content.
        _NodeDictSchema({
            'bucket':
            str,
            'objects': [
                _NodeDictSchema({
                    'object_name':
                    str,
                    'sha256sum':
                    str,
                    'size_bytes':
                    int,
                    'generation':
                    int,
                    schema.Optional('output_file'):
                    str,
                    # The object will only be processed if the condition
                    # evaluates to True. This is AND with the parent condition.
                    schema.Optional('condition'):
                    str,
                })
            ],
            schema.Optional('condition'):
            str,
            schema.Optional('dep_type', default='gcs'):
            str,
        }),
    ),
})
# Schema for a list of hooks. Each hook is a dict with a mandatory 'action'
# (list of strings) and optional 'name', 'pattern', 'cwd' and 'condition'
# strings.
_GCLIENT_HOOKS_SCHEMA = [
    _NodeDictSchema({
        # Hook action: list of command-line arguments to invoke.
        'action': [schema.Or(str)],
        # Name of the hook. Doesn't affect operation.
        schema.Optional('name'):
        str,
        # Hook pattern (regex). Originally intended to limit some hooks to run
        # only when files matching the pattern have changed. In practice, with
        # git, gclient runs all the hooks regardless of this field.
        schema.Optional('pattern'):
        str,
        # Working directory where to execute the hook.
        schema.Optional('cwd'):
        str,
        # Optional condition string. The hook will only be run
        # if the condition evaluates to True.
        schema.Optional('condition'):
        str,
    })
]
# Top-level schema of a DEPS file. Every entry is optional; this is the
# schema object whose validate() is called on the evaluated assignments of
# the file.
_GCLIENT_SCHEMA = schema.Schema(
    _NodeDictSchema({
        # Current state of the git submodule migration.
        # git_dependencies = [DEPS (default) | SUBMODULES | SYNC]
        schema.Optional('git_dependencies'):
        schema.Or(DEPS, SYNC, SUBMODULES),
        # List of host names from which dependencies are allowed (allowlist).
        # NOTE: when not present, all hosts are allowed.
        # NOTE: scoped to current DEPS file, not recursive.
        schema.Optional('allowed_hosts'): [schema.Optional(str)],
        # Mapping from paths to repo and revision to check out under that path.
        # Applying this mapping to the on-disk checkout is the main purpose
        # of gclient, and also why the config file is called DEPS.
        #
        # The following functions are allowed:
        #
        #   Var(): allows variable substitution (either from 'vars' dict below,
        #          or command-line override)
        schema.Optional('deps'):
        _GCLIENT_DEPS_SCHEMA,
        # Similar to 'deps' (see above) - also keyed by OS (e.g. 'linux').
        # Also see 'target_os'.
        schema.Optional('deps_os'):
        _NodeDictSchema({
            schema.Optional(str): _GCLIENT_DEPS_SCHEMA,
        }),
        # Dependency to get gclient_gn_args* settings from. This allows these
        # values to be set in a recursedeps file, rather than requiring that
        # they exist in the top-level solution.
        schema.Optional('gclient_gn_args_from'):
        str,
        # Path to GN args file to write selected variables.
        schema.Optional('gclient_gn_args_file'):
        str,
        # Subset of variables to write to the GN args file (see above).
        schema.Optional('gclient_gn_args'): [schema.Optional(str)],
        # Hooks executed after gclient sync (unless suppressed), or explicitly
        # on gclient hooks. See _GCLIENT_HOOKS_SCHEMA for details.
        # Also see 'pre_deps_hooks'.
        schema.Optional('hooks'):
        _GCLIENT_HOOKS_SCHEMA,
        # Similar to 'hooks', also keyed by OS.
        schema.Optional('hooks_os'):
        _NodeDictSchema({schema.Optional(str): _GCLIENT_HOOKS_SCHEMA}),
        # Rules which #includes are allowed in the directory.
        # Also see 'skip_child_includes' and 'specific_include_rules'.
        schema.Optional('include_rules'): [schema.Optional(str)],
        # Commits that add include_rules entries on the paths with this set
        # will require an OWNERS review from them.
        schema.Optional('new_usages_require_review'):
        bool,
        # Optionally discards rules from parent directories, similar to
        # "noparent" in OWNERS files. For example, if
        # //base/allocator/partition_allocator has "noparent = True" then it
        # will not inherit rules from //base/DEPS and //base/allocator/DEPS,
        # forcing each //base/allocator/partition_allocator/{foo,bar,...} to
        # declare all its dependencies.
        schema.Optional('noparent'):
        bool,
        # Hooks executed before processing DEPS. See 'hooks' for more details.
        schema.Optional('pre_deps_hooks'):
        _GCLIENT_HOOKS_SCHEMA,
        # Recursion limit for nested DEPS.
        schema.Optional('recursion'):
        int,
        # Allowlists deps for which recursion should be enabled.
        schema.Optional('recursedeps'): [
            schema.Optional(schema.Or(str, (str, str), [str, str])),
        ],
        # Blocklists directories for checking 'include_rules'.
        schema.Optional('skip_child_includes'): [schema.Optional(str)],
        # Mapping from paths to include rules specific for that path.
        # See 'include_rules' for more details.
        schema.Optional('specific_include_rules'):
        _NodeDictSchema({schema.Optional(str): [str]}),
        # List of additional OS names to consider when selecting dependencies
        # from deps_os.
        schema.Optional('target_os'): [schema.Optional(str)],
        # For recursed-upon sub-dependencies, check out their own dependencies
        # relative to the parent's path, rather than relative to the .gclient
        # file.
        schema.Optional('use_relative_paths'):
        bool,
        # For recursed-upon sub-dependencies, run their hooks relative to the
        # parent's path instead of relative to the .gclient file.
        schema.Optional('use_relative_hooks'):
        bool,
        # Variables that can be referenced using Var() - see 'deps'.
        schema.Optional('vars'):
        _NodeDictSchema({
            schema.Optional(str):
            schema.Or(ConstantString, str, bool),
        }),
    }))
def _gclient_eval(node_or_string, filename='<unknown>', vars_dict=None):
    """Safely evaluates a single expression. Returns the result.

    Only literals (strings, numbers, None/True/False), tuples, lists, dicts,
    the ``+`` and ``%`` binary operators and calls to ``Str()`` / ``Var()``
    are accepted; any other construct raises ValueError.

    Literals are recognised through ``ast.Constant``, which is what the
    parser produces on Python 3.8 and later (the older ``ast.Str`` /
    ``ast.Num`` / ``ast.NameConstant`` aliases are gone in Python 3.14).

    Args:
        node_or_string: a ConstantString (its wrapped value is returned
            as-is), the source text of one expression, or an already parsed
            AST node.
        filename: name used when parsing and in error messages.
        vars_dict: optional mapping of variable values. When given, string
            literals are expanded with ``str.format(**vars_dict)`` and
            ``Var('x')`` returns the value stored for ``x``. When None,
            strings are returned unchanged and ``Var('x')`` yields the
            placeholder ``'{x}'``.

    Returns:
        The evaluated Python value. Dict literals come back as _NodeDict
        instances that remember the AST node of every value.

    Raises:
        ValueError: on a disallowed construct, an unknown name, a duplicate
            dict key or a malformed Str()/Var() call.
        KeyError: when a referenced variable is missing from vars_dict.
    """
    _allowed_names = {'None': None, 'True': True, 'False': False}
    if isinstance(node_or_string, ConstantString):
        return node_or_string.value
    if isinstance(node_or_string, str):
        node_or_string = ast.parse(node_or_string,
                                   filename=filename,
                                   mode='eval')
    if isinstance(node_or_string, ast.Expression):
        node_or_string = node_or_string.body

    def _line(node):
        # Line number for error messages; not every node carries one.
        return getattr(node, 'lineno', '<unknown>')

    def _is_str_literal(node):
        return isinstance(node, ast.Constant) and isinstance(node.value, str)

    def _convert(node):
        if _is_str_literal(node):
            if vars_dict is None:
                return node.value
            try:
                return node.value.format(**vars_dict)
            except KeyError as e:
                raise KeyError(
                    '%s was used as a variable, but was not declared in the vars dict '
                    '(file %r, line %s)' % (e.args[0], filename, _line(node)))
        # Numbers and the None/True/False singletons. Other constants
        # (bytes, Ellipsis) are rejected by the final raise below.
        if isinstance(node, ast.Constant) and (
                node.value is None
                or isinstance(node.value, (bool, int, float, complex))):
            return node.value
        if isinstance(node, ast.Tuple):
            return tuple(map(_convert, node.elts))
        if isinstance(node, ast.List):
            return list(map(_convert, node.elts))
        if isinstance(node, ast.Dict):
            node_dict = _NodeDict()
            for key_node, value_node in zip(node.keys, node.values):
                key = _convert(key_node)
                if key in node_dict:
                    raise ValueError(
                        'duplicate key in dictionary: %s (file %r, line %s)' %
                        (key, filename, _line(key_node)))
                node_dict.SetNode(key, _convert(value_node), value_node)
            return node_dict
        if isinstance(node, ast.Name):
            if node.id not in _allowed_names:
                raise ValueError('invalid name %r (file %r, line %s)' %
                                 (node.id, filename, _line(node)))
            return _allowed_names[node.id]
        if isinstance(node, ast.Call):
            if (not isinstance(node.func, ast.Name)
                    or (node.func.id not in ('Str', 'Var'))):
                raise ValueError(
                    'Str and Var are the only allowed functions (file %r, line %s)'
                    % (filename, _line(node)))
            # starargs/kwargs only exist on the Call nodes of old Python
            # versions, hence the getattr().
            if node.keywords or getattr(node, 'starargs', None) or getattr(
                    node, 'kwargs', None) or len(node.args) != 1:
                raise ValueError(
                    '%s takes exactly one argument (file %r, line %s)' %
                    (node.func.id, filename, _line(node)))

            if node.func.id == 'Str':
                # Str() wraps a literal without any variable expansion.
                if _is_str_literal(node.args[0]):
                    return ConstantString(node.args[0].value)
                raise ValueError(
                    'Passed a non-string to Str() (file %r, line %s)' %
                    (filename, _line(node)))

            arg = _convert(node.args[0])
            if not isinstance(arg, str):
                raise ValueError(
                    'Var\'s argument must be a variable name (file %r, line %s)'
                    % (filename, _line(node)))
            if vars_dict is None:
                # No values available yet: leave a format placeholder.
                return '{' + arg + '}'
            if arg not in vars_dict:
                raise KeyError(
                    '%s was used as a variable, but was not declared in the vars dict '
                    '(file %r, line %s)' % (arg, filename, _line(node)))
            val = vars_dict[arg]
            if isinstance(val, ConstantString):
                val = val.value
            return val
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
            return _convert(node.left) + _convert(node.right)
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Mod):
            return _convert(node.left) % _convert(node.right)
        raise ValueError('unexpected AST node: %s %s (file %r, line %s)' %
                         (node, ast.dump(node), filename, _line(node)))

    return _convert(node_or_string)
def Exec(content, filename='<unknown>', vars_override=None, builtin_vars=None):
    """Safely execs a set of assignments.

    |content| must consist solely of single-target assignments to distinct
    names. Each right-hand side is evaluated with _gclient_eval; 'vars' is
    evaluated first so that its values can be expanded everywhere else.

    Args:
        content: str. Text of the DEPS file.
        filename: str. Name used for parsing and in error messages.
        vars_override: dict, optional. Replacement values; only names that
            are already known (declared in 'vars' or built in) are applied.
        builtin_vars: dict, optional. Variables provided by default; they
            take precedence over the values declared in 'vars'.

    Returns:
        The result of validating the evaluated assignments against
        _GCLIENT_SCHEMA.

    Raises:
        ValueError: if a statement is not an acceptable assignment.
        gclient_utils.Error: if schema validation fails.
    """

    def _line_of(node):
        return getattr(node, 'lineno', '<unknown>')

    def _validate_statement(node, local_scope):
        if not isinstance(node, ast.Assign):
            raise ValueError('unexpected AST node: %s %s (file %r, line %s)' %
                             (node, ast.dump(node), filename, _line_of(node)))

        if len(node.targets) != 1:
            raise ValueError(
                'invalid assignment: use exactly one target (file %r, line %s)'
                % (filename, _line_of(node)))

        target = node.targets[0]
        if not isinstance(target, ast.Name):
            raise ValueError(
                'invalid assignment: target should be a name (file %r, line %s)'
                % (filename, _line_of(node)))
        if target.id in local_scope:
            raise ValueError(
                'invalid assignment: overrides var %r (file %r, line %s)' %
                (target.id, filename, _line_of(node)))

    module = ast.parse(content, filename=filename, mode='exec')
    if isinstance(module, ast.Expression):
        module = module.body

    if not isinstance(module, ast.Module):
        raise ValueError('unexpected AST node: %s %s (file %r, line %s)' %
                         (module, ast.dump(module), filename,
                          _line_of(module)))

    # name -> AST node of the assigned expression, in file order.
    assignments = {}
    for stmt in module.body:
        _validate_statement(stmt, assignments)
        assignments[stmt.targets[0].id] = stmt.value

    # Index every token by its start position; tokens are kept as lists so
    # that they can be edited in place later on.
    token_index = {}
    for tok in tokenize.generate_tokens(StringIO(content).readline):
        token_index[tok[2]] = list(tok)
    scope = _NodeDict({}, token_index)

    # Process vars first, so we can expand variables in the rest of the DEPS
    # file.
    known_vars = {}
    if 'vars' in assignments:
        vars_node = assignments['vars']
        parsed_vars = _gclient_eval(vars_node, filename)
        scope.SetNode('vars', parsed_vars, vars_node)
        known_vars.update(parsed_vars)

    if builtin_vars:
        known_vars.update(builtin_vars)

    if vars_override:
        # Overrides never introduce new variables; they only replace the
        # values of names that are already known.
        for key, override in vars_override.items():
            if key in known_vars:
                known_vars[key] = override

    for name, expr_node in assignments.items():
        scope.SetNode(name, _gclient_eval(expr_node, filename, known_vars),
                      expr_node)

    try:
        return _GCLIENT_SCHEMA.validate(scope)
    except schema.SchemaError as e:
        raise gclient_utils.Error(str(e))
- def _StandardizeDeps(deps_dict, vars_dict):
- """"Standardizes the deps_dict.
- For each dependency:
- - Expands the variable in the dependency name.
- - Ensures the dependency is a dictionary.
- - Set's the 'dep_type' to be 'git' by default.
- """
- new_deps_dict = {}
- for dep_name, dep_info in deps_dict.items():
- dep_name = dep_name.format(**vars_dict)
- if not isinstance(dep_info, collections.abc.Mapping):
- dep_info = {'url': dep_info}
- dep_info.setdefault('dep_type', 'git')
- new_deps_dict[dep_name] = dep_info
- return new_deps_dict
def _MergeDepsOs(deps_dict, os_deps_dict, os_name):
    """Folds the OS-specific deps of |os_deps_dict| into |deps_dict|.

    Each OS-specific dependency is made conditional on
    |'checkout_' + os_name| ('unix' is mapped to 'linux') and then stored in
    |deps_dict|. If the dependency already exists there, both entries must
    have the same 'url', and the existing condition is OR-ed in.

    Entries whose 'url' is None are skipped with an error log.

    Raises:
        gclient_utils.Error: if the URL conflicts with the existing entry.
    """
    for name, os_dep in os_deps_dict.items():
        # Make this condition very visible, so it's not a silent failure.
        # It's unclear how to support None override in deps_os.
        if os_dep['url'] is None:
            logging.error('Ignoring %r:%r in %r deps_os', name, os_dep,
                          os_name)
            continue

        checkout_os = 'linux' if os_name == 'unix' else os_name
        UpdateCondition(os_dep, 'and', 'checkout_' + checkout_os)

        if name in deps_dict:
            existing = deps_dict[name]
            if existing['url'] != os_dep['url']:
                raise gclient_utils.Error(
                    'Value from deps_os (%r; %r: %r) conflicts with existing deps '
                    'entry (%r).' % (os_name, name, os_dep, deps_dict[name]))
            UpdateCondition(os_dep, 'or', existing.get('condition'))

        deps_dict[name] = os_dep
def UpdateCondition(info_dict, op, new_condition):
    """Combines |new_condition| into info_dict['condition'] using |op|.

    A missing (or empty) condition stands for "always True", so:
    - both present: the result is '(<current>) <op> (<new>)';
    - op is 'and' with only one present: that one becomes the condition;
    - anything else: the combination is always True, so an existing
      condition is removed and none is set.
    """
    existing = info_dict.get('condition')

    # Both sides carry a real condition: join them explicitly.
    if existing and new_condition:
        info_dict['condition'] = '(%s) %s (%s)' % (existing, op,
                                                   new_condition)
        return

    # "X and True" is just X.
    if op == 'and':
        surviving = existing or new_condition
        if surviving:
            info_dict['condition'] = surviving
            return

    # Otherwise the outcome is unconditionally True: drop any condition.
    if existing:
        del info_dict['condition']
def Parse(content, filename, vars_override=None, builtin_vars=None):
    """Parses DEPS strings.

    Evaluates the Python-like text in |content| via Exec() and post-processes
    the result: deps are standardized, and the OS-keyed 'deps_os' and
    'hooks_os' sections are folded into 'deps' and 'hooks' as conditional
    entries, after which those two sections are removed.

    Args:
        content: str. DEPS file stored as a string.
        filename: str. The name of the DEPS file, or a string describing the
            source of the content, e.g. '<string>', '<unknown>'.
        vars_override: dict, optional. A dictionary with overrides for the
            variables defined by the DEPS file.
        builtin_vars: dict, optional. A dictionary with variables that are
            provided by default.

    Returns:
        A Python dict with the parsed contents of the DEPS file, as specified
        by the schema above.
    """
    parsed = Exec(content, filename, vars_override, builtin_vars)
    declared_vars = parsed.get('vars', {})

    if 'deps' in parsed:
        parsed['deps'] = _StandardizeDeps(parsed['deps'], declared_vars)

    if 'deps_os' in parsed:
        merged_deps = parsed.setdefault('deps', {})
        for os_name, os_deps in parsed['deps_os'].items():
            standardized = _StandardizeDeps(os_deps, declared_vars)
            _MergeDepsOs(merged_deps, standardized, os_name)
        del parsed['deps_os']

    if 'hooks_os' in parsed:
        all_hooks = parsed.setdefault('hooks', [])
        for os_name, os_hooks in parsed['hooks_os'].items():
            # Each OS-specific hook only runs when that OS is checked out.
            for hook in os_hooks:
                UpdateCondition(hook, 'and', 'checkout_' + os_name)
            all_hooks.extend(os_hooks)
        del parsed['hooks_os']

    return parsed
def EvaluateCondition(condition, variables, referenced_variables=None):
    """Safely evaluates a boolean condition. Returns the result.

    Supported syntax: string literals, None/True/False, names, ``and``,
    ``or``, ``not``, and single ``==``, ``!=`` and ``in`` comparisons (a
    tuple is accepted on the right-hand side of ``in``).

    Literals are recognised through ``ast.Constant``, which is what the
    parser produces on Python 3.8 and later (the older ``ast.Str`` /
    ``ast.NameConstant`` aliases are gone in Python 3.14).

    Args:
        condition: str. The expression to evaluate.
        variables: mapping of variable names to values. A non-string value
            is used as-is; a string value is itself evaluated as a condition.
        referenced_variables: optional set of variable names currently being
            expanded, used to detect cyclic references.

    Returns:
        The value of the expression.

    Raises:
        ValueError: on unsupported syntax, a non-boolean operand of
            and/or/not, or a cyclic variable reference.
    """
    if not referenced_variables:
        referenced_variables = set()
    _allowed_names = {'None': None, 'True': True, 'False': False}
    main_node = ast.parse(condition, mode='eval')
    if isinstance(main_node, ast.Expression):
        main_node = main_node.body

    def _convert(node, allow_tuple=False):
        # Strings and the None/True/False singletons. Any other constant
        # (e.g. a number) is rejected by the final raise.
        if isinstance(node, ast.Constant) and (
                node.value is None or isinstance(node.value, (str, bool))):
            return node.value

        if isinstance(node, ast.Tuple) and allow_tuple:
            return tuple(map(_convert, node.elts))

        if isinstance(node, ast.Name):
            if node.id in referenced_variables:
                raise ValueError('invalid cyclic reference to %r (inside %r)' %
                                 (node.id, condition))

            if node.id in _allowed_names:
                return _allowed_names[node.id]

            if node.id in variables:
                value = variables[node.id]

                # Allow using "native" types, without wrapping everything in
                # strings. Note that schema constraints still apply to
                # variables.
                if not isinstance(value, str):
                    return value

                # Recursively evaluate the variable reference.
                return EvaluateCondition(variables[node.id], variables,
                                         referenced_variables.union([node.id]))

            # Implicitly convert unrecognized names to strings.
            # If we want to change this, we'll need to explicitly distinguish
            # between arguments for GN to be passed verbatim, and ones to
            # be evaluated.
            return node.id

        if isinstance(node, ast.BoolOp) and isinstance(node.op,
                                                       (ast.Or, ast.And)):
            if isinstance(node.op, ast.Or):
                op_name, combine = 'or', any
            else:
                op_name, combine = 'and', all
            # Every operand is evaluated (no short-circuiting) so that a
            # non-boolean operand is always reported.
            operands = []
            for operand_node in node.values:
                operand = _convert(operand_node)
                if not isinstance(operand, bool):
                    raise ValueError('invalid "%s" operand %r (inside %r)' %
                                     (op_name, operand, condition))
                operands.append(operand)
            return combine(operands)

        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
            value = _convert(node.operand)
            if not isinstance(value, bool):
                raise ValueError('invalid "not" operand %r (inside %r)' %
                                 (value, condition))
            return not value

        if isinstance(node, ast.Compare):
            if len(node.ops) != 1:
                raise ValueError(
                    'invalid compare: exactly 1 operator required (inside %r)' %
                    (condition))
            if len(node.comparators) != 1:
                raise ValueError(
                    'invalid compare: exactly 1 comparator required (inside %r)'
                    % (condition))

            left = _convert(node.left)
            right = _convert(node.comparators[0],
                             allow_tuple=isinstance(node.ops[0], ast.In))

            if isinstance(node.ops[0], ast.Eq):
                return left == right
            if isinstance(node.ops[0], ast.NotEq):
                return left != right
            if isinstance(node.ops[0], ast.In):
                return left in right

            raise ValueError('unexpected operator: %s %s (inside %r)' %
                             (node.ops[0], ast.dump(node), condition))

        raise ValueError('unexpected AST node: %s %s (inside %r)' %
                         (node, ast.dump(node), condition))

    return _convert(main_node)
def RenderDEPSFile(gclient_dict):
    """Returns the source text rebuilt from the tokens of |gclient_dict|.

    The tokens are ordered by their start position before being handed to
    tokenize.untokenize().
    """

    def _start_position(token):
        return token[2]

    ordered_tokens = list(gclient_dict.tokens.values())
    ordered_tokens.sort(key=_start_position)
    return tokenize.untokenize(ordered_tokens)
- def _UpdateAstString(tokens, node, value):
- if isinstance(node, ast.Call):
- node = node.args[0]
- position = node.lineno, node.col_offset
- quote_char = ''
- if isinstance(node, ast.Str):
- quote_char = tokens[position][1][0]
- value = value.encode('unicode_escape').decode('utf-8')
- tokens[position][1] = quote_char + value + quote_char
- node.s = value
- def _ShiftLinesInTokens(tokens, delta, start):
- new_tokens = {}
- for token in tokens.values():
- if token[2][0] >= start:
- token[2] = token[2][0] + delta, token[2][1]
- token[3] = token[3][0] + delta, token[3][1]
- new_tokens[token[2]] = token
- return new_tokens
def AddVar(gclient_dict, var_name, value):
    """Inserts a new variable at the top of the 'vars' dict of a DEPS file.

    The variable is written as ``"<var_name>": "<value>",`` on a new line
    right after the line on which the vars dict starts, indented like the
    first existing entry. Both the AST node of the vars dict and the tokens
    of |gclient_dict| are updated.

    NOTE(review): the parsed value held in gclient_dict['vars'] is not
    updated here - confirm whether callers expect that.

    Args:
        gclient_dict: _NodeDict with formatting information (tokens).
        var_name: str. Name of the variable to add.
        value: str. Value of the variable; it is interpolated unescaped into
            a double-quoted literal.

    Raises:
        ValueError: if there is no formatting information, the variable is
            already declared, or the vars dict is empty.
        KeyError: if no vars dict is defined.
    """
    if not isinstance(gclient_dict, _NodeDict) or gclient_dict.tokens is None:
        raise ValueError(
            "Can't use AddVar for the given gclient dict. It contains no "
            "formatting information.")

    if 'vars' not in gclient_dict:
        raise KeyError("vars dict is not defined.")

    if var_name in gclient_dict['vars']:
        raise ValueError(
            "%s has already been declared in the vars dict. Consider using SetVar "
            "instead." % var_name)

    if not gclient_dict['vars']:
        raise ValueError('vars dict is empty. This is not yet supported.')

    # We will attempt to add the var right after 'vars = {'.
    vars_node = gclient_dict.GetNode('vars')
    if vars_node is None:
        # The message has no placeholder, so it must not be %-formatted.
        raise ValueError("The vars dict has no formatting information.")
    line = vars_node.lineno + 1

    # We will try to match the new var's indentation to the next variable.
    col = vars_node.keys[0].col_offset

    # We use a minimal Python dictionary, so that ast can parse it.
    var_content = '{\n%s"%s": "%s",\n}\n' % (' ' * col, var_name, value)
    var_ast = ast.parse(var_content).body[0].value

    # Set the ast nodes for the key and value. In the snippet they sit on
    # row 2, hence the `line - 2` offset to land on row |line|.
    var_name_node = var_ast.keys[0]
    var_name_node.lineno += line - 2
    vars_node.keys.insert(0, var_name_node)

    value_node = var_ast.values[0]
    value_node.lineno += line - 2
    vars_node.values.insert(0, value_node)

    # Update the tokens.
    var_tokens = list(tokenize.generate_tokens(StringIO(var_content).readline))
    var_tokens = {
        token[2]: list(token)
        # Ignore the tokens corresponding to braces and new lines.
        for token in var_tokens[2:-3]
    }

    # Make room for the new line, then drop the new tokens into it.
    gclient_dict.tokens = _ShiftLinesInTokens(gclient_dict.tokens, 1, line)
    gclient_dict.tokens.update(_ShiftLinesInTokens(var_tokens, line - 2, 0))
def SetVar(gclient_dict, var_name, value):
    """Changes the value of an already declared variable.

    Updates the literal in the tokens of |gclient_dict| as well as the
    value stored in its 'vars' dict.

    Raises:
        ValueError: if there is no formatting information, or the variable
            has not been declared.
        KeyError: if no vars dict is defined.
    """
    has_formatting = (isinstance(gclient_dict, _NodeDict)
                      and gclient_dict.tokens is not None)
    if not has_formatting:
        raise ValueError(
            "Can't use SetVar for the given gclient dict. It contains no "
            "formatting information.")
    tokens = gclient_dict.tokens

    if 'vars' not in gclient_dict:
        raise KeyError("vars dict is not defined.")

    vars_section = gclient_dict['vars']
    if var_name not in vars_section:
        raise ValueError(
            "%s has not been declared in the vars dict. Consider using AddVar "
            "instead." % var_name)

    var_node = vars_section.GetNode(var_name)
    if var_node is None:
        raise ValueError(
            "The vars entry for %s has no formatting information." % var_name)

    _UpdateAstString(tokens, var_node, value)
    gclient_dict['vars'].SetNode(var_name, value, var_node)
- def _GetVarName(node):
- if isinstance(node, ast.Call):
- return node.args[0].s
- if node.s.endswith('}'):
- last_brace = node.s.rfind('{')
- return node.s[last_brace + 1:-1]
- return None
def SetGCS(gclient_dict, dep_name, new_objects):
    """Updates the objects of the GCS dependency |dep_name|.

    Only the 'object_name', 'sha256sum', 'size_bytes' and 'generation'
    fields of each object are rewritten; objects are matched by position.

    Args:
        gclient_dict: _NodeDict with formatting information (tokens).
        dep_name: str. Key of the dependency inside 'deps'.
        new_objects: list of dicts, one per existing object, holding the new
            values for the fields listed above.

    Raises:
        ValueError: if there is no formatting information, or the number of
            objects differs from the current one.
        KeyError: if the dependency does not exist.
    """
    if not isinstance(gclient_dict, _NodeDict) or gclient_dict.tokens is None:
        raise ValueError(
            "Can't use SetGCS for the given gclient dict. It contains no "
            "formatting information.")
    tokens = gclient_dict.tokens

    if 'deps' not in gclient_dict or dep_name not in gclient_dict['deps']:
        raise KeyError("Could not find any dependency called %s." % dep_name)

    node = gclient_dict['deps'][dep_name]
    objects_node = node.GetNode('objects')
    if len(objects_node.elts) != len(new_objects):
        raise ValueError("Number of revision objects must match the current "
                         "number of objects.")

    # Allow only `keys_to_update` to be updated.
    keys_to_update = ('object_name', 'sha256sum', 'size_bytes', 'generation')
    for index, object_node in enumerate(objects_node.elts):
        for key, value in zip(object_node.keys, object_node.values):
            # `.value` is the attribute of ast.Constant; the legacy `.s`
            # alias no longer exists in Python 3.14.
            field = key.value
            if field not in keys_to_update:
                continue
            _UpdateAstString(tokens, value, new_objects[index][field])

    node.SetNode('objects', new_objects, objects_node)
def SetCIPD(gclient_dict, dep_name, package_name, new_version):
    """Sets the version of one CIPD package of the dependency |dep_name|.

    If the current version refers to a variable (see _GetVarName), that
    variable is updated through SetVar(); otherwise the version literal
    itself is rewritten.

    Args:
        gclient_dict: _NodeDict with formatting information (tokens).
        dep_name: str. Key of the dependency inside 'deps'.
        package_name: str. Value of the 'package' field to look for.
        new_version: str. The version to write.

    Raises:
        ValueError: if there is no formatting information, the package is
            not found exactly once, or the version is neither a call nor a
            string literal.
        KeyError: if the dependency does not exist.
    """
    if not isinstance(gclient_dict, _NodeDict) or gclient_dict.tokens is None:
        raise ValueError(
            "Can't use SetCIPD for the given gclient dict. It contains no "
            "formatting information.")
    tokens = gclient_dict.tokens

    if 'deps' not in gclient_dict or dep_name not in gclient_dict['deps']:
        raise KeyError("Could not find any dependency called %s." % dep_name)

    # Find the package with the given name
    packages = [
        package for package in gclient_dict['deps'][dep_name]['packages']
        if package['package'] == package_name
    ]
    if len(packages) != 1:
        raise ValueError(
            "There must be exactly one package with the given name (%s), "
            "%s were found." % (package_name, len(packages)))

    # TODO(ehmaldonado): Support Var in package's version.
    node = packages[0].GetNode('version')
    if node is None:
        raise ValueError(
            "The deps entry for %s:%s has no formatting information." %
            (dep_name, package_name))

    # ast.Constant with a str value is what used to be spelled ast.Str
    # (an alias that no longer exists in Python 3.14).
    is_str_literal = (isinstance(node, ast.Constant)
                      and isinstance(node.value, str))
    if not isinstance(node, ast.Call) and not is_str_literal:
        raise ValueError(
            "Unsupported dependency revision format. Please file a bug to the "
            "Infra>SDK component in crbug.com")

    var_name = _GetVarName(node)
    if var_name is not None:
        SetVar(gclient_dict, var_name, new_version)
    else:
        _UpdateAstString(tokens, node, new_version)
        packages[0].SetNode('version', new_version, node)
def SetRevision(gclient_dict, dep_name, new_revision):
    """Pins the dependency |dep_name| to |new_revision|.

    Works for deps given as a plain URL string and for dict deps (their
    'url' entry is updated). If the revision refers to a variable (see
    _GetVarName), that variable is updated through SetVar(); otherwise the
    URL literal is rewritten, replacing whatever follows '@' or appending
    '@<revision>' when the URL is not pinned yet.

    Args:
        gclient_dict: _NodeDict with formatting information (tokens).
        dep_name: str. Key of the dependency inside 'deps'.
        new_revision: str. The revision to write.

    Raises:
        ValueError: if formatting information is missing, or the current
            value has an unsupported form (multiline or implicitly
            concatenated strings, anything but a call or string literal).
        KeyError: if the dependency does not exist.
    """

    def _is_str_literal(node):
        # ast.Constant with a str value is what used to be spelled ast.Str
        # (an alias that no longer exists in Python 3.14).
        return isinstance(node, ast.Constant) and isinstance(node.value, str)

    def _UpdateRevision(dep_dict, dep_key, new_revision):
        dep_node = dep_dict.GetNode(dep_key)
        if dep_node is None:
            raise ValueError(
                "The deps entry for %s has no formatting information." %
                dep_name)

        node = dep_node
        # For `<something> + <revision part>` only the right operand is
        # edited.
        if isinstance(node, ast.BinOp):
            node = node.right

        if _is_str_literal(node):
            # The single token at the node's position must evaluate to the
            # whole string, otherwise it cannot be rewritten in place.
            token = _gclient_eval(tokens[node.lineno, node.col_offset][1])
            if token != node.value:
                raise ValueError(
                    'Can\'t update value for %s. Multiline strings and implicitly '
                    'concatenated strings are not supported.\n'
                    'Consider reformatting the DEPS file.' % dep_key)

        if not isinstance(node, ast.Call) and not _is_str_literal(node):
            raise ValueError(
                "Unsupported dependency revision format. Please file a bug to the "
                "Infra>SDK component in crbug.com")

        var_name = _GetVarName(node)
        if var_name is not None:
            SetVar(gclient_dict, var_name, new_revision)
        else:
            if '@' in node.value:
                # '@' is part of the last string, which we want to modify.
                # Discard whatever was after the '@' and put the new revision in
                # its place.
                new_revision = node.value.split('@')[0] + '@' + new_revision
            elif '@' not in dep_dict[dep_key]:
                # '@' is not part of the URL at all. This mean the dependency is
                # unpinned and we should pin it.
                new_revision = node.value + '@' + new_revision
            _UpdateAstString(tokens, node, new_revision)
            dep_dict.SetNode(dep_key, new_revision, node)

    if not isinstance(gclient_dict, _NodeDict) or gclient_dict.tokens is None:
        raise ValueError(
            "Can't use SetRevision for the given gclient dict. It contains no "
            "formatting information.")
    tokens = gclient_dict.tokens

    if 'deps' not in gclient_dict or dep_name not in gclient_dict['deps']:
        raise KeyError("Could not find any dependency called %s." % dep_name)

    if isinstance(gclient_dict['deps'][dep_name], _NodeDict):
        _UpdateRevision(gclient_dict['deps'][dep_name], 'url', new_revision)
    else:
        _UpdateRevision(gclient_dict['deps'], dep_name, new_revision)
def GetVar(gclient_dict, var_name):
    """Returns the value of the variable |var_name|.

    A ConstantString value is unwrapped to the string it holds.

    Raises:
        KeyError: if there is no vars dict or the variable is not in it.
    """
    declared = 'vars' in gclient_dict and var_name in gclient_dict['vars']
    if not declared:
        raise KeyError("Could not find any variable called %s." % var_name)

    raw = gclient_dict['vars'][var_name]
    return raw.value if isinstance(raw, ConstantString) else raw
def GetCIPD(gclient_dict, dep_name, package_name):
    """Returns the version of one CIPD package of the dependency |dep_name|.

    Raises:
        KeyError: if the dependency does not exist.
        ValueError: unless exactly one package is called |package_name|.
    """
    dep_known = 'deps' in gclient_dict and dep_name in gclient_dict['deps']
    if not dep_known:
        raise KeyError("Could not find any dependency called %s." % dep_name)

    # Find the package with the given name
    matches = []
    for candidate in gclient_dict['deps'][dep_name]['packages']:
        if candidate['package'] == package_name:
            matches.append(candidate)

    if len(matches) != 1:
        raise ValueError(
            "There must be exactly one package with the given name (%s), "
            "%s were found." % (package_name, len(matches)))

    (only_match, ) = matches
    return only_match['version']
def GetRevision(gclient_dict, dep_name):
    """Returns the revision the dependency |dep_name| is pinned to.

    Args:
        gclient_dict: mapping holding the parsed DEPS contents.
        dep_name: str. Key of the dependency inside 'deps'.

    Returns:
        For URL-based deps (a plain string, or a mapping with a 'url'), the
        text following the first '@' of the URL, or None if the URL is None
        or carries no revision. None if the dep itself is None. For a dep
        with 'objects' inside a _NodeDict, the list of objects.

    Raises:
        KeyError: if the dependency does not exist; the message lists the
            existing names that contain |dep_name|, if any.
        ValueError: if the dependency has none of the supported forms.
    """
    if 'deps' not in gclient_dict or dep_name not in gclient_dict['deps']:
        suggestions = []
        if 'deps' in gclient_dict:
            suggestions = [
                key for key in gclient_dict['deps'] if dep_name in key
            ]
        if suggestions:
            raise KeyError(
                "Could not find any dependency called %s. Did you mean %s" %
                (dep_name, ' or '.join(suggestions)))
        raise KeyError("Could not find any dependency called %s." % dep_name)

    dep = gclient_dict['deps'][dep_name]
    if dep is None:
        return None

    if isinstance(dep, str):
        _, _, revision = dep.partition('@')
        return revision or None

    if isinstance(dep, collections.abc.Mapping) and 'url' in dep:
        url = dep['url']
        # The schema allows 'url' to be None; treat it like a dep that is
        # None instead of failing on None.partition().
        if url is None:
            return None
        _, _, revision = url.partition('@')
        return revision or None

    if isinstance(gclient_dict, _NodeDict) and 'objects' in dep:
        return dep['objects']

    raise ValueError('%s is not a valid git or gcs dependency.' % dep_name)
|