430 lines
14 KiB
Python
430 lines
14 KiB
Python
import fnmatch
|
|
import platform
|
|
import tkinter as tk
|
|
from tkinter import ttk
|
|
from tkinter import filedialog, messagebox
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEvent, FileSystemEventHandler
|
|
import os
|
|
import requests
|
|
import json
|
|
import threading
|
|
import queue
|
|
|
|
class SyncClient:
|
|
@classmethod
|
|
def __convert_to_unix_path(cls, path: str) -> str:
|
|
if platform.system() == "Windows":
|
|
return path.replace("\\", "/").removeprefix("/")
|
|
else:
|
|
return path.removeprefix("/")
|
|
|
|
SETTINGS_DIR: str = ".codecast"
|
|
MAIN_SETTINGS_FILE: str = SETTINGS_DIR + os.sep + "settings.json"
|
|
|
|
def start(self) -> None:
|
|
_sself = self
|
|
|
|
self.load_settings()
|
|
|
|
self._event_queue = queue.Queue()
|
|
|
|
def fs_watch_thread_routine(queue: queue.Queue):
|
|
run = True
|
|
while run:
|
|
event: FileSystemEvent = queue.get()
|
|
|
|
if event.src_path == "**CLOSE**" and event.is_synthetic == True:
|
|
run = False
|
|
continue
|
|
elif event.src_path == SyncClient.MAIN_SETTINGS_FILE or event.dest_path == SyncClient.MAIN_SETTINGS_FILE:
|
|
_sself.load_settings()
|
|
|
|
rel_path = str(event.src_path).removeprefix(_sself.dir()).removeprefix(os.sep)
|
|
for pattern in self.get_ignored_paths():
|
|
if fnmatch.fnmatch(rel_path, pattern):
|
|
continue
|
|
|
|
if event.event_type == "created" or event.event_type == "modified":
|
|
_sself.upload(rel_path)
|
|
elif event.event_type == "moved":
|
|
src_rel_path = rel_path
|
|
dst_rel_path = str(event.dest_path).removeprefix(_sself.dir())
|
|
_sself.move(src_rel_path, dst_rel_path)
|
|
pass
|
|
elif event.event_type == "deleted":
|
|
_sself.delete(rel_path)
|
|
pass
|
|
|
|
|
|
class FileChangeHandler(FileSystemEventHandler):
|
|
EVENTS_WATCHED = [ "moved", "deleted", "created", "modified" ]
|
|
|
|
def on_any_event(self, event):
|
|
if event.event_type in FileChangeHandler.EVENTS_WATCHED:
|
|
if event.src_path != _sself.dir():
|
|
_sself._event_queue.put(event)
|
|
|
|
self.__watch_thread = threading.Thread(target=fs_watch_thread_routine, args=(self._event_queue,))
|
|
self.__watch_thread.start()
|
|
|
|
observer = Observer()
|
|
event_handler = FileChangeHandler()
|
|
observer.schedule(event_handler, self.__dir, recursive=True)
|
|
observer.start()
|
|
|
|
self.__observer = observer
|
|
|
|
|
|
def __init__(self, dir: str, url: str, id: str, key: str) -> None:
|
|
self.__dir = dir
|
|
self.__url = url
|
|
self.__id = id
|
|
self.__key = key
|
|
self.__ignored_paths = set()
|
|
|
|
self.__watch_thread: threading.Thread
|
|
self._event_queue: queue.Queue
|
|
|
|
|
|
def dir(self) -> str:
|
|
return self.__dir
|
|
|
|
|
|
def _get_file_path(self, filename: str) -> str:
|
|
return self.__dir + os.sep + filename
|
|
|
|
|
|
def _get_file_type(self, filename: str) -> str:
|
|
path = self._get_file_path(filename)
|
|
if os.path.isdir(path):
|
|
return "directory"
|
|
else:
|
|
return "file"
|
|
|
|
|
|
def _set_ignored_paths(self, ignored_paths: list | set) -> None:
|
|
old_ignored_paths = set(self.__ignored_paths)
|
|
new_ignored_paths = set(ignored_paths)
|
|
|
|
included = old_ignored_paths - new_ignored_paths
|
|
excluded = new_ignored_paths - old_ignored_paths
|
|
|
|
self.upload_all(included)
|
|
self.delete_all(excluded)
|
|
|
|
self.__ignored_paths = new_ignored_paths
|
|
|
|
|
|
def get_ignored_paths(self) -> set:
|
|
return self.__ignored_paths
|
|
|
|
|
|
def load_settings(self) -> None:
|
|
try:
|
|
with open(self.__dir + os.sep + self.MAIN_SETTINGS_FILE, "r") as sf:
|
|
settings = json.loads(sf.read())
|
|
self._set_ignored_paths(settings.get("ignored-paths", []))
|
|
except:
|
|
self._set_ignored_paths(set())
|
|
|
|
|
|
def upload(self, filename: str) -> bool:
|
|
print("[UPLOAD]", filename)
|
|
|
|
params = {
|
|
"action": "upload",
|
|
"filename": SyncClient.__convert_to_unix_path(filename),
|
|
"type": self._get_file_type(filename),
|
|
"session_id": self.__id,
|
|
"session_key": self.__key
|
|
}
|
|
|
|
if params["type"] == "file":
|
|
path = self.__dir + os.sep + filename
|
|
with open(path, "rb") as f:
|
|
requests.post(self.__url, data=f.read(), headers=params)
|
|
return True
|
|
elif params["type"] == "directory":
|
|
requests.post(self.__url, headers=params)
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def delete(self, filename: str) -> bool:
|
|
print("[DELETE]", filename)
|
|
|
|
params = {
|
|
"action": "delete",
|
|
"filename": SyncClient.__convert_to_unix_path(filename),
|
|
"type": self._get_file_type(filename),
|
|
"session_id": self.__id,
|
|
"session_key": self.__key
|
|
}
|
|
|
|
requests.post(self.__url, headers=params)
|
|
|
|
return True
|
|
|
|
|
|
def wipe(self) -> bool:
|
|
print("[WIPE] workspace")
|
|
|
|
params = {
|
|
"action": "wipe",
|
|
"filename": "*",
|
|
"type": "*",
|
|
"session_id": self.__id,
|
|
"session_key": self.__key
|
|
}
|
|
|
|
requests.post(self.__url, headers=params)
|
|
|
|
return True
|
|
|
|
|
|
def move(self, src: str, dst: str) -> bool:
|
|
print("[MOVE] ", src, "->", dst)
|
|
|
|
params = {
|
|
"action": "move",
|
|
"filename": SyncClient.__convert_to_unix_path(src),
|
|
"filename2": SyncClient.__convert_to_unix_path(dst),
|
|
"type": self._get_file_type(dst),
|
|
"session_id": self.__id,
|
|
"session_key": self.__key
|
|
}
|
|
|
|
requests.post(self.__url, headers=params)
|
|
|
|
return True
|
|
|
|
def close(self) -> None:
|
|
self.__observer.stop()
|
|
self._event_queue.put(FileSystemEvent("**CLOSE**", is_synthetic=True))
|
|
self.__watch_thread.join()
|
|
return
|
|
|
|
|
|
def __list_files(self, types = [ "directory", "file" ], path_match: set | None = None) -> list:
|
|
def diriter(path, rel_path = "", types = [ "directory", "file" ], path_match: set | None = None):
|
|
listing = []
|
|
for entry in os.scandir(path):
|
|
path = (rel_path + os.sep + entry.name).removeprefix(os.sep)
|
|
|
|
if path_match is not None:
|
|
skip_entry = True
|
|
for pattern in path_match:
|
|
if fnmatch.fnmatch(path, pattern):
|
|
skip_entry = False
|
|
break
|
|
else:
|
|
skip_entry = False
|
|
|
|
if not skip_entry:
|
|
name = entry.name.removeprefix(os.sep)
|
|
if entry.is_dir():
|
|
if "directory" in types:
|
|
listing.append({"type": "directory", "path": path, "name": name})
|
|
dir_listing = diriter(entry.path, rel_path=path, types=types, path_match=path_match)
|
|
listing.extend(dir_listing)
|
|
else:
|
|
if "file" in types:
|
|
listing.append({"type": "file",
|
|
"path": path,
|
|
"name": name})
|
|
return listing
|
|
|
|
return diriter(self.dir(), types=types, path_match=path_match)
|
|
|
|
|
|
def upload_all(self, path_match: set | None = None) -> None:
|
|
# create directories first
|
|
dirs = self.__list_files(types=["directory"], path_match=path_match)
|
|
|
|
for dir in dirs:
|
|
self.upload(dir["path"])
|
|
|
|
# then upload the files
|
|
files = self.__list_files(types=["file"], path_match=path_match)
|
|
|
|
for file in files:
|
|
self.upload(file["path"])
|
|
|
|
pass
|
|
|
|
def delete_all(self, path_match: set | None = None) -> None:
|
|
files_and_dirs = self.__list_files(path_match=path_match)
|
|
|
|
for entry in files_and_dirs:
|
|
self.delete(entry["path"])
|
|
|
|
# ------------------
|
|
|
|
class SyncClientGui:
    """Small Tkinter window for configuring and toggling a ``SyncClient``.

    The window collects a directory, a server URL, a session id and a session
    key.  The values are restored from ``SETTINGS_FILE`` on start-up and
    written back when the window is closed.
    """

    SETTINGS_FILE = "settings.json"

    # Caption of the directory button while no directory is selected.
    _NO_DIR_TEXT = "(nincs kiválasztva)"

    def __init__(self) -> None:
        """Build the window, restore saved settings and hook the close button."""
        self.__dir = ""
        self.__url = ""
        self.__id = ""
        self.__key = ""
        self.__sc = None
        self.__sync_enabled = False

        self.__init_gui()
        self.__load_last_settings()

        def on_close():
            # A failed settings write must not keep the window from closing.
            try:
                self.__save_settings()
            except OSError as exc:
                print("[ERROR] could not save settings:", exc)

            try:
                self.stop_sync()
            finally:
                self.__win.destroy()

        self.__win.protocol("WM_DELETE_WINDOW", on_close)

    @staticmethod
    def __add_entry_row(parent, label_text: str, **entry_options):
        """Pack a "label + text entry" row into *parent* and return the entry."""
        row = ttk.Frame(parent)
        row.pack(fill="x")

        ttk.Label(row, text=label_text, width=8).pack(side=tk.LEFT)

        entry = ttk.Entry(row, **entry_options)
        entry.pack(fill="x", expand=True, side=tk.RIGHT)
        return entry

    def __init_gui(self) -> None:
        """Create the main window and all of its widgets."""
        win = tk.Tk()
        win.title("CodeCast Kliens")
        win.geometry("400x120")
        self.__win = win

        frame = ttk.Frame(win)
        frame.pack(fill="both")

        # --- directory selection row ---
        dirsel_frame = ttk.Frame(frame)
        dirsel_frame.pack(fill="x")

        ttk.Label(dirsel_frame, text="Könyvtár:", width=8).pack(side=tk.LEFT)

        def select_directory():
            sel_dir = filedialog.askdirectory()
            # Cancelling the dialog yields an empty value; keep the old choice.
            if sel_dir:
                self.__dir = sel_dir
                dirsel_btn.configure(text=sel_dir)

        dirsel_btn = ttk.Button(
            dirsel_frame, text=SyncClientGui._NO_DIR_TEXT, command=select_directory
        )
        dirsel_btn.pack(side=tk.RIGHT, fill="x")

        # --- connection rows ---
        url_tf = self.__add_entry_row(frame, "URL:")
        id_tf = self.__add_entry_row(frame, "ID:")
        key_tf = self.__add_entry_row(frame, "Kulcs:", show="*")

        # --- on/off button ---
        action_frame = ttk.Frame(frame)
        action_frame.pack(fill="x")

        def toggle_sync():
            if self.__sync_enabled:
                self.stop_sync()
            else:
                base_url = url_tf.get().strip().removesuffix("/")
                self.__id = id_tf.get().strip()
                self.__key = key_tf.get().strip()

                # Validate the URL before "/sync" is appended; afterwards the
                # string could never be empty.
                if base_url and self.__id and self.__key and self.__dir:
                    self.__url = base_url + "/sync"
                    try:
                        self.start_sync()
                    except Exception as exc:
                        # GUI callback boundary: report instead of failing
                        # silently on the console.
                        messagebox.showerror(
                            "Hiba",
                            f"A szinkronizálás indítása nem sikerült:\n{exc}",
                        )
                else:
                    messagebox.showwarning(
                        "Hiányzó adatok",
                        "Add meg a könyvtárat, az URL-t, az ID-t és a kulcsot "
                        "a szinkronizálás indításához.",
                    )

            if self.__sync_enabled:
                on_off_btn.config(text="Szinkronizálás KI")
                state = tk.DISABLED
            else:
                on_off_btn.config(text="Szinkronizálás BE")
                state = tk.NORMAL

            # The inputs are locked while a sync session is running.
            for widget in (dirsel_btn, url_tf, id_tf, key_tf):
                widget.configure(state=state)

        on_off_btn = ttk.Button(action_frame, text="Szinkronizálás BE", command=toggle_sync)
        on_off_btn.pack(fill="x")

        self.__url_tf = url_tf
        self.__id_tf = id_tf
        self.__key_tf = key_tf
        self.__dirsel_btn = dirsel_btn

    def __load_last_settings(self) -> None:
        """Fill the widgets from ``SETTINGS_FILE``; do nothing if it is unusable."""
        try:
            with open(SyncClientGui.SETTINGS_FILE, "r", encoding="utf-8") as sf:
                settings = json.load(sf)
        except (OSError, ValueError):
            # No saved settings yet, or the file is not valid JSON.
            return

        if not isinstance(settings, dict):
            return

        self.__dir = settings.get("dir", "") or ""
        # An empty saved directory must not blank the button caption.
        self.__dirsel_btn.configure(text=self.__dir or SyncClientGui._NO_DIR_TEXT)
        self.__url_tf.insert(0, settings.get("url", ""))
        self.__id_tf.insert(0, settings.get("id", ""))
        self.__key_tf.insert(0, settings.get("key", ""))

    def __save_settings(self) -> None:
        """Write the current form values to ``SETTINGS_FILE`` as JSON."""
        # NOTE(review): the session key is stored in plain text - confirm
        # this is acceptable for the deployment.
        settings = {
            "dir": self.__dir,
            "url": self.__url_tf.get(),
            "id": self.__id_tf.get(),
            "key": self.__key_tf.get(),
        }
        with open(SyncClientGui.SETTINGS_FILE, "w", encoding="utf-8") as sf:
            sf.write(json.dumps(settings))

    def start_sync(self) -> None:
        """Create a ``SyncClient``, wipe the remote workspace and start watching."""
        if not self.__sync_enabled:
            self.__sc = SyncClient(self.__dir, self.__url, self.__id, self.__key)

            self.__sc.wipe()

            self.__sc.start()

            # Only flagged as enabled once start-up succeeded.
            self.__sync_enabled = True

    def stop_sync(self) -> None:
        """Shut the running ``SyncClient`` down, if any."""
        if self.__sync_enabled:
            self.__sc.close()
            self.__sync_enabled = False

    def mainloop(self) -> None:
        """Run the Tk event loop until the window is closed."""
        self.__win.mainloop()
|
|
|
|
# ------------------
|
|
|
|
if __name__ == "__main__":
    # Build the window, then block in the Tk event loop until it is closed.
    SyncClientGui().mainloop()
|