+def git_checkout(
+ v: Verification,
+ channel: TarrableSearchPath,
+ pin: GitPin,
+ dest: str) -> None:
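+    """Extract the pinned git revision into dest.
+
+    Pipes `git archive` from the cached repository into `tar x` and
+    reports success only if both processes exit cleanly.
+    """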
+ v.status('Checking out corresponding git revision')
+ with subprocess.Popen(
+ ['git', '-C', git_cache.git_cachedir(channel.git_repo), 'archive', pin.git_revision],
+ stdout=subprocess.PIPE) as git:
+ with subprocess.Popen(['tar', 'x', '-C', dest, '-f', '-'], stdin=git.stdout) as tar:
+ if git.stdout:
+ git.stdout.close()
+ tar.wait()
+ git.wait()
+ v.result(git.returncode == 0 and tar.returncode == 0)
+
+
+def git_get_tarball(
+ v: Verification,
+ channel: TarrableSearchPath,
+ pin: GitPin) -> str:
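+    """Return a Nix store path to a tarball of the pinned revision.
+
+    Reuses the store path recorded in the tarball cache file if it still
+    exists; otherwise builds <release_name>.tar.xz via `git archive | xz`,
+    copies it into the Nix store, and caches the resulting store path.
+    """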
+ cache_file = tarball_cache_file(channel, pin)
+ if os.path.exists(cache_file):
+ with open(cache_file, encoding='utf-8') as f:
+ cached_tarball = f.read(9999)
+ if os.path.exists(cached_tarball):
+ return cached_tarball
+
+ with tempfile.TemporaryDirectory() as output_dir:
+ output_filename = os.path.join(
+ output_dir, pin.release_name + '.tar.xz')
+        # xz writes raw compressed bytes, so open its output in binary mode.
+        with open(output_filename, 'wb') as output_file:
+ v.status(
+ 'Generating tarball for git revision %s' %
+ pin.git_revision)
+ with subprocess.Popen(
+ ['git', '-C', git_cache.git_cachedir(channel.git_repo),
+ 'archive', '--prefix=%s/' % pin.release_name, pin.git_revision],
+ stdout=subprocess.PIPE) as git:
+                with subprocess.Popen(['xz'], stdin=git.stdout, stdout=output_file) as xz:
+                    # Let git receive EOF/SIGPIPE if xz exits early (as in git_checkout).
+                    if git.stdout:
+                        git.stdout.close()
+                    xz.wait()
+                git.wait()
+ v.result(git.returncode == 0 and xz.returncode == 0)
+
+ store_tarball = copy_to_nix_store(v, output_filename)
+
+ os.makedirs(os.path.dirname(cache_file), exist_ok=True)
+ with open(cache_file, 'w', encoding='utf-8') as f:
+ f.write(store_tarball)
+ return store_tarball # type: ignore # (for old mypy)
+
+
+def check_channel_metadata(
+ v: Verification,
+ pin: GitPin,
+ channel_contents: str) -> None:
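+    """Check the metadata files inside the extracted channel tarball.
+
+    Verifies that .git-revision matches the pinned revision and that
+    .version-suffix is a suffix of the release name.
+    """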
+ v.status('Verifying git commit in channel tarball')
+ with open(os.path.join(channel_contents, pin.release_name, '.git-revision'),
+ encoding='utf-8') as f:
+ v.result(f.read(999) == pin.git_revision)
+
+ v.status(
+ 'Verifying version-suffix is a suffix of release name %s:' %
+ pin.release_name)
+ with open(os.path.join(channel_contents, pin.release_name, '.version-suffix'),
+ encoding='utf-8') as f:
+ version_suffix = f.read(999)
+ v.status(version_suffix)
+ v.result(pin.release_name.endswith(version_suffix))
+
+
+def check_channel_contents(
+ v: Verification,
+ channel: TarrableSearchPath,
+ table: Dict[str, ChannelTableEntry],
+ pin: GitPin) -> None:
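+    """Verify that the channel tarball matches the git checkout.
+
+    Extracts the tarball and checks out the pinned revision into
+    temporary directories, validates the tarball's metadata, and
+    compares the two trees.
+    """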
+ with tempfile.TemporaryDirectory() as channel_contents, \
+ tempfile.TemporaryDirectory() as git_contents:
+
+ extract_tarball(v, table, channel_contents)
+ check_channel_metadata(v, pin, channel_contents)
+
+ git_checkout(v, channel, pin, git_contents)
+
+ compare_tarball_and_git(v, pin, channel_contents, git_contents)
+
+ v.status('Removing temporary directories')
+ v.ok()
+
+
+def git_revision_name(
+ v: Verification,
+ channel: TarrableSearchPath,
+ git_revision: str) -> str:
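+    """Build a name like '<repo>-<commit time>-<abbreviated hash>'.
+
+    Runs `git log --format=%ct-%h` on the cached repository and prefixes
+    the result with the basename of the repository URL.
+    """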
+ v.status('Getting commit date')
+    process = subprocess.run(
+        ['git', '-C', git_cache.git_cachedir(channel.git_repo),
+         'log', '-n1', '--format=%ct-%h', '--abbrev=11',
+         '--no-show-signature', git_revision],
+        stdout=subprocess.PIPE)
+ v.result(process.returncode == 0 and process.stdout != b'')
+ return '%s-%s' % (os.path.basename(channel.git_repo),
+ process.stdout.decode().strip())
+
+
+K = TypeVar('K')
+V = TypeVar('V')
+
+
+def partition_dict(pred: Callable[[K, V], bool],
+ d: Dict[K, V]) -> Tuple[Dict[K, V], Dict[K, V]]:
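+    """Split d into (entries satisfying pred, all remaining entries)."""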
+ selected: Dict[K, V] = {}
+ remaining: Dict[K, V] = {}
+ for k, v in d.items():
+ if pred(k, v):
+ selected[k] = v
+ else:
+ remaining[k] = v
+ return selected, remaining
+
+
+def filter_dict(d: Dict[K, V], fields: Set[K]
+ ) -> Tuple[Dict[K, V], Dict[K, V]]:
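+    """Split d into (entries whose keys are in fields, the rest)."""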
+ return partition_dict(lambda k, v: k in fields, d)
+
+
+def read_config_section(
+ conf: configparser.SectionProxy) -> Tuple[SearchPath, Optional[Pin]]:
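+    """Parse one config section into a search path and an optional pin.
+
+    The section's 'type' field selects the SearchPath and Pin classes;
+    the remaining fields are split between pin fields and search-path
+    fields.  The pin is None when the section carries no pin fields but
+    the pin type expects some.
+    """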
+ mapping: Mapping[str, Tuple[Type[SearchPath], Type[Pin]]] = {
+ 'alias': (AliasSearchPath, AliasPin),
+ 'channel': (ChannelSearchPath, ChannelPin),
+ 'git': (GitSearchPath, GitPin),
+ 'symlink': (SymlinkSearchPath, SymlinkPin),
+ }
+ SP, P = mapping[conf['type']]
+ _, all_fields = filter_dict(dict(conf.items()), set(['type']))
+ pin_fields, remaining_fields = filter_dict(all_fields, set(P._fields))
+ # Error suppression works around https://github.com/python/mypy/issues/9007
+ pin_present = pin_fields != {} or P._fields == ()
+ pin = P(**pin_fields) if pin_present else None # type: ignore
+ return SP(**remaining_fields), pin
+
+
+def read_pinned_config_section(
+ section: str, conf: configparser.SectionProxy) -> Tuple[SearchPath, Pin]:
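+    """Like read_config_section, but require the section to carry a pin."""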
+ sp, pin = read_config_section(conf)
+ if pin is None:
+ raise Exception(
+ 'Cannot update unpinned channel "%s" (Run "pin" before "update")' %
+ section)
+ return sp, pin
+
+
+def read_config(filename: str) -> configparser.ConfigParser:
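+    """Read a single channels configuration file."""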
+ config = configparser.ConfigParser()
+ with open(filename, encoding='utf-8') as f:
+ config.read_file(f, filename)
+ return config
+
+
+def read_config_files(
+ filenames: Iterable[str]) -> Dict[str, configparser.SectionProxy]:
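+    """Merge the sections of several configuration files into one mapping.
+
+    Raises if the same section name appears in more than one file.
+    """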
+ merged_config: Dict[str, configparser.SectionProxy] = {}
+ for file in filenames:
+ config = read_config(file)
+ for section in config.sections():
+ if section in merged_config:
+ raise Exception('Duplicate channel "%s"' % section)
+ merged_config[section] = config[section]
+ return merged_config
+
+
+def pinCommand(args: argparse.Namespace) -> None:
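+    """Re-pin the configured channels and rewrite the channels file.
+
+    When channels are named on the command line, only those sections
+    are pinned; otherwise every section is pinned.
+    """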
+ v = Verification()
+ config = read_config(args.channels_file)
+ for section in config.sections():
+ if args.channels and section not in args.channels:
+ continue
+
+ sp, old_pin = read_config_section(config[section])
+
+ config[section].update(sp.pin(v, old_pin)._asdict())
+
+ with open(args.channels_file, 'w', encoding='utf-8') as configfile:
+ config.write(configfile)
+
+
+def updateCommand(args: argparse.Namespace) -> None: