| | import os |
| | import time |
| | import subprocess |
| | import datetime |
| |
|
| | BACKUP_REPO = os.environ.get("BACKUP_REPO") |
| | HF_TOKEN = os.environ.get("HF_TOKEN") |
| |
|
| | def run_backup(): |
| | env = os.environ.copy() |
| | env["HF_HOME"] = "/tmp/hf_cache" |
| | env["XDG_CACHE_HOME"] = "/tmp/xdg_cache" |
| | env["TMPDIR"] = "/tmp" |
| | env["HF_TOKEN"] = HF_TOKEN |
| |
|
| | os.makedirs(env["HF_HOME"], exist_ok=True) |
| | os.makedirs(env["XDG_CACHE_HOME"], exist_ok=True) |
| | os.makedirs(env["TMPDIR"], exist_ok=True) |
| |
|
| | local_path = "/home/vscode/workspace" |
| |
|
| | |
| | print("[Backup] Deleting old files...") |
| | cmd = [ |
| | "hf", "repo-files", "delete", BACKUP_REPO, "workspace/", |
| | "--repo-type", "dataset", |
| | ] |
| | process = subprocess.Popen( |
| | cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
| | env=env, text=True, cwd="/tmp" |
| | ) |
| | for line in iter(process.stdout.readline, ""): |
| | print("[Backup]", line.strip()) |
| | process.wait() |
| |
|
| | |
| | print("[Backup] Uploading workspace...") |
| | cmd = [ |
| | "hf", "upload", BACKUP_REPO, local_path, "workspace/", |
| | "--repo-type", "dataset", |
| | ] |
| | process = subprocess.Popen( |
| | cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
| | env=env, text=True, cwd="/tmp" |
| | ) |
| | for line in iter(process.stdout.readline, ""): |
| | print("[Backup]", line.strip()) |
| | process.wait() |
| |
|
| | print("[Backup] Completed at", datetime.datetime.utcnow().isoformat(), "UTC") |
| |
|
| | if __name__ == "__main__": |
| | if not BACKUP_REPO or not HF_TOKEN: |
| | print("[Backup] BACKUP_REPO or HF_TOKEN not set! Please set it in your Spaces secrets to use the Backup Service.") |
| | exit(0) |
| |
|
| | while True: |
| | print("[Backup] Backup started.") |
| | try: |
| | run_backup() |
| | except Exception as e: |
| | print("[Backup] Failed:", e) |
| | time.sleep(45 * 60) |