This commit is contained in:
András Wiesner 2026-02-19 10:22:50 +01:00
commit b45fcb1c33
6 changed files with 309 additions and 0 deletions

4
install.bat Executable file
View File

@ -0,0 +1,4 @@
python3 -m venv .venv
call .venv\Scripts\activate.bat
python3 -m pip install --upgrade pip
python3 -m pip install -r requirements.txt

5
install.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
watchdog
requests

2
run.bat Executable file
View File

@ -0,0 +1,2 @@
call .venv\Scripts\activate.bat
python3 sync_client.py

4
run.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
source .venv/bin/activate
python3 sync_client.py

292
sync_client.py Normal file
View File

@ -0,0 +1,292 @@
import tkinter as tk
from tkinter import ttk
from tkinter import filedialog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os
import requests
class SyncClient:
def start(self) -> None:
_sself = self
class FileChangeHandler(FileSystemEventHandler):
EVENTS_WATCHED = [ "moved", "deleted", "created", "modified" ]
def on_any_event(self, event):
if event.event_type in FileChangeHandler.EVENTS_WATCHED:
if event.src_path != _sself.dir():
rel_path = str(event.src_path).removeprefix(_sself.dir())
if event.event_type == "created" or event.event_type == "modified":
_sself.upload(rel_path)
elif event.event_type == "moved":
src_rel_path = rel_path
dst_rel_path = str(event.dest_path).removeprefix(_sself.dir())
_sself.move(src_rel_path, dst_rel_path)
pass
elif event.event_type == "deleted":
_sself.delete(rel_path)
pass
observer = Observer()
event_handler = FileChangeHandler()
observer.schedule(event_handler, self.__dir, recursive=True)
observer.start()
self.__observer = observer
def __init__(self, dir: str, url: str, id: str, key: str) -> None:
self.__dir = dir
self.__url = url
self.__id = id
self.__key = key
self.start()
def dir(self) -> str:
return self.__dir
def _get_file_path(self, filename: str) -> str:
return self.__dir + os.sep + filename
def _get_file_type(self, filename: str) -> str:
path = self._get_file_path(filename)
if os.path.isdir(path):
return "directory"
else:
return "file"
def upload(self, filename: str) -> bool:
print("[UPLOAD]", filename)
params = {
"action": "upload",
"filename": filename,
"type": self._get_file_type(filename),
"session_id": self.__id,
"session_key": self.__key
}
if params["type"] == "file":
path = self.__dir + os.sep + filename
with open(path, "rb") as f:
requests.post(self.__url, data=f.read(), headers=params)
return True
elif params["type"] == "directory":
requests.post(self.__url, headers=params)
return True
return False
def delete(self, filename: str) -> bool:
print("[DELETE]", filename)
params = {
"action": "delete",
"filename": filename,
"type": self._get_file_type(filename),
"session_id": self.__id,
"session_key": self.__key
}
requests.post(self.__url, headers=params)
return True
def wipe(self) -> bool:
print("[WIPE] workspace")
params = {
"action": "wipe",
"filename": "*",
"type": "*",
"session_id": self.__id,
"session_key": self.__key
}
requests.post(self.__url, headers=params)
return True
def move(self, src: str, dst: str) -> bool:
print("[MOVE] ", src, "->", dst)
params = {
"action": "move",
"filename": src,
"filename2": dst,
"type": self._get_file_type(dst),
"session_id": self.__id,
"session_key": self.__key
}
requests.post(self.__url, headers=params)
return True
def close(self) -> None:
self.__observer.stop()
def upload_all(self) -> None:
# create directories first
def diriter(path, rel_path = "", types = [ "directory", "file" ]):
listing = []
for entry in os.scandir(path):
path = (rel_path + os.sep + entry.name).removeprefix("/")
name = entry.name.removeprefix("/")
if entry.is_dir():
if "directory" in types:
listing.append({"type": "directory", "path": path, "name": name})
dir_listing = diriter(entry.path, rel_path=path, types=types)
listing.extend(dir_listing)
else:
if "file" in types:
listing.append({"type": "file",
"path": path,
"name": name})
return listing
dirs = diriter(self.dir(), types=["directory"])
for dir in dirs:
self.upload(dir["path"])
# then upload the files
files = diriter(self.dir(), types=["file"])
for file in files:
self.upload(file["path"])
pass
# ------------------
class SyncClientGui:
def init_gui(self) -> None:
win = tk.Tk()
win.title("CodeCast Kliens")
win.geometry("400x120")
self.__win = win
frame = ttk.Frame(win)
frame.pack(fill="both")
dirsel_frame = ttk.Frame(frame)
dirsel_frame.pack(fill="x")
dirsel_label = ttk.Label(dirsel_frame, text="Könyvtár:", width=8)
dirsel_label.pack(side=tk.LEFT)
def select_directory():
sel_dir = filedialog.askdirectory()
if sel_dir is not None and sel_dir != "":
self.__dir = sel_dir
dirsel_btn.configure(text=sel_dir)
dirsel_btn = ttk.Button(dirsel_frame, text="(nincs kiválasztva)", command=select_directory)
dirsel_btn.pack(side=tk.RIGHT, fill="x")
url_frame = ttk.Frame(frame)
url_frame.pack(fill="x")
url_label = ttk.Label(url_frame, text="URL:", width=8)
url_label.pack(side=tk.LEFT)
url_tf = ttk.Entry(url_frame)
url_tf.pack(fill="x", expand=True, side=tk.RIGHT)
id_frame = ttk.Frame(frame)
id_frame.pack(fill="x")
id_label = ttk.Label(id_frame, text="ID:", width=8)
id_label.pack(side=tk.LEFT)
id_tf = ttk.Entry(id_frame)
id_tf.pack(fill="x", expand=True, side=tk.RIGHT)
key_frame = ttk.Frame(frame)
key_frame.pack(fill="x")
key_label = ttk.Label(key_frame, text="Kulcs:", width=8)
key_label.pack(side=tk.LEFT)
key_tf = ttk.Entry(key_frame, show="*")
key_tf.pack(fill="x", expand=True, side=tk.RIGHT)
action_frame = ttk.Frame(frame)
action_frame.pack(fill="x")
def toggle_sync():
if not self.__sync_enabled:
self.__url = url_tf.get().strip()
self.__id = id_tf.get().strip()
self.__key = key_tf.get().strip()
if self.__url != "" and self.__id != "" and self.__key != "" and self.__dir != "":
self.start_sync()
else:
self.stop_sync()
if self.__sync_enabled:
on_off_btn.config(text="Szinkronizálás KI")
else:
on_off_btn.config(text="Szinkronizálás BE")
on_off_btn = ttk.Button(action_frame, text="Szinkronizálás BE", command=toggle_sync)
on_off_btn.pack(fill="x")
def __init__(self) -> None:
self.__dir = ""
self.__url = ""
self.__id = ""
self.__key = ""
self.__sc: SyncClient
self.__sync_enabled = False
self.init_gui()
def start_sync(self) -> None:
if not self.__sync_enabled:
self.__sc = SyncClient(self.__dir, self.__url, self.__id, self.__key)
self.__sc.wipe()
self.__sc.upload_all()
self.__sc.start()
self.__sync_enabled = True
def stop_sync(self) -> None:
if self.__sync_enabled:
self.__sc.close()
self.__sync_enabled = False
def mainloop(self) -> None:
self.__win.mainloop()
self.stop_sync()
# ------------------
if __name__ == "__main__":
gui = SyncClientGui()
gui.mainloop()