2
1
Fork 1
Deckorder/main.py

307 lines
10 KiB
Python

# pyright: reportMissingModuleSource=false
# WIP GStreamer Recorder
# Pipeline largely based on https://github.com/Epictek/DeckyStream/blob/master/backend/GstreamerServiceShadow.cs
import sys
sys.path.append("/home/deck/repos/Deckorder/deps")
import time
import gbulb
import os
# import errno
# import shutil
import signal
import subprocess
# import socket
import logging
gbulb.install(gtk=False) # turn off gtk to remove ui
import gi
from asyncio import AbstractEventLoop, sleep, get_event_loop
# gi.require_version('Gtk', '3.0') # for ui
gi.require_version('GIRepository', '2.0')
from gi.repository import GIRepository
GIRepository.Repository.prepend_library_path(os.path.join(os.path.dirname(__file__), "libs"))
gi.require_version('Gst', '1.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import Gst
# os.environ['HOME'] = "/home/deck"
# os.environ['XDG_RUNTIME_DIR'] = "/run/user/1000"
dirpath = "/home/deck/Videos/ReplaySorcery"
logging.basicConfig(
# filename="/tmp/recorder.log",
format='%(asctime)s %(levelname)s %(message)s',
filemode='w',
force=True)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
Gst.init(["--verbose"])
Gst.debug_set_active(True)
Gst.debug_set_default_threshold(4)
# if "replay-sorcery.conf" not in os.listdir("/home/deck/.config"):
class Plugin:
def __del__(self):
pass
def __init__(self):
self.loop = get_event_loop()
# async def _start_recorder(self):
# pass
# async def start_recorder(self):
# if not self.recording:
# self.recording = True
# logger.info("Starting")
# asyncio.ensure_future(self._start_recorder(self))
# async def stop_recorder(self):
# if self.recording:
# self.recording = False
# logger.info("Stopping")
# # os.kill(self.process.pid, signal.SIGINT)
# async def toggle_recorder(self):
# if self.recording:
# return await self.stop_recorder(self)
# else:
# return await self.start_recorder(self)
# async def save(self):
# if self.recording and not self.paused:
# self.saving = asyncio.get_event_loop().create_future()
# logger.info("Saving")
# with open('/tmp/recorder.log', "a") as out:
# # subprocess.Popen([os.path.dirname(__file__) + "/out/replay-sorcery", "save"], stdin=subprocess.PIPE, stdout=out, stderr=out)
# logger.info("Done saving")
# return await self.saving
# return False
# async def suspend_pause(self):
# if self.recording:
# logger.info("Suspending")
# self.paused = True
# os.kill(self.process.pid, signal.SIGINT)
# async def suspend_resume(self):
# if self.recording and self.paused:
# logger.info("Resuming")
# self.paused = False
# asyncio.ensure_future(self._start_recorder(self))
# # await self.start_recorder(self)
# async def get_status(self):
# return self.recording
# # client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
# # client.connect("/tmp/replay-sorcery/control.sock")
# # client.close()
# async def get_videos_len(self):
# return len(os.listdir("/home/deck/Videos/ReplaySorcery"))
# async def get_videos(self):
# arr = sorted(filter(lambda s: os.path.isfile(os.path.join(dirpath, s)), os.listdir(dirpath)), key=lambda s: os.path.getmtime(os.path.join(dirpath, s)))
# arr.reverse()
# return arr
async def _standalone(self):
rec = Recorder(self.loop)
# self.loop.create_task(rec.start())
await rec.start()
await sleep(15)
await rec.save()
# self.loop.create_task(self._rec())
await rec.stop()
await sleep(2)
class Recorder:
def __init__(self, loop: AbstractEventLoop) -> None:
self.pipeline: Gst.Pipeline | None = None
self.is_saving: bool = False
self.buffer_count: int = 0
self.loop = loop
async def start(self):
self.pipeline = Gst.parse_launch("""
pipewiresrc name=videosrc do-timestamp=true ! vaapipostproc name=postproc ! queue name=vrecq ! vaapih264enc name=venc ! h264parse name=parse ! mp4mux name=muxer ! filesink async=false name=filesink
""")
self.venc: Gst.Element = self.pipeline.get_by_name("venc")
self.parse: Gst.Element = self.pipeline.get_by_name("parse")
self.videosrc: Gst.Element = self.pipeline.get_by_name("videosrc")
self.postproc: Gst.Element = self.pipeline.get_by_name("postproc")
self.vrecq: Gst.Element = self.pipeline.get_by_name("vrecq")
assert self.vrecq is not None
self.vrecq.set_property("max-size-time", 10 * Gst.SECOND)
self.vrecq.set_property("max-size-bytes", 0)
self.vrecq.set_property("max-size-buffers", 0)
self.vrecq.set_property("leaky", 2)
self.vrecq_src = self.vrecq.get_static_pad("src")
assert self.vrecq_src is not None
self.vrecq_src_probe_id = self.vrecq_src.add_probe(
Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb)
self.filesink: Gst.Element = self.pipeline.get_by_name("filesink")
self.update_filename()
self.muxer: Gst.Element = self.pipeline.get_by_name("muxer")
self.pipeline.set_state(Gst.State.PLAYING)
self.pipeline.bus.add_signal_watch()
self.pipeline.bus.enable_sync_message_emission()
self.pipeline.bus.connect("message", self.on_message)
# pipeline.bus.add_signal_watch()
# # pipeline.bus.enable_sync_message_emission()
# pipeline.bus.connect("message", self.on_msg)
# self.in_valve = pipeline.get_by_name("in-flow")
# self.queue = pipeline.get_by_name("buf")
# self.tee = pipeline.get_by_name("t")
# self.pipeline = pipeline
# assert self.in_valve is not None
# assert self.queue is not None
# assert self.tee is not None
# assert self.pipeline is not None
self.pipeline.set_state(Gst.State.PLAYING)
async def stop(self):
if self.pipeline is not None:
self.filesink.set_state(Gst.State.NULL)
self.muxer.set_state(Gst.State.NULL)
self.parse.set_state(Gst.State.NULL)
self.venc.set_state(Gst.State.NULL)
self.pipeline.set_state(Gst.State.NULL)
self.pipeline.unref()
await sleep(1)
self.pipeline = None
async def save(self):
if self.is_saving:
return
logger.info("timeout, unblocking pad to save recording")
# need to hook up another probe to drop the initial old buffer stuck
# in the blocking pad probe
assert self.vrecq_src is not None
self.vrecq_src.add_probe(Gst.PadProbeType.BUFFER, self.probe_drop_one_cb)
if self.vrecq_src_probe_id != 0:
self.vrecq_src.remove_probe(self.vrecq_src_probe_id)
self.vrecq_src_probe_id = 0
await sleep(5)
self.vrecq_src_probe_id = self.vrecq_src.add_probe(
Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb)
self.loop.create_task(self.push_eos())
def block_probe_cb(self, _idk, _idk2):
return Gst.PadProbeReturn.OK
def probe_drop_one_cb(self, pad: Gst.Pad, info: Gst.PadProbeInfo):
buf: Gst.Buffer = info.get_buffer()
if self.buffer_count == 0:
self.buffer_count += 1;
logger.debug("Drop one buffer with ts " + str(buf.dts))
return Gst.PadProbeReturn.DROP
else:
is_keyframe = buf.has_flags(Gst.BufferFlags.DELTA_UNIT)
if is_keyframe:
logger.debug("Letting buffer through and removing drop probe")
return Gst.PadProbeReturn.REMOVE
else:
logger.debug("Dropping buffer, wait for a keyframe")
return Gst.PadProbeReturn.DROP
async def push_eos(self):
assert self.vrecq_src is not None
peer: Gst.Pad = self.vrecq_src.get_peer()
assert self.pipeline is not None
logger.debug(f"pushing EOS event on pad {peer.get_name()}")
self.pipeline.set_property("message-forward", True)
peer.send_event(Gst.Event.new_eos())
peer.unref()
self.is_saving = False
def on_message(self, bus: Gst.Bus, message: Gst.Message):
print("message", message, message.type)
match message.type:
case Gst.MessageType.ELEMENT:
struct = message.get_structure()
assert struct is not None
forwarded_message_dta = struct.get_value("message")
assert forwarded_message_dta is not None
forwarded_message: Gst.Message = forwarded_message_dta.get_value()
print(str(forwarded_message))
if forwarded_message.type == Gst.MessageType.EOS:
logger.debug("Forwarded EOS")
self.filesink.set_state(Gst.State.NULL)
self.muxer.set_state(Gst.State.NULL)
self.parse.set_state(Gst.State.NULL)
self.venc.set_state(Gst.State.NULL)
self.update_filename()
self.venc.set_state(Gst.State.PLAYING)
self.parse.set_state(Gst.State.PLAYING)
self.muxer.set_state(Gst.State.PLAYING)
self.filesink.set_state(Gst.State.PLAYING)
# Dispose forwarded_message?
def update_filename(self):
logger.debug("Updated filename!")
self.filesink.set_property('location', f"/home/deck/Videos/{time.strftime('%Y-%m-%d-%H-%M-%S')}.mp4")
def dispose(self):
# TODO
pass
# Asyncio-compatible long-running code, executed in a task when the plugin is loaded
# async def _main(self):
# self.recording = True
# self.paused = True
# path = os.path.dirname(os.path.abspath(__file__))
# try:
# os.symlink("/home/deck/Videos/ReplaySorcery", path + "/dist/assets", target_is_directory=True)
# except OSError as e:
# if e.errno == errno.EEXIST:
# pass
# # await self.start_recorder(self)
# pass
if __name__ == "__main__":
get_event_loop().run_until_complete(Plugin()._standalone())