]>
Commit | Line | Data |
---|---|---|
705973e7 SW |
1 | # paperdoorknob: Print glowfic |
2 | # | |
3 | # This program is free software: you can redistribute it and/or modify it | |
4 | # under the terms of the GNU General Public License as published by the | |
5 | # Free Software Foundation, version 3. | |
6 | ||
7 | ||
8 | from abc import ABC, abstractmethod | |
9 | from contextlib import contextmanager | |
75c5665e | 10 | from sys import stderr |
47e94008 | 11 | from typing import Callable, Iterator |
705973e7 SW |
12 | |
13 | import requests | |
14 | import requests_cache | |
15 | ||
e3b5fdf0 | 16 | from version import paperdoorknob_version |
705973e7 | 17 | |
e3b5fdf0 SW |
18 | |
# Headers passed along with every GET issued by the session-based fetchers in
# this module: a User-Agent naming the program, its version and a project URL.
_headers = {
    'User-Agent': (
        f'paperdoorknob/{paperdoorknob_version} '
        '(https://git.scottworley.com/paperdoorknob/)'
    ),
}
1e86dcaa SW |
21 | |
22 | ||
705973e7 SW |
class Fetcher(ABC):
    """Abstract interface for retrieving the raw bytes behind a URL."""

    @abstractmethod
    def fetch(self, url: str) -> bytes:
        """Return the content found at ``url`` as bytes.

        Concrete subclasses must override this method; the base
        implementation only raises NotImplementedError.
        """
        raise NotImplementedError
27 | ||
28 | ||
class _SessionFetcher(Fetcher):
    """Fetcher that issues GET requests through a ``requests.Session``."""

    def __init__(self, session: requests.Session, timeout: int) -> None:
        """Keep the session to use and the timeout handed to each request."""
        self._timeout = timeout
        self._session = session

    def fetch(self, url: str) -> bytes:
        """GET ``url`` and return the response body as bytes.

        Any exception raised by the response's ``raise_for_status()`` is
        propagated to the caller.
        """
        response = self._session.get(
            url, timeout=self._timeout, headers=_headers)
        # The response is used as a context manager so it is closed on exit,
        # whether we return normally or raise.
        with response:
            response.raise_for_status()
            return response.content
39 | ||
40 | ||
75c5665e SW |
class _CachingFetcher(Fetcher):
    """Fetcher backed by a ``requests_cache.CachedSession``.

    Besides fetching, it keeps two counters: how many fetches completed
    without an error status, and how many of those were answered from the
    cache.
    """

    def __init__(
            self,
            session: requests_cache.CachedSession,
            timeout: int) -> None:
        """Keep the session and timeout, and start both counters at zero."""
        self._cache_hit_count = 0
        self._request_count = 0
        self._timeout = timeout
        self._session = session

    def fetch(self, url: str) -> bytes:
        """GET ``url`` and return the response body as bytes.

        Any exception raised by the response's ``raise_for_status()`` is
        propagated; in that case neither counter is updated.
        """
        response = self._session.get(
            url, timeout=self._timeout, headers=_headers)
        with response:
            response.raise_for_status()
            # Counters are only touched once the status check has passed.
            self._request_count += 1
            self._cache_hit_count += int(response.from_cache)
            return response.content

    @property
    def request_count(self) -> int:
        """Number of fetches that passed the status check."""
        return self._request_count

    @property
    def cache_hit_count(self) -> int:
        """Number of counted fetches whose response came from the cache."""
        return self._cache_hit_count
66 | ||
67 | ||
705973e7 SW |
@contextmanager
def DirectFetcher(timeout: int) -> Iterator[_SessionFetcher]:
    """Context manager yielding a Fetcher that uses a plain requests session.

    Args:
        timeout: Timeout handed to every request made by the yielded fetcher.

    Yields:
        A ``_SessionFetcher`` bound to a session that is closed when the
        ``with`` block exits.
    """
    # requests.Session() is the supported constructor; requests.session() is
    # a legacy alias retained only for backwards compatibility.
    with requests.Session() as session:
        yield _SessionFetcher(session, timeout)
72 | ||
73 | ||
@contextmanager
def CachingFetcher(
        cache_path: str,
        timeout: int,
        log: Callable[[str], None] = lambda x: print(x, file=stderr),
) -> Iterator[_CachingFetcher]:
    """Context manager yielding a Fetcher backed by a requests_cache session.

    Args:
        cache_path: Passed to ``requests_cache.CachedSession`` as its first
            argument.
        timeout: Timeout handed to every request made by the yielded fetcher.
        log: Callable that receives the one-line cache statistics message;
            by default the message is printed to stderr.

    Yields:
        A ``_CachingFetcher`` bound to the cached session.

    When the ``with`` body finishes without raising and at least one request
    was counted, a summary of cache hits is passed to ``log``.
    """
    with requests_cache.CachedSession(cache_path, cache_control=True) as session:
        caching_fetcher = _CachingFetcher(session, timeout)
        yield caching_fetcher
        total = caching_fetcher.request_count
        if total <= 0:
            # Nothing was fetched, so there is no hit rate to report.
            return
        hits = caching_fetcher.cache_hit_count
        hit_rate = 100.0 * hits / total
        log(f"Fetch cache hits: {hits} ({hit_rate:.1f}%)")
38621839 SW |
86 | |
87 | ||
class FakeFetcher(Fetcher):
    """In-memory Fetcher that serves canned content from a dict.

    Every call to fetch() is counted, including calls for unknown URLs.
    """

    def __init__(self, resources: dict[str, bytes]) -> None:
        """Keep the URL-to-content mapping and start the fetch counter at zero."""
        self._fetch_count = 0
        self._resources = resources

    def fetch(self, url: str) -> bytes:
        """Return the canned content registered for ``url``.

        Raises:
            requests.HTTPError: If ``url`` is not a key of the mapping given
                at construction. The call is still counted.
        """
        self._fetch_count += 1
        if url in self._resources:
            return self._resources[url]
        raise requests.HTTPError("URL not found", url)

    def request_count(self) -> int:
        """Return how many times fetch() has been called.

        This is a regular method and must be called, not read as an
        attribute.
        """
        return self._fetch_count