]> git.scottworley.com Git - git-cache/blobdiff - git_cache.py
Specify license
[git-cache] / git_cache.py
index b0f992e76589666ee86f5f8d8e27f51142af71c5..a34c3291b3e6f359e4430a3e9d5720f6923487ce 100644 (file)
@@ -1,3 +1,10 @@
+# git-cache: Cache git content locally
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, version 3.
+
+
 # It would be nice if we could share the nix git cache, but as of the
 # time of writing it is transitioning from gitv2 (deprecated) to gitv3
 # (not ready yet), and trying to straddle them both is too far into nix
 # It would be nice if we could share the nix git cache, but as of the
 # time of writing it is transitioning from gitv2 (deprecated) to gitv3
 # (not ready yet), and trying to straddle them both is too far into nix
@@ -78,14 +85,14 @@ def verify_ancestry(
         ancestor: RefOrRev,
         force: bool = False) -> None:
     if not force and not is_ancestor(repo, descendant, ancestor):
         ancestor: RefOrRev,
         force: bool = False) -> None:
     if not force and not is_ancestor(repo, descendant, ancestor):
-        raise Exception('%s is not an ancestor of %s' % (ancestor, descendant))
+        raise Exception(f'{ancestor} is not an ancestor of {descendant}')
 
 
 def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
     filename = _log_filename(repo)
     if not os.path.exists(filename):
         return
 
 
 def _read_fetch_log(repo: Repo) -> Iterator[_LogEntry]:
     filename = _log_filename(repo)
     if not os.path.exists(filename):
         return
-    with open(filename, 'r') as f:
+    with open(filename, 'r', encoding='utf-8') as f:
         for line in f:
             _, _, rev, ref = line.strip().split(maxsplit=3)
             yield _LogEntry(ref, rev)
         for line in f:
             _, _, rev, ref = line.strip().split(maxsplit=3)
             yield _LogEntry(ref, rev)
@@ -107,10 +114,11 @@ def _log_fetch(repo: Repo, ref: Ref, rev: Rev, force: bool = False) -> None:
             verify_ancestry(repo, rev, prev_rev)
     filename = _log_filename(repo)
     os.makedirs(os.path.dirname(filename), exist_ok=True)
             verify_ancestry(repo, rev, prev_rev)
     filename = _log_filename(repo)
     os.makedirs(os.path.dirname(filename), exist_ok=True)
-    with open(filename, 'a') as f:
-        f.write('%s %s %s %s\n' %
-                (time.strftime('%Y-%m%d-%H:%M:%S%z'),
-                 ('FORCEDFETCH' if force else 'fetch'), rev, ref))
+    with open(filename, 'a', encoding='utf-8') as f:
+        f.write(
+            f'{time.strftime("%Y-%m%d-%H:%M:%S%z")} '
+            f'{"FORCEDFETCH" if force else "fetch"} {rev} {ref}\n'
+        )
 
 
 def _show_force_warning() -> None:
 
 
 def _show_force_warning() -> None:
@@ -150,13 +158,12 @@ def _show_force_warning() -> None:
 ''', end='', file=sys.stderr)
 
     for i in range(warn_time, 0, -1):
 ''', end='', file=sys.stderr)
 
     for i in range(warn_time, 0, -1):
-        msg = '* %-70s *' % ("Continuing in %d seconds..." % i)
+        msg = f'* {f"Continuing in {i} seconds...":-70s} *'
         print(msg, file=sys.stderr)
         time.sleep(1)
     print('*' * 74, file=sys.stderr)
 
 
         print(msg, file=sys.stderr)
         time.sleep(1)
     print('*' * 74, file=sys.stderr)
 
 
-
 @backoff.on_exception(
     backoff.expo,
     subprocess.CalledProcessError,
 @backoff.on_exception(
     backoff.expo,
     subprocess.CalledProcessError,
@@ -168,7 +175,7 @@ def _git_fetch(
         force: bool = False) -> None:
     subprocess.run(['git', '-C', cachedir, 'fetch'] +
                    (['--force'] if force else []) +
         force: bool = False) -> None:
     subprocess.run(['git', '-C', cachedir, 'fetch'] +
                    (['--force'] if force else []) +
-                   [repo, '%s:%s' % (ref, ref)], check=True)
+                   [repo, f'{ref}:{ref}'], check=True)
 
 
 def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
 
 
 def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
@@ -177,13 +184,19 @@ def fetch(repo: Repo, ref: Ref, force: bool = False) -> Tuple[Path, Rev]:
     cachedir = git_cachedir(repo)
     if not os.path.exists(cachedir):
         logging.debug("Initializing git repo")
     cachedir = git_cachedir(repo)
     if not os.path.exists(cachedir):
         logging.debug("Initializing git repo")
-        subprocess.run(['git', 'init', '--bare', cachedir],
-                       check=True, stdout=sys.stderr)
+        subprocess.run(['git',
+                        '-c',
+                        'init.defaultBranch=git-cache--no-default-branch',
+                        'init',
+                        '--bare',
+                        cachedir],
+                       check=True,
+                       stdout=sys.stderr)
 
     logging.debug('Fetching ref "%s" from %s', ref, repo)
     _git_fetch(cachedir, repo, ref, force=force)
 
 
     logging.debug('Fetching ref "%s" from %s', ref, repo)
     _git_fetch(cachedir, repo, ref, force=force)
 
-    with open(os.path.join(cachedir, 'refs', 'heads', ref)) as rev_file:
+    with open(os.path.join(cachedir, 'refs', 'heads', ref), encoding='utf-8') as rev_file:
         rev = Rev(rev_file.read(999).strip())
     verify_ancestry(repo, ref, rev, force=force)
     _log_fetch(repo, ref, rev, force=force)
         rev = Rev(rev_file.read(999).strip())
     verify_ancestry(repo, ref, rev, force=force)
     _log_fetch(repo, ref, rev, force=force)
@@ -239,7 +252,8 @@ def _main() -> None:
     args = parser.parse_args()
 
     if args.rev is None:
     args = parser.parse_args()
 
     if args.rev is None:
-        print('{1} {0}'.format(*fetch(args.repo, args.ref, force=args.force)))
+        cachedir, rev = fetch(args.repo, args.ref, force=args.force)
+        print(f'{rev} {cachedir}')
     else:
         print(
             ensure_rev_available(
     else:
         print(
             ensure_rev_available(