]>
Commit | Line | Data |
---|---|---|
705973e7 SW |
1 | # paperdoorknob: Print glowfic |
2 | # | |
3 | # This program is free software: you can redistribute it and/or modify it | |
4 | # under the terms of the GNU General Public License as published by the | |
5 | # Free Software Foundation, version 3. | |
6 | ||
7 | ||
8 | from abc import ABC, abstractmethod | |
9 | from contextlib import contextmanager | |
75c5665e SW |
10 | from sys import stderr |
11 | from typing import IO, Iterator | |
705973e7 SW |
12 | |
13 | import requests | |
14 | import requests_cache | |
15 | ||
16 | ||
1e86dcaa SW |
# HTTP headers passed as ``headers=`` on every request issued by the
# session-backed fetchers in this module; identifies this client via
# its User-Agent string.
_headers = {
    'User-Agent':
        'paperdoorknob/0.0.1 (https://git.scottworley.com/paperdoorknob/)',
}
19 | ||
20 | ||
705973e7 SW |
class Fetcher(ABC):
    """Abstract interface for anything that can turn a URL into bytes.

    Concrete subclasses must override :meth:`fetch`; the class cannot be
    instantiated directly.
    """

    @abstractmethod
    def fetch(self, url: str) -> bytes:
        """Return the content found at ``url`` as raw bytes.

        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError()
25 | ||
26 | ||
class _SessionFetcher(Fetcher):
    """Fetcher that issues every request through a caller-supplied session."""

    def __init__(self, session: requests.Session, timeout: int) -> None:
        """Store the session and the timeout handed to each request.

        Args:
            session: the session whose ``get`` is used for every fetch.
            timeout: forwarded unchanged as ``timeout=`` on every ``get``.
        """
        self._timeout = timeout
        self._session = session

    def fetch(self, url: str) -> bytes:
        """GET ``url`` and return the response body.

        ``raise_for_status()`` is called before the body is read, so an
        error status surfaces as an exception rather than as content.
        """
        with self._session.get(
                url,
                headers=_headers,
                timeout=self._timeout) as response:
            response.raise_for_status()
            body = response.content
            return body
37 | ||
38 | ||
75c5665e SW |
class _CachingFetcher(Fetcher):
    """Fetcher backed by a caching session that tallies cache hits.

    Two counters are kept: how many fetches completed without an error
    status, and how many of those were reported by the response as
    coming from the cache.
    """

    def __init__(
            self,
            session: requests_cache.CachedSession,
            timeout: int) -> None:
        """Store the session and timeout and zero both counters.

        Args:
            session: the caching session whose ``get`` serves each fetch.
            timeout: forwarded unchanged as ``timeout=`` on every ``get``.
        """
        self._request_count = 0
        self._cache_hit_count = 0
        self._session = session
        self._timeout = timeout

    def fetch(self, url: str) -> bytes:
        """GET ``url``, update the counters, and return the response body.

        The counters are only touched after ``raise_for_status()`` has
        passed, so a request that fails with an error status is not
        counted at all.
        """
        with self._session.get(
                url,
                headers=_headers,
                timeout=self._timeout) as response:
            response.raise_for_status()
            self._request_count += 1
            hit = int(response.from_cache)
            self._cache_hit_count += hit
            return response.content

    @property
    def request_count(self) -> int:
        """Number of fetches that completed without an error status."""
        return self._request_count

    @property
    def cache_hit_count(self) -> int:
        """Number of counted fetches whose response had ``from_cache`` set."""
        return self._cache_hit_count
64 | ||
65 | ||
705973e7 SW |
@contextmanager
def DirectFetcher(timeout: int) -> Iterator[_SessionFetcher]:
    """Context manager yielding a fetcher backed by a fresh HTTP session.

    The session is opened on entry and closed when the ``with`` block
    exits.

    Args:
        timeout: forwarded to the yielded fetcher, which passes it as
            ``timeout=`` on every request.

    Yields:
        A ``_SessionFetcher`` bound to the new session.
    """
    # requests.session() is a legacy alias kept only for backwards
    # compatibility; requests.Session() is the supported constructor.
    with requests.Session() as session:
        yield _SessionFetcher(session, timeout)
70 | ||
71 | ||
@contextmanager
def CachingFetcher(
        cache_path: str,
        timeout: int,
        report_stream: IO[str] = stderr) -> Iterator[_CachingFetcher]:
    """Context manager yielding a cache-backed fetcher, with a final report.

    A ``requests_cache.CachedSession`` is opened on ``cache_path`` with
    ``cache_control=True`` and wrapped in a ``_CachingFetcher``.  When the
    ``with`` body finishes normally and at least one request was counted,
    a one-line cache-hit summary is printed to ``report_stream``.

    Args:
        cache_path: passed as the first argument to ``CachedSession``.
        timeout: forwarded to the fetcher for use on every request.
        report_stream: where the summary line is written.  The default is
            the ``stderr`` object imported when this module was loaded.

    Yields:
        The ``_CachingFetcher`` bound to the caching session.
    """
    with requests_cache.CachedSession(cache_path, cache_control=True) as session:
        fetcher = _CachingFetcher(session, timeout)
        yield fetcher
        # Reached only when the caller's block did not raise: an exception
        # propagates out of the yield above and skips the report.
        if fetcher.request_count <= 0:
            return
        percent = 100.0 * fetcher.cache_hit_count / fetcher.request_count
        summary = f"Fetch cache hits: {fetcher.cache_hit_count} ({percent:.1f}%)"
        print(summary, file=report_stream)
38621839 SW |
85 | |
86 | ||
class FakeFetcher(Fetcher):
    """In-memory Fetcher that serves canned bodies from a dict.

    Every call to :meth:`fetch` is counted, whether or not the URL is
    known.
    """

    def __init__(self, resources: dict[str, bytes]) -> None:
        """Keep a reference to ``resources`` (URL -> body) and zero the counter."""
        self._fetch_count = 0
        self._resources = resources

    def fetch(self, url: str) -> bytes:
        """Return the canned body for ``url``.

        Raises:
            requests.HTTPError: if ``url`` is not a key of the resources
                dict.  The call is still counted in that case.
        """
        # Count first so that failed lookups are included in the tally.
        self._fetch_count += 1
        if url in self._resources:
            return self._resources[url]
        raise requests.HTTPError("URL not found", url)

    def request_count(self) -> int:
        """Return how many times ``fetch`` has been called (a method, not a property)."""
        return self._fetch_count