mirror of
https://github.com/tcsenpai/pensieve.git
synced 2025-06-06 03:05:25 +00:00
added linux support
This commit is contained in:
parent
66aaec4150
commit
36af2346c8
5
.gitignore
vendored
5
.gitignore
vendored
@ -10,3 +10,8 @@ test-data/
|
|||||||
memos/static/
|
memos/static/
|
||||||
db/
|
db/
|
||||||
memos/plugins/ocr/temp_ppocr.yaml
|
memos/plugins/ocr/temp_ppocr.yaml
|
||||||
|
memos.spec
|
||||||
|
memosexec
|
||||||
|
screenshots
|
||||||
|
screenshots/
|
||||||
|
yarn.lock
|
||||||
|
@ -15,6 +15,8 @@ from functools import lru_cache
|
|||||||
from collections import defaultdict, deque
|
from collections import defaultdict, deque
|
||||||
|
|
||||||
# Third-party imports
|
# Third-party imports
|
||||||
|
import platform
|
||||||
|
import subprocess
|
||||||
import typer
|
import typer
|
||||||
import httpx
|
import httpx
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
@ -839,12 +841,31 @@ def sync(
|
|||||||
|
|
||||||
@lru_cache(maxsize=1)
|
@lru_cache(maxsize=1)
|
||||||
def is_on_battery():
|
def is_on_battery():
|
||||||
try:
|
|
||||||
battery = psutil.sensors_battery()
|
|
||||||
return battery is not None and not battery.power_plugged
|
|
||||||
except:
|
|
||||||
return False # If unable to detect battery status, assume not on battery
|
|
||||||
|
|
||||||
|
if platform.system() == "Darwin":
|
||||||
|
try:
|
||||||
|
result = subprocess.check_output(['pmset', '-g', 'batt']).decode()
|
||||||
|
return "'Battery Power'" in result
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
elif platform.system() == "Windows":
|
||||||
|
try:
|
||||||
|
return psutil.sensors_battery().power_plugged == False
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
elif platform.system() == "Linux":
|
||||||
|
try:
|
||||||
|
# Try using upower
|
||||||
|
result = subprocess.check_output(['upower', '--show-info', '/org/freedesktop/UPower/devices/battery_BAT0']).decode()
|
||||||
|
return 'state: discharging' in result.lower()
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
# Fallback to checking /sys/class/power_supply
|
||||||
|
with open('/sys/class/power_supply/BAT0/status', 'r') as f:
|
||||||
|
return f.read().strip().lower() == 'discharging'
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
# Modify the LibraryFileHandler class
|
# Modify the LibraryFileHandler class
|
||||||
class LibraryFileHandler(FileSystemEventHandler):
|
class LibraryFileHandler(FileSystemEventHandler):
|
||||||
|
@ -450,30 +450,43 @@ def remove_windows_autostart():
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@app.command()
|
def generate_systemd_service():
|
||||||
def disable():
|
"""Generate systemd service file for Linux."""
|
||||||
"""Disable memos from running at startup"""
|
memos_dir = settings.resolved_base_dir
|
||||||
if is_windows():
|
python_path = get_python_path()
|
||||||
if remove_windows_autostart():
|
log_dir = memos_dir / "logs"
|
||||||
typer.echo(
|
log_dir.mkdir(parents=True, exist_ok=True)
|
||||||
"Removed Memos shortcut from startup folder. Memos will no longer run at startup."
|
|
||||||
)
|
service_content = f"""[Unit]
|
||||||
else:
|
Description=Memos Service
|
||||||
typer.echo(
|
After=network.target
|
||||||
"Memos shortcut not found in startup folder. Memos is not set to run at startup."
|
|
||||||
)
|
[Service]
|
||||||
elif is_macos():
|
Type=simple
|
||||||
plist_path = Path.home() / "Library/LaunchAgents/com.user.memos.plist"
|
Environment="PATH={os.environ['PATH']}"
|
||||||
if plist_path.exists():
|
ExecStart={python_path} -m memos.commands record
|
||||||
subprocess.run(["launchctl", "unload", str(plist_path)], check=True)
|
ExecStart={python_path} -m memos.commands serve
|
||||||
plist_path.unlink()
|
ExecStartPre=/bin/sleep 15
|
||||||
typer.echo(
|
ExecStart={python_path} -m memos.commands watch
|
||||||
"Unloaded and removed plist file. Memos will no longer run at startup."
|
Restart=always
|
||||||
)
|
User={os.getenv('USER')}
|
||||||
else:
|
StandardOutput=append:{log_dir}/memos.log
|
||||||
typer.echo("Plist file does not exist. Memos is not set to run at startup.")
|
StandardError=append:{log_dir}/memos.error.log
|
||||||
else:
|
|
||||||
typer.echo("Unsupported operating system.")
|
[Install]
|
||||||
|
WantedBy=default.target
|
||||||
|
"""
|
||||||
|
|
||||||
|
service_path = Path.home() / ".config/systemd/user"
|
||||||
|
service_path.mkdir(parents=True, exist_ok=True)
|
||||||
|
service_file = service_path / "memos.service"
|
||||||
|
with open(service_file, "w") as f:
|
||||||
|
f.write(service_content)
|
||||||
|
return service_file
|
||||||
|
|
||||||
|
|
||||||
|
def is_linux():
|
||||||
|
return platform.system() == "Linux"
|
||||||
|
|
||||||
|
|
||||||
@app.command()
|
@app.command()
|
||||||
@ -497,9 +510,145 @@ def enable():
|
|||||||
plist_path = generate_plist()
|
plist_path = generate_plist()
|
||||||
typer.echo(f"Generated plist file at {plist_path}")
|
typer.echo(f"Generated plist file at {plist_path}")
|
||||||
load_plist(plist_path)
|
load_plist(plist_path)
|
||||||
typer.echo(
|
typer.echo("Loaded plist file. Memos will run at next startup.")
|
||||||
"Loaded plist file. Memos is started and will run at next startup or when 'start' command is used."
|
elif is_linux():
|
||||||
|
service_file = generate_systemd_service()
|
||||||
|
typer.echo(f"Generated systemd service file at {service_file}")
|
||||||
|
# Enable and start the service
|
||||||
|
subprocess.run(["systemctl", "--user", "enable", "memos.service"], check=True)
|
||||||
|
typer.echo("Enabled memos systemd service for current user.")
|
||||||
|
else:
|
||||||
|
typer.echo("Unsupported operating system.")
|
||||||
|
|
||||||
|
|
||||||
|
@app.command()
|
||||||
|
def disable():
|
||||||
|
"""Disable memos from running at startup"""
|
||||||
|
if is_windows():
|
||||||
|
if remove_windows_autostart():
|
||||||
|
typer.echo("Removed Memos shortcut from startup folder.")
|
||||||
|
else:
|
||||||
|
typer.echo("Memos shortcut not found in startup folder.")
|
||||||
|
elif is_macos():
|
||||||
|
plist_path = Path.home() / "Library/LaunchAgents/com.user.memos.plist"
|
||||||
|
if plist_path.exists():
|
||||||
|
subprocess.run(["launchctl", "unload", str(plist_path)], check=True)
|
||||||
|
plist_path.unlink()
|
||||||
|
typer.echo("Unloaded and removed plist file.")
|
||||||
|
else:
|
||||||
|
typer.echo("Plist file does not exist.")
|
||||||
|
elif is_linux():
|
||||||
|
service_file = Path.home() / ".config/systemd/user/memos.service"
|
||||||
|
if service_file.exists():
|
||||||
|
subprocess.run(
|
||||||
|
["systemctl", "--user", "disable", "memos.service"], check=True
|
||||||
)
|
)
|
||||||
|
subprocess.run(["systemctl", "--user", "stop", "memos.service"], check=True)
|
||||||
|
service_file.unlink()
|
||||||
|
typer.echo("Disabled and removed memos systemd service.")
|
||||||
|
else:
|
||||||
|
typer.echo("Systemd service file does not exist.")
|
||||||
|
else:
|
||||||
|
typer.echo("Unsupported operating system.")
|
||||||
|
|
||||||
|
|
||||||
|
@app.command()
|
||||||
|
def start():
|
||||||
|
"""Start all Memos processes"""
|
||||||
|
memos_dir = settings.resolved_base_dir
|
||||||
|
|
||||||
|
if is_windows():
|
||||||
|
bat_path = memos_dir / "launch.bat"
|
||||||
|
if not bat_path.exists():
|
||||||
|
typer.echo("Launch script not found. Please run 'memos enable' first.")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
subprocess.Popen(
|
||||||
|
[str(bat_path)], shell=True, creationflags=subprocess.CREATE_NEW_CONSOLE
|
||||||
|
)
|
||||||
|
typer.echo("Started Memos processes.")
|
||||||
|
except Exception as e:
|
||||||
|
typer.echo(f"Failed to start Memos processes: {str(e)}")
|
||||||
|
elif is_macos():
|
||||||
|
service_name = "com.user.memos"
|
||||||
|
subprocess.run(["launchctl", "start", service_name], check=True)
|
||||||
|
typer.echo("Started Memos processes.")
|
||||||
|
elif is_linux():
|
||||||
|
try:
|
||||||
|
subprocess.run(
|
||||||
|
["systemctl", "--user", "start", "memos.service"], check=True
|
||||||
|
)
|
||||||
|
typer.echo("Started Memos processes.")
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
typer.echo(f"Failed to start Memos processes: {str(e)}")
|
||||||
|
else:
|
||||||
|
typer.echo("Unsupported operating system.")
|
||||||
|
|
||||||
|
|
||||||
|
@app.command()
|
||||||
|
def stop():
|
||||||
|
"""Stop all running Memos processes"""
|
||||||
|
if is_windows():
|
||||||
|
services = ["serve", "watch", "record"]
|
||||||
|
stopped = False
|
||||||
|
for service in services:
|
||||||
|
processes = [
|
||||||
|
p
|
||||||
|
for p in psutil.process_iter(["pid", "name", "cmdline"])
|
||||||
|
if "python" in p.info["name"].lower()
|
||||||
|
and p.info["cmdline"] is not None
|
||||||
|
and "memos.commands" in p.info["cmdline"]
|
||||||
|
and service in p.info["cmdline"]
|
||||||
|
]
|
||||||
|
|
||||||
|
for process in processes:
|
||||||
|
try:
|
||||||
|
os.kill(process.info["pid"], signal.SIGTERM)
|
||||||
|
typer.echo(
|
||||||
|
f"Stopped {service} process (PID: {process.info['pid']})"
|
||||||
|
)
|
||||||
|
stopped = True
|
||||||
|
except ProcessLookupError:
|
||||||
|
typer.echo(
|
||||||
|
f"Process {service} (PID: {process.info['pid']}) not found"
|
||||||
|
)
|
||||||
|
except PermissionError:
|
||||||
|
typer.echo(
|
||||||
|
f"Permission denied to stop {service} process (PID: {process.info['pid']})"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not stopped:
|
||||||
|
typer.echo("No running Memos processes found")
|
||||||
|
elif is_macos():
|
||||||
|
service_name = "com.user.memos"
|
||||||
|
try:
|
||||||
|
subprocess.run(["launchctl", "stop", service_name], check=True)
|
||||||
|
typer.echo("Stopped Memos processes.")
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
typer.echo("Failed to stop Memos processes. They may not be running.")
|
||||||
|
elif is_linux():
|
||||||
|
try:
|
||||||
|
subprocess.run(["systemctl", "--user", "stop", "memos.service"], check=True)
|
||||||
|
typer.echo("Stopped Memos processes.")
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
# Fallback to manual process killing if systemd service fails
|
||||||
|
services = ["serve", "watch", "record"]
|
||||||
|
stopped = False
|
||||||
|
for service in services:
|
||||||
|
try:
|
||||||
|
output = subprocess.check_output(
|
||||||
|
["pgrep", "-f", f"memos.commands {service}"]
|
||||||
|
)
|
||||||
|
pids = output.decode().strip().split()
|
||||||
|
for pid in pids:
|
||||||
|
os.kill(int(pid), signal.SIGTERM)
|
||||||
|
typer.echo(f"Stopped {service} process (PID: {pid})")
|
||||||
|
stopped = True
|
||||||
|
except (subprocess.CalledProcessError, ProcessLookupError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not stopped:
|
||||||
|
typer.echo("No running Memos processes found")
|
||||||
else:
|
else:
|
||||||
typer.echo("Unsupported operating system.")
|
typer.echo("Unsupported operating system.")
|
||||||
|
|
||||||
@ -538,83 +687,6 @@ def ps():
|
|||||||
typer.echo(tabulate(table_data, headers=headers, tablefmt="plain"))
|
typer.echo(tabulate(table_data, headers=headers, tablefmt="plain"))
|
||||||
|
|
||||||
|
|
||||||
@app.command()
|
|
||||||
def stop():
|
|
||||||
"""Stop all running Memos processes"""
|
|
||||||
if is_windows():
|
|
||||||
services = ["serve", "watch", "record"]
|
|
||||||
stopped = False
|
|
||||||
|
|
||||||
for service in services:
|
|
||||||
processes = [
|
|
||||||
p
|
|
||||||
for p in psutil.process_iter(["pid", "name", "cmdline"])
|
|
||||||
if "python" in p.info["name"].lower()
|
|
||||||
and p.info["cmdline"] is not None
|
|
||||||
and "memos.commands" in p.info["cmdline"]
|
|
||||||
and service in p.info["cmdline"]
|
|
||||||
]
|
|
||||||
|
|
||||||
for process in processes:
|
|
||||||
try:
|
|
||||||
os.kill(process.info["pid"], signal.SIGTERM)
|
|
||||||
typer.echo(
|
|
||||||
f"Stopped {service} process (PID: {process.info['pid']})"
|
|
||||||
)
|
|
||||||
stopped = True
|
|
||||||
except ProcessLookupError:
|
|
||||||
typer.echo(
|
|
||||||
f"Process {service} (PID: {process.info['pid']}) not found"
|
|
||||||
)
|
|
||||||
except PermissionError:
|
|
||||||
typer.echo(
|
|
||||||
f"Permission denied to stop {service} process (PID: {process.info['pid']})"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not stopped:
|
|
||||||
typer.echo("No running Memos processes found")
|
|
||||||
else:
|
|
||||||
typer.echo("All Memos processes have been stopped")
|
|
||||||
|
|
||||||
elif is_macos():
|
|
||||||
service_name = "com.user.memos"
|
|
||||||
try:
|
|
||||||
subprocess.run(["launchctl", "stop", service_name], check=True)
|
|
||||||
typer.echo("Stopped Memos processes.")
|
|
||||||
except subprocess.CalledProcessError:
|
|
||||||
typer.echo("Failed to stop Memos processes. They may not be running.")
|
|
||||||
|
|
||||||
else:
|
|
||||||
typer.echo("Unsupported operating system.")
|
|
||||||
|
|
||||||
|
|
||||||
@app.command()
|
|
||||||
def start():
|
|
||||||
"""Start all Memos processes"""
|
|
||||||
memos_dir = settings.resolved_base_dir
|
|
||||||
|
|
||||||
if is_windows():
|
|
||||||
bat_path = memos_dir / "launch.bat"
|
|
||||||
if not bat_path.exists():
|
|
||||||
typer.echo("Launch script not found. Please run 'memos enable' first.")
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
subprocess.Popen(
|
|
||||||
[str(bat_path)], shell=True, creationflags=subprocess.CREATE_NEW_CONSOLE
|
|
||||||
)
|
|
||||||
typer.echo("Started Memos processes. Check the logs for more information.")
|
|
||||||
except Exception as e:
|
|
||||||
typer.echo(f"Failed to start Memos processes: {str(e)}")
|
|
||||||
|
|
||||||
elif is_macos():
|
|
||||||
service_name = "com.user.memos"
|
|
||||||
subprocess.run(["launchctl", "start", service_name], check=True)
|
|
||||||
typer.echo("Started Memos processes.")
|
|
||||||
else:
|
|
||||||
typer.echo("Unsupported operating system.")
|
|
||||||
|
|
||||||
|
|
||||||
@app.command()
|
@app.command()
|
||||||
def config():
|
def config():
|
||||||
"""Show current configuration settings"""
|
"""Show current configuration settings"""
|
||||||
|
288
memos/record.py
288
memos/record.py
@ -66,6 +66,64 @@ def save_previous_hashes(base_dir, previous_hashes):
|
|||||||
json.dump(previous_hashes, f)
|
json.dump(previous_hashes, f)
|
||||||
|
|
||||||
|
|
||||||
|
def get_wayland_displays():
|
||||||
|
displays = []
|
||||||
|
try:
|
||||||
|
# Try using swaymsg for sway
|
||||||
|
output = subprocess.check_output(["swaymsg", "-t", "get_outputs"], text=True)
|
||||||
|
outputs = json.loads(output)
|
||||||
|
for output in outputs:
|
||||||
|
if output["active"]:
|
||||||
|
displays.append(
|
||||||
|
{
|
||||||
|
"name": output["name"],
|
||||||
|
"geometry": f"{output['rect'].x},{output['rect'].y} {output['rect'].width}x{output['rect'].height}",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
# Try using wlr-randr for wlroots-based compositors
|
||||||
|
output = subprocess.check_output(["wlr-randr"], text=True)
|
||||||
|
# Parse wlr-randr output
|
||||||
|
current_display = {}
|
||||||
|
for line in output.splitlines():
|
||||||
|
if line.startswith(" "):
|
||||||
|
if "enabled" in line and "yes" in line:
|
||||||
|
current_display["active"] = True
|
||||||
|
else:
|
||||||
|
if current_display and current_display.get("active"):
|
||||||
|
displays.append(current_display)
|
||||||
|
current_display = {"name": line.split()[0]}
|
||||||
|
except:
|
||||||
|
# Fallback to single display
|
||||||
|
displays.append({"name": "", "geometry": ""})
|
||||||
|
|
||||||
|
return displays
|
||||||
|
|
||||||
|
|
||||||
|
def get_x11_displays():
|
||||||
|
displays = []
|
||||||
|
try:
|
||||||
|
output = subprocess.check_output(["xrandr", "--current"], text=True)
|
||||||
|
current_display = None
|
||||||
|
|
||||||
|
for line in output.splitlines():
|
||||||
|
if " connected " in line:
|
||||||
|
parts = line.split()
|
||||||
|
name = parts[0]
|
||||||
|
# Find the geometry in format: 1920x1080+0+0
|
||||||
|
for part in parts:
|
||||||
|
if "x" in part and "+" in part:
|
||||||
|
geometry = part
|
||||||
|
break
|
||||||
|
displays.append({"name": name, "geometry": geometry})
|
||||||
|
except:
|
||||||
|
# Fallback to single display
|
||||||
|
displays.append({"name": "default", "geometry": ""})
|
||||||
|
|
||||||
|
return displays
|
||||||
|
|
||||||
|
|
||||||
def get_active_window_info_darwin():
|
def get_active_window_info_darwin():
|
||||||
active_app = NSWorkspace.sharedWorkspace().activeApplication()
|
active_app = NSWorkspace.sharedWorkspace().activeApplication()
|
||||||
app_name = active_app["NSApplicationName"]
|
app_name = active_app["NSApplicationName"]
|
||||||
@ -94,11 +152,68 @@ def get_active_window_info_windows():
|
|||||||
return "", ""
|
return "", ""
|
||||||
|
|
||||||
|
|
||||||
|
def get_active_window_info_linux():
|
||||||
|
try:
|
||||||
|
# Try using xdotool for X11
|
||||||
|
window_id = (
|
||||||
|
subprocess.check_output(["xdotool", "getactivewindow"]).decode().strip()
|
||||||
|
)
|
||||||
|
window_name = (
|
||||||
|
subprocess.check_output(["xdotool", "getwindowname", window_id])
|
||||||
|
.decode()
|
||||||
|
.strip()
|
||||||
|
)
|
||||||
|
window_pid = (
|
||||||
|
subprocess.check_output(["xdotool", "getwindowpid", window_id])
|
||||||
|
.decode()
|
||||||
|
.strip()
|
||||||
|
)
|
||||||
|
|
||||||
|
app_name = ""
|
||||||
|
try:
|
||||||
|
with open(f"/proc/{window_pid}/comm", "r") as f:
|
||||||
|
app_name = f.read().strip()
|
||||||
|
except:
|
||||||
|
app_name = window_name.split(" - ")[0]
|
||||||
|
|
||||||
|
return app_name, window_name
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
# Try using qdbus for Wayland/KDE
|
||||||
|
active_window = (
|
||||||
|
subprocess.check_output(
|
||||||
|
["qdbus", "org.kde.KWin", "/KWin", "org.kde.KWin.activeWindow"]
|
||||||
|
)
|
||||||
|
.decode()
|
||||||
|
.strip()
|
||||||
|
)
|
||||||
|
|
||||||
|
window_title = (
|
||||||
|
subprocess.check_output(
|
||||||
|
[
|
||||||
|
"qdbus",
|
||||||
|
"org.kde.KWin",
|
||||||
|
f"/windows/{active_window}",
|
||||||
|
"org.kde.KWin.caption",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
.decode()
|
||||||
|
.strip()
|
||||||
|
)
|
||||||
|
|
||||||
|
return window_title.split(" - ")[0], window_title
|
||||||
|
except:
|
||||||
|
return "", ""
|
||||||
|
|
||||||
|
|
||||||
def get_active_window_info():
|
def get_active_window_info():
|
||||||
if platform.system() == "Darwin":
|
if platform.system() == "Darwin":
|
||||||
return get_active_window_info_darwin()
|
return get_active_window_info_darwin()
|
||||||
elif platform.system() == "Windows":
|
elif platform.system() == "Windows":
|
||||||
return get_active_window_info_windows()
|
return get_active_window_info_windows()
|
||||||
|
elif platform.system() == "Linux":
|
||||||
|
return get_active_window_info_linux()
|
||||||
|
return "", ""
|
||||||
|
|
||||||
|
|
||||||
def take_screenshot_macos(
|
def take_screenshot_macos(
|
||||||
@ -231,13 +346,144 @@ def take_screenshot_windows(
|
|||||||
yield safe_monitor_name, webp_filename, "Saved"
|
yield safe_monitor_name, webp_filename, "Saved"
|
||||||
|
|
||||||
|
|
||||||
|
def take_screenshot_linux(
|
||||||
|
base_dir,
|
||||||
|
previous_hashes,
|
||||||
|
threshold,
|
||||||
|
screen_sequences,
|
||||||
|
date,
|
||||||
|
timestamp,
|
||||||
|
app_name,
|
||||||
|
window_title,
|
||||||
|
):
|
||||||
|
screenshots = []
|
||||||
|
|
||||||
|
# Check if running under Wayland or X11
|
||||||
|
wayland_display = os.environ.get("WAYLAND_DISPLAY")
|
||||||
|
is_wayland = wayland_display is not None
|
||||||
|
|
||||||
|
if is_wayland:
|
||||||
|
# Try different Wayland screenshot tools in order of preference
|
||||||
|
screenshot_tools = [
|
||||||
|
["spectacle", "-m", "-b", "-n"], # Plasma default
|
||||||
|
["grim"], # Basic Wayland screenshot utility
|
||||||
|
["grimshot", "save"], # sway's screenshot utility
|
||||||
|
["slurp", "-f", "%o"], # Alternative selection tool
|
||||||
|
]
|
||||||
|
|
||||||
|
for tool in screenshot_tools:
|
||||||
|
try:
|
||||||
|
# subprocess.run(["which", tool[0]], check=True, capture_output=True)
|
||||||
|
subprocess.run(["which", tool[0]], check=True, capture_output=True)
|
||||||
|
screenshot_cmd = tool
|
||||||
|
print(screenshot_cmd)
|
||||||
|
break
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise RuntimeError(
|
||||||
|
"No supported Wayland screenshot tool found. Please install grim or grimshot."
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# X11 screenshot tools
|
||||||
|
screenshot_tools = [
|
||||||
|
["maim"], # Modern screenshot tool
|
||||||
|
["scrot", "-z"], # Traditional screenshot tool
|
||||||
|
["import", "-window", "root"], # ImageMagick
|
||||||
|
]
|
||||||
|
|
||||||
|
for tool in screenshot_tools:
|
||||||
|
try:
|
||||||
|
subprocess.run(["which", tool[0]], check=True, capture_output=True)
|
||||||
|
screenshot_cmd = tool
|
||||||
|
break
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise RuntimeError(
|
||||||
|
"No supported X11 screenshot tool found. Please install maim, scrot, or imagemagick."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get display information using xrandr or Wayland equivalent
|
||||||
|
if is_wayland:
|
||||||
|
displays = get_wayland_displays()
|
||||||
|
else:
|
||||||
|
displays = get_x11_displays()
|
||||||
|
|
||||||
|
for display_index, display_info in enumerate(displays):
|
||||||
|
screen_name = f"screen_{display_index}"
|
||||||
|
|
||||||
|
temp_filename = os.path.join(
|
||||||
|
base_dir, date, f"temp_screenshot-{timestamp}-of-{screen_name}.png"
|
||||||
|
)
|
||||||
|
|
||||||
|
if is_wayland:
|
||||||
|
# For Wayland, we need to specify the output
|
||||||
|
output_arg = display_info["name"]
|
||||||
|
if output_arg == "":
|
||||||
|
output_arg = "0"
|
||||||
|
cmd = screenshot_cmd + ["-o", temp_filename]
|
||||||
|
print(cmd)
|
||||||
|
else:
|
||||||
|
# For X11, we can specify the geometry
|
||||||
|
geometry = display_info["geometry"]
|
||||||
|
cmd = screenshot_cmd + ["-g", geometry, temp_filename]
|
||||||
|
|
||||||
|
try:
|
||||||
|
subprocess.run(cmd, check=True)
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
logging.error(f"Failed to capture screenshot: {e}")
|
||||||
|
yield screen_name, None, "Failed to capture"
|
||||||
|
continue
|
||||||
|
|
||||||
|
with Image.open(temp_filename) as img:
|
||||||
|
img = img.convert("RGB")
|
||||||
|
current_hash = str(imagehash.phash(img))
|
||||||
|
|
||||||
|
if (
|
||||||
|
screen_name in previous_hashes
|
||||||
|
and imagehash.hex_to_hash(current_hash)
|
||||||
|
- imagehash.hex_to_hash(previous_hashes[screen_name])
|
||||||
|
< threshold
|
||||||
|
):
|
||||||
|
logging.info(
|
||||||
|
f"Screenshot for {screen_name} is similar to the previous one. Skipping."
|
||||||
|
)
|
||||||
|
os.remove(temp_filename)
|
||||||
|
yield screen_name, None, "Skipped (similar to previous)"
|
||||||
|
continue
|
||||||
|
|
||||||
|
previous_hashes[screen_name] = current_hash
|
||||||
|
screen_sequences[screen_name] = screen_sequences.get(screen_name, 0) + 1
|
||||||
|
|
||||||
|
metadata = {
|
||||||
|
"timestamp": timestamp,
|
||||||
|
"active_app": app_name,
|
||||||
|
"active_window": window_title,
|
||||||
|
"screen_name": screen_name,
|
||||||
|
"sequence": screen_sequences[screen_name],
|
||||||
|
}
|
||||||
|
|
||||||
|
webp_filename = os.path.join(
|
||||||
|
base_dir, date, f"screenshot-{timestamp}-of-{screen_name}.webp"
|
||||||
|
)
|
||||||
|
img.save(webp_filename, format="WebP", quality=85)
|
||||||
|
write_image_metadata(webp_filename, metadata)
|
||||||
|
|
||||||
|
save_screen_sequences(base_dir, screen_sequences, date)
|
||||||
|
|
||||||
|
os.remove(temp_filename)
|
||||||
|
yield screen_name, webp_filename, "Success"
|
||||||
|
|
||||||
|
|
||||||
def take_screenshot(
|
def take_screenshot(
|
||||||
base_dir, previous_hashes, threshold, screen_sequences, date, timestamp
|
base_dir, previous_hashes, threshold, screen_sequences, date, timestamp
|
||||||
):
|
):
|
||||||
app_name, window_title = get_active_window_info()
|
app_name, window_title = get_active_window_info()
|
||||||
|
print(app_name, window_title)
|
||||||
os.makedirs(os.path.join(base_dir, date), exist_ok=True)
|
os.makedirs(os.path.join(base_dir, date), exist_ok=True)
|
||||||
worklog_path = os.path.join(base_dir, date, "worklog")
|
worklog_path = os.path.join(base_dir, date, "worklog")
|
||||||
|
|
||||||
with open(worklog_path, "a") as worklog:
|
with open(worklog_path, "a") as worklog:
|
||||||
if platform.system() == "Darwin":
|
if platform.system() == "Darwin":
|
||||||
screenshot_generator = take_screenshot_macos(
|
screenshot_generator = take_screenshot_macos(
|
||||||
@ -261,6 +507,18 @@ def take_screenshot(
|
|||||||
app_name,
|
app_name,
|
||||||
window_title,
|
window_title,
|
||||||
)
|
)
|
||||||
|
elif platform.system() == "Linux" or platform.system() == "linux":
|
||||||
|
print("Linux")
|
||||||
|
screenshot_generator = take_screenshot_linux(
|
||||||
|
base_dir,
|
||||||
|
previous_hashes,
|
||||||
|
threshold,
|
||||||
|
screen_sequences,
|
||||||
|
date,
|
||||||
|
timestamp,
|
||||||
|
app_name,
|
||||||
|
window_title,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
f"Unsupported operating system: {platform.system()}"
|
f"Unsupported operating system: {platform.system()}"
|
||||||
@ -285,6 +543,29 @@ def is_screen_locked():
|
|||||||
elif platform.system() == "Windows":
|
elif platform.system() == "Windows":
|
||||||
user32 = ctypes.windll.User32
|
user32 = ctypes.windll.User32
|
||||||
return user32.GetForegroundWindow() == 0
|
return user32.GetForegroundWindow() == 0
|
||||||
|
elif platform.system() == "Linux":
|
||||||
|
try:
|
||||||
|
# Check for GNOME screensaver
|
||||||
|
output = subprocess.check_output(
|
||||||
|
["gnome-screensaver-command", "-q"], stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
return b"is active" in output
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
# Check for XScreenSaver
|
||||||
|
output = subprocess.check_output(
|
||||||
|
["xscreensaver-command", "-time"], stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
return b"screen locked" in output
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
# Check for Light-locker (XFCE, LXDE)
|
||||||
|
output = subprocess.check_output(
|
||||||
|
["light-locker-command", "-q"], stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
return b"is locked" in output
|
||||||
|
except:
|
||||||
|
return False # If no screensaver utils found, assume not locked
|
||||||
|
|
||||||
|
|
||||||
def run_screen_recorder_once(threshold, base_dir, previous_hashes):
|
def run_screen_recorder_once(threshold, base_dir, previous_hashes):
|
||||||
@ -317,6 +598,7 @@ def run_screen_recorder(threshold, base_dir, previous_hashes):
|
|||||||
date,
|
date,
|
||||||
timestamp,
|
timestamp,
|
||||||
)
|
)
|
||||||
|
print(screenshot_files)
|
||||||
for screenshot_file in screenshot_files:
|
for screenshot_file in screenshot_files:
|
||||||
logging.info(f"Screenshot saved: {screenshot_file}")
|
logging.info(f"Screenshot saved: {screenshot_file}")
|
||||||
else:
|
else:
|
||||||
@ -337,7 +619,9 @@ def main():
|
|||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
base_dir = (
|
base_dir = (
|
||||||
os.path.expanduser(args.base_dir) if args.base_dir else settings.resolved_screenshots_dir
|
os.path.expanduser(args.base_dir)
|
||||||
|
if args.base_dir
|
||||||
|
else settings.resolved_screenshots_dir
|
||||||
)
|
)
|
||||||
previous_hashes = load_previous_hashes(base_dir)
|
previous_hashes = load_previous_hashes(base_dir)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user