307 lines
10 KiB
Python
307 lines
10 KiB
Python
# pyright: reportMissingModuleSource=false
# WIP GStreamer Recorder
# Pipeline largely based on https://github.com/Epictek/DeckyStream/blob/master/backend/GstreamerServiceShadow.cs

# NOTE: the statements below are order-sensitive (path tweak -> gbulb ->
# gi version pinning -> Gst import), so imports are deliberately not grouped.
import sys

# Extend the module search path before importing the remaining packages.
# NOTE(review): presumably gbulb is vendored in this directory -- confirm.
sys.path.append("/home/deck/repos/Deckorder/deps")

import time
import gbulb
import os
# import errno
# import shutil
import signal
import subprocess
# import socket
import logging

gbulb.install(gtk=False)  # turn off gtk to remove ui

import gi
from asyncio import AbstractEventLoop, sleep, get_event_loop

# gi.require_version('Gtk', '3.0') # for ui
gi.require_version('GIRepository', '2.0')
from gi.repository import GIRepository

# Let GObject-Introspection look for shared libraries in the "libs" directory
# next to this file before the default locations.
GIRepository.Repository.prepend_library_path(
    os.path.join(os.path.dirname(__file__), "libs")
)

gi.require_version('Gst', '1.0')
gi.require_version('GstVideo', '1.0')

from gi.repository import Gst

# os.environ['HOME'] = "/home/deck"
# os.environ['XDG_RUNTIME_DIR'] = "/run/user/1000"

dirpath = "/home/deck/Videos/ReplaySorcery"

# Configure the root logger; `level` here replaces a separate setLevel() call.
logging.basicConfig(
    # filename="/tmp/recorder.log",
    level=logging.DEBUG,
    force=True,
    filemode='w',
    format='%(asctime)s %(levelname)s %(message)s',
)
logger = logging.getLogger()

# Initialise GStreamer and turn on its own debug output.
Gst.init(["--verbose"])
Gst.debug_set_active(True)
Gst.debug_set_default_threshold(4)
|
|
|
|
# if "replay-sorcery.conf" not in os.listdir("/home/deck/.config"):
|
|
|
|
|
|
class Plugin:
    """Plugin entry object.

    At the moment only the standalone demo run (`_standalone`) is live; the
    former recorder-control API is kept below as commented-out code.
    """

    def __init__(self):
        # Keep the event loop around so it can be handed to the Recorder.
        self.loop = get_event_loop()

    def __del__(self):
        """Nothing to clean up."""

    # async def _start_recorder(self):
    #     pass

    # async def start_recorder(self):
    #     if not self.recording:
    #         self.recording = True
    #         logger.info("Starting")
    #         asyncio.ensure_future(self._start_recorder(self))

    # async def stop_recorder(self):
    #     if self.recording:
    #         self.recording = False
    #         logger.info("Stopping")
    #         # os.kill(self.process.pid, signal.SIGINT)

    # async def toggle_recorder(self):
    #     if self.recording:
    #         return await self.stop_recorder(self)
    #     else:
    #         return await self.start_recorder(self)

    # async def save(self):
    #     if self.recording and not self.paused:
    #         self.saving = asyncio.get_event_loop().create_future()
    #         logger.info("Saving")
    #         with open('/tmp/recorder.log', "a") as out:
    #             # subprocess.Popen([os.path.dirname(__file__) + "/out/replay-sorcery", "save"], stdin=subprocess.PIPE, stdout=out, stderr=out)
    #         logger.info("Done saving")
    #         return await self.saving
    #     return False

    # async def suspend_pause(self):
    #     if self.recording:
    #         logger.info("Suspending")
    #         self.paused = True
    #         os.kill(self.process.pid, signal.SIGINT)

    # async def suspend_resume(self):
    #     if self.recording and self.paused:
    #         logger.info("Resuming")
    #         self.paused = False
    #         asyncio.ensure_future(self._start_recorder(self))
    #         # await self.start_recorder(self)

    # async def get_status(self):
    #     return self.recording
    #     # client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    #     # client.connect("/tmp/replay-sorcery/control.sock")
    #     # client.close()

    # async def get_videos_len(self):
    #     return len(os.listdir("/home/deck/Videos/ReplaySorcery"))

    # async def get_videos(self):
    #     arr = sorted(filter(lambda s: os.path.isfile(os.path.join(dirpath, s)), os.listdir(dirpath)), key=lambda s: os.path.getmtime(os.path.join(dirpath, s)))
    #     arr.reverse()
    #     return arr

    async def _standalone(self):
        """Run one demo session: start, wait 15 s, save, stop, wait 2 s."""
        recorder = Recorder(self.loop)

        # self.loop.create_task(rec.start())
        await recorder.start()

        # Let the recorder run for a while before asking for a save.
        await sleep(15)
        await recorder.save()

        # self.loop.create_task(self._rec())
        await recorder.stop()

        # Short grace period before the coroutine returns.
        await sleep(2)
|
|
class Recorder:
    """Backlog recorder built on a single GStreamer pipeline.

    The pipeline is ``pipewiresrc ! vaapipostproc ! queue ! vaapih264enc !
    h264parse ! mp4mux ! filesink``.  The queue (``vrecq``) is leaky and
    limited to 10 seconds; its src pad is normally held by a blocking probe so
    nothing reaches the encoder.  ``save()`` lifts the block for 5 seconds,
    re-blocks, and pushes EOS downstream; once the forwarded EOS shows up on
    the bus, the encoder/muxer/sink are reset and pointed at a new file.
    """

    def __init__(self, loop: AbstractEventLoop) -> None:
        """Create an idle recorder.

        Args:
            loop: Event loop used to schedule the EOS-push task.
        """
        self.pipeline: Gst.Pipeline | None = None
        self.is_saving: bool = False
        self.buffer_count: int = 0
        self.loop = loop

        # Populated by start(); initialised here so the asserts in save() and
        # push_eos() trip cleanly instead of raising AttributeError.
        self.vrecq_src = None
        self.vrecq_src_probe_id = 0
        # Strong reference to the EOS task so it cannot be garbage-collected
        # before it has run.
        self._eos_task = None

    async def start(self):
        """Build the pipeline, block the queue's src pad and go to PLAYING."""
        description = (
            "pipewiresrc name=videosrc do-timestamp=true"
            " ! vaapipostproc name=postproc"
            " ! queue name=vrecq"
            " ! vaapih264enc name=venc"
            " ! h264parse name=parse"
            " ! mp4mux name=muxer"
            " ! filesink async=false name=filesink"
        )
        self.pipeline = Gst.parse_launch(description)

        self.venc = self.pipeline.get_by_name("venc")
        self.parse = self.pipeline.get_by_name("parse")
        self.videosrc = self.pipeline.get_by_name("videosrc")
        self.postproc = self.pipeline.get_by_name("postproc")
        self.muxer = self.pipeline.get_by_name("muxer")
        self.filesink = self.pipeline.get_by_name("filesink")

        self.vrecq = self.pipeline.get_by_name("vrecq")
        assert self.vrecq is not None

        # Bound the queue by time only (10 s); byte/buffer limits disabled.
        self.vrecq.set_property("max-size-time", 10 * Gst.SECOND)
        self.vrecq.set_property("max-size-bytes", 0)
        self.vrecq.set_property("max-size-buffers", 0)
        # 2 == "downstream" in the queue element's leaky enum: when full, the
        # oldest queued buffers are dropped.
        self.vrecq.set_property("leaky", 2)

        # Hold the queue output until save() is requested.
        self.vrecq_src = self.vrecq.get_static_pad("src")
        assert self.vrecq_src is not None
        self.vrecq_src_probe_id = self.vrecq_src.add_probe(
            Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb)

        self.update_filename()

        # Hook up the bus before starting so no message handling is missed.
        self.pipeline.bus.add_signal_watch()
        self.pipeline.bus.enable_sync_message_emission()
        self.pipeline.bus.connect("message", self.on_message)

        # The original issued this state change twice; once is sufficient.
        result = self.pipeline.set_state(Gst.State.PLAYING)
        if result == Gst.StateChangeReturn.FAILURE:
            logger.error("Pipeline failed to reach PLAYING")

    async def stop(self):
        """Tear the pipeline down; a no-op when nothing is running."""
        if self.pipeline is None:
            return

        for element in (self.filesink, self.muxer, self.parse, self.venc):
            element.set_state(Gst.State.NULL)
        self.pipeline.set_state(Gst.State.NULL)

        # Balance the add_signal_watch()/enable_sync_message_emission() calls
        # made in start() so the bus watch is released.
        bus = self.pipeline.bus
        bus.disable_sync_message_emission()
        bus.remove_signal_watch()

        # No manual unref(): PyGObject owns the reference and releases it when
        # the Python wrapper is dropped below.
        await sleep(1)

        self.pipeline = None
        self.vrecq_src = None
        self.vrecq_src_probe_id = 0

    async def save(self):
        """Unblock the queue for 5 seconds, then re-block and push EOS.

        Returns immediately if a save is already in progress.
        """
        if self.is_saving:
            return

        assert self.vrecq_src is not None
        pad = self.vrecq_src

        # Actually arm the guard; it is cleared again by push_eos().
        self.is_saving = True
        try:
            logger.info("timeout, unblocking pad to save recording")

            # need to hook up another probe to drop the initial old buffer stuck
            # in the blocking pad probe
            self.buffer_count = 0  # re-arm the "drop one" logic for this save
            pad.add_probe(Gst.PadProbeType.BUFFER, self.probe_drop_one_cb)

            if self.vrecq_src_probe_id != 0:
                pad.remove_probe(self.vrecq_src_probe_id)
            self.vrecq_src_probe_id = 0

            await sleep(5)

            self.vrecq_src_probe_id = pad.add_probe(
                Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.block_probe_cb)

            self._eos_task = self.loop.create_task(self.push_eos())
        except BaseException:
            # Do not leave the recorder permanently "saving" after a failure
            # or cancellation.
            self.is_saving = False
            raise

    def block_probe_cb(self, _idk, _idk2):
        """Blocking-probe callback; returning OK keeps the probe installed."""
        return Gst.PadProbeReturn.OK

    def probe_drop_one_cb(self, pad: Gst.Pad, info: Gst.PadProbeInfo):
        """Drop the first buffer, then drop until a keyframe arrives.

        Returns:
            ``DROP`` for the first buffer and for every delta frame after it;
            ``REMOVE`` (pass the buffer and uninstall this probe) on the first
            keyframe.
        """
        buf = info.get_buffer()

        if self.buffer_count == 0:
            self.buffer_count += 1
            logger.debug("Drop one buffer with ts " + str(buf.dts))
            return Gst.PadProbeReturn.DROP

        # DELTA_UNIT marks buffers that depend on others, so a keyframe is a
        # buffer WITHOUT that flag (the original test was inverted).
        is_keyframe = not buf.has_flags(Gst.BufferFlags.DELTA_UNIT)
        if is_keyframe:
            logger.debug("Letting buffer through and removing drop probe")
            return Gst.PadProbeReturn.REMOVE

        logger.debug("Dropping buffer, wait for a keyframe")
        return Gst.PadProbeReturn.DROP

    async def push_eos(self):
        """Send EOS to the element downstream of the queue and end the save."""
        try:
            assert self.vrecq_src is not None
            peer = self.vrecq_src.get_peer()

            assert self.pipeline is not None

            logger.debug(f"pushing EOS event on pad {peer.get_name()}")
            # Have the pipeline forward child messages so the EOS is visible
            # in on_message() as a forwarded message.
            self.pipeline.set_property("message-forward", True)

            peer.send_event(Gst.Event.new_eos())
            # No peer.unref(): the binding manages the pad reference.
        finally:
            self.is_saving = False

    def on_message(self, bus: Gst.Bus, message: Gst.Message):
        """Bus handler: on a forwarded EOS, restart the encode/mux/sink chain
        with a fresh output filename."""
        print("message", message, message.type)

        if message.type != Gst.MessageType.ELEMENT:
            return

        struct = message.get_structure()
        # Only forwarded-message wrappers carry a "message" field; ignore any
        # other element message instead of asserting.
        if struct is None or not struct.has_name("GstBinForwarded"):
            return

        forwarded_message = struct.get_value("message")
        if forwarded_message is None:
            return
        # NOTE(review): depending on the binding version get_value() may hand
        # back the Gst.Message directly or a wrapper exposing get_value();
        # accept both.
        if not isinstance(forwarded_message, Gst.Message):
            forwarded_message = forwarded_message.get_value()

        print(str(forwarded_message))

        if forwarded_message.type != Gst.MessageType.EOS:
            return

        logger.debug("Forwarded EOS")

        self.filesink.set_state(Gst.State.NULL)
        self.muxer.set_state(Gst.State.NULL)
        self.parse.set_state(Gst.State.NULL)
        self.venc.set_state(Gst.State.NULL)

        self.update_filename()

        self.venc.set_state(Gst.State.PLAYING)
        self.parse.set_state(Gst.State.PLAYING)
        self.muxer.set_state(Gst.State.PLAYING)
        self.filesink.set_state(Gst.State.PLAYING)

    def update_filename(self):
        """Point the filesink at a new timestamped .mp4 path."""
        logger.debug("Updated filename!")
        self.filesink.set_property('location', f"/home/deck/Videos/{time.strftime('%Y-%m-%d-%H-%M-%S')}.mp4")

    def dispose(self):
        """Placeholder for explicit cleanup."""
        # TODO
        pass
|
|
|
|
|
|
# Asyncio-compatible long-running code, executed in a task when the plugin is loaded
|
|
# async def _main(self):
|
|
# self.recording = True
|
|
# self.paused = True
|
|
# path = os.path.dirname(os.path.abspath(__file__))
|
|
# try:
|
|
# os.symlink("/home/deck/Videos/ReplaySorcery", path + "/dist/assets", target_is_directory=True)
|
|
# except OSError as e:
|
|
# if e.errno == errno.EEXIST:
|
|
# pass
|
|
# # await self.start_recorder(self)
|
|
# pass
|
|
|
|
|
|
if __name__ == "__main__":
    # Standalone entry point: run the plugin's demo session to completion.
    event_loop = get_event_loop()
    plugin = Plugin()
    event_loop.run_until_complete(plugin._standalone())
|
|
|