# under the terms of the GNU General Public License as published by the
# Free Software Foundation, version 3.
+from __future__ import annotations
+
from dataclasses import dataclass
from datetime import datetime
import os
import subprocess
+import sys
from typing import Any, Callable
import gi
@dataclass
class Stream:
    """A child process that is asked to quit by writing ``q`` to its stdin.

    Create instances with :meth:`start`, which launches the command with a
    piped stdin, and end them with :meth:`stop`.
    """

    # Handle of the running child; stop() requires it to have a stdin pipe.
    process: subprocess.Popen[bytes]

    @classmethod
    def start(cls, command: list[str]) -> Stream:
        """Launch *command* with a piped stdin and return a Stream for it.

        Any exception raised by ``subprocess.Popen`` (for example when the
        executable cannot be found) propagates to the caller.
        """
        # The process has to outlive this call, so it is deliberately not
        # wrapped in a ``with`` block; stop() waits for it instead.
        # pylint: disable=consider-using-with
        return cls(process=subprocess.Popen(command, stdin=subprocess.PIPE))

    def stop(self) -> None:
        """Send ``q`` to the child, wait for it to exit, then close its stdin.

        If the child has already closed its end of the pipe, a note is
        printed to stderr and the process is still waited for.
        """
        stdin = self.process.stdin
        assert stdin is not None
        try:
            stdin.write(b'q')
            stdin.flush()
        except BrokenPipeError:
            print("Stream already stopped?", file=sys.stderr)
        try:
            self.process.wait()
        finally:
            # Release the pipe explicitly rather than leaving it to garbage
            # collection.  After a failed flush the unsent byte is still
            # buffered, so close() reports the broken pipe a second time;
            # that was already reported above and is ignored here.
            try:
                stdin.close()
            except BrokenPipeError:
                pass
+
# Stream of the recording currently in progress, or None while idle.
recording: Stream | None = None
def make_filename() -> str:
return str(n)
def begin_monitoring_file_size(size_display: Gtk.Label, filename: str) -> None:
    """Refresh *size_display* once a second with the size of *filename*.

    The periodic callback keeps running while the module-level ``recording``
    is set.  On the first tick where it is ``None`` the label is cleared and
    the timer removes itself.

    NOTE(review): the callback only checks whether *some* recording is
    active, so a stop followed by a new start before the next tick would
    presumably leave this timer running alongside the new one -- confirm.
    """
    def update_size_display() -> Any:
        if recording is None:
            # Nothing is being recorded any more: blank the label and stop.
            size_display.set_label('')
            return GLib.SOURCE_REMOVE
        try:
            size = summarize_size(os.stat(filename).st_size)
        except FileNotFoundError:
            # The output file does not exist (yet); show a placeholder.
            size = '--'
        size_display.set_label(f'<big>{size}</big>')
        return GLib.SOURCE_CONTINUE

    GLib.timeout_add_seconds(1, update_size_display)
+
+
def on_start_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
    """Start an ffmpeg recording and show the "recording" page of *stack*.

    A fresh output filename is generated, the size display found in *stack*
    starts tracking that file, and ffmpeg is launched with the video source
    selected in *stack* plus the default PulseAudio input.  The resulting
    stream is stored in the module-level ``recording``, which must be
    ``None`` on entry.
    """
    global recording  # pylint: disable=global-statement
    assert recording is None
    filename = make_filename()
    begin_monitoring_file_size(find_size_display(stack), filename)

    video_options = ['ffmpeg', '-framerate', '25'] + video_source(stack)
    audio_and_output = ['-f', 'pulse', '-ac', '2', '-i', 'default', filename]
    recording = Stream.start(video_options + audio_and_output)

    stack.set_visible_child_name("recording")
def on_stop_recording(_: Gtk.Button, stack: Gtk.Stack) -> None:
    """Stop the active recording and show the "not_recording" page.

    The module-level ``recording`` must be set on entry.  Its ``stop()``
    method is called, after which ``recording`` is reset to ``None``.
    """
    global recording  # pylint: disable=global-statement
    assert recording is not None
    recording.stop()
    # Clearing the global marks the application as idle again.
    recording = None
    stack.set_visible_child_name("not_recording")