123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- #!/usr/bin/env python3
- # Copyright 2024 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """This scripts copies DEPS package information from one source onto
- destination.
- If the destination doesn't have packages, the script errors out.
- Example usage:
- roll_downstream_gcs_deps.py \
- --source some/repo/DEPS \
- --destination some/downstream/repo/DEPS \
- --package src/build/linux/debian_bullseye_amd64-sysroot \
- --package src/build/linux/debian_bullseye_arm64-sysroot
- """
- import argparse
- import ast
- import sys
- from typing import Dict, List
- def _get_deps(deps_ast: ast.Module) -> Dict[str, ast.Dict]:
- """Searches for the deps dict in a DEPS file AST.
- Args:
- deps_ast: AST of the DEPS file.
- Raises:
- Exception: If the deps dict is not found.
- Returns:
- The deps dict.
- """
- for statement in deps_ast.body:
- if not isinstance(statement, ast.Assign):
- continue
- if len(statement.targets) != 1:
- continue
- target = statement.targets[0]
- if not isinstance(target, ast.Name):
- continue
- if target.id != 'deps':
- continue
- if not isinstance(statement.value, ast.Dict):
- continue
- deps = {}
- for key, value in zip(statement.value.keys, statement.value.values):
- if not isinstance(key, ast.Constant):
- continue
- deps[key.value] = value
- return deps
- raise Exception('no deps found')
- def _get_gcs_object_list_ast(package_ast: ast.Dict) -> ast.List:
- """Searches for the objects list in a GCS package AST.
- Args:
- package_ast: AST of the GCS package.
- Raises:
- Exception: If the package is not a GCS package.
- Returns:
- AST of the objects list.
- """
- is_gcs = False
- result = None
- for key, value in zip(package_ast.keys, package_ast.values):
- if not isinstance(key, ast.Constant):
- continue
- if key.value == 'dep_type' and isinstance(
- value, ast.Constant) and value.value == 'gcs':
- is_gcs = True
- if key.value == 'objects' and isinstance(value, ast.List):
- result = value
- assert is_gcs, 'Not a GCS dependency!'
- assert result, 'No objects found!'
- return result
- def _replace_ast(destination: str, dest_ast: ast.Module, source: str,
- source_ast: ast.Module) -> str:
- """Replaces the content of dest_ast with the content of the
- same package in source_ast.
- Args:
- destination: Destination DEPS file content.
- dest_ast: AST in the destination DEPS file that will be replaced.
- source: Source DEPS file content.
- source_ast: AST in the source DEPS file that will replace content of
- destination.
- Returns:
- Content of destination DEPS file with replaced content.
- """
- source_lines = source.splitlines()
- lines = destination.splitlines()
- # Copy all lines before the replaced AST.
- result = '\n'.join(lines[:dest_ast.lineno - 1]) + '\n'
- # Partially copy the line content before AST's value.
- result += lines[dest_ast.lineno - 1][:dest_ast.col_offset]
- # Copy data from source AST.
- if source_ast.lineno == source_ast.end_lineno:
- # Starts and ends on the same line.
- result += source_lines[
- source_ast.lineno -
- 1][source_ast.col_offset:source_ast.end_col_offset]
- else:
- # Copy multiline content from source. The first line and the last line
- # of source AST should be partially copied as `result` has a partial
- # line from `destination`.
- # Partially copy the first line of source AST.
- result += source_lines[source_ast.lineno -
- 1][source_ast.col_offset:] + '\n'
- # Copy content in the middle.
- result += '\n'.join(
- source_lines[source_ast.lineno:source_ast.end_lineno - 1]) + '\n'
- # Partially copy the last line of source AST.
- result += source_lines[source_ast.end_lineno -
- 1][:source_ast.end_col_offset]
- # Copy the rest of the line after the package value.
- result += lines[dest_ast.end_lineno - 1][dest_ast.end_col_offset:] + '\n'
- # Copy the rest of the lines after the package value.
- result += '\n'.join(lines[dest_ast.end_lineno:])
- # Add trailing newline
- if destination.endswith('\n'):
- result += '\n'
- return result
- def copy_packages(source_content: str, destination_content: str,
- source_packages: List[str],
- destination_packages: List[str]) -> str:
- """Copies GCS packages from source to destination.
- Args:
- source: Source DEPS file content.
- destination: Destination DEPS file content.
- packages: List of GCS packages to copy. Only objects are copied.
- Returns:
- Destination DEPS file content with packages copied.
- """
- source_deps = _get_deps(ast.parse(source_content, mode='exec'))
- for i in range(len(source_packages)):
- source_package = source_packages[i]
- destination_package = destination_packages[i]
- if source_package not in source_deps:
- raise Exception('Package %s not found in source' % source_package)
- dest_deps = _get_deps(ast.parse(destination_content, mode='exec'))
- if destination_package not in dest_deps:
- raise Exception('Package %s not found in destination' %
- destination_package)
- destination_content = _replace_ast(
- destination_content,
- _get_gcs_object_list_ast(dest_deps[destination_package]),
- source_content,
- _get_gcs_object_list_ast(source_deps[source_package]))
- return destination_content
- def main():
- parser = argparse.ArgumentParser(description=__doc__)
- parser.add_argument('--source-deps',
- required=True,
- help='Source DEPS file where content will be copied '
- 'from')
- parser.add_argument('--source-package',
- action='append',
- required=True,
- help='List of DEPS packages to update')
- parser.add_argument('--destination-deps',
- required=True,
- help='Destination DEPS file, where content will be '
- 'saved')
- parser.add_argument('--destination-package',
- action='append',
- required=True,
- help='List of DEPS packages to update')
- args = parser.parse_args()
- if not args.source_package:
- parser.error('No source packages specified to roll, aborting...')
- if not args.destination_package:
- parser.error('No destination packages specified to roll, aborting...')
- if len(args.destination_package) != len(args.source_package):
- parser.error('Source and destination packages must be of the same '
- 'length, aborting...')
- with open(args.source_deps) as f:
- source_content = f.read()
- with open(args.destination_deps) as f:
- destination_content = f.read()
- new_content = copy_packages(source_content, destination_content,
- args.source_package, args.destination_package)
- with open(args.destination_deps, 'w') as f:
- f.write(new_content)
- print('Run:')
- print(' Destination DEPS file updated. You still need to create and '
- 'upload a change.')
- return 0
- if __name__ == '__main__':
- sys.exit(main())
|