import os import re import threading import tkinter as tk from tkinter import ttk, filedialog, messagebox from concurrent.futures import ThreadPoolExecutor import urllib.request import urllib.parse import urllib.error DEFAULT_GATEWAYS = [ "https://ipfs.ninja/ipfs/", "https://gw.ipfs-lens.dev/ipfs/", "https://ipfs.aleph.cloud/ipfs/" ] class IPFSDownloaderApp: def __init__(self, root): self.root = root self.root.title("IPFS 批量下载器 (纯Python原生版)") self.root.geometry("800x650") self.all_paths = [] self.selected_paths = set() self.downloading = False self.pending_files = [] self._setup_ui() def _setup_ui(self): # --- 输入区域 --- input_frame = ttk.LabelFrame(self.root, text="基本设置", padding=10) input_frame.pack(fill=tk.X, padx=10, pady=5) ttk.Label(input_frame, text="自定义网关 (可选):").grid(row=0, column=0, sticky=tk.W, pady=2) self.gw_entry = ttk.Entry(input_frame, width=50) self.gw_entry.grid(row=0, column=1, sticky=tk.W, pady=2) ttk.Label(input_frame, text="文件 CID:").grid(row=1, column=0, sticky=tk.W, pady=2) self.cid_entry = ttk.Entry(input_frame, width=50) self.cid_entry.grid(row=1, column=1, sticky=tk.W, pady=2) ttk.Label(input_frame, text="TXT 文件路径:").grid(row=2, column=0, sticky=tk.W, pady=2) self.txt_entry = ttk.Entry(input_frame, width=50) self.txt_entry.grid(row=2, column=1, sticky=tk.W, pady=2) ttk.Button(input_frame, text="浏览", command=self.browse_txt).grid(row=2, column=2, padx=5) ttk.Label(input_frame, text="保存目录:").grid(row=3, column=0, sticky=tk.W, pady=2) self.save_entry = ttk.Entry(input_frame, width=50) self.save_entry.insert(0, os.path.join(os.getcwd(), "download")) self.save_entry.grid(row=3, column=1, sticky=tk.W, pady=2) ttk.Button(input_frame, text="浏览", command=self.browse_save).grid(row=3, column=2, padx=5) ttk.Label(input_frame, text="同时下载数量:").grid(row=4, column=0, sticky=tk.W, pady=2) self.threads_entry = ttk.Entry(input_frame, width=10) self.threads_entry.insert(0, "3") self.threads_entry.grid(row=4, column=1, sticky=tk.W, pady=2) # --- 树状预览区域 --- tree_frame = ttk.LabelFrame(self.root, text="文件选择 (点击复选框切换)", padding=10) tree_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) self.tree = ttk.Treeview(tree_frame, selectmode='none') self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.tree.configure(yscrollcommand=scrollbar.set) self.tree.bind('', self.toggle_check) # --- 控制与日志区域 --- control_frame = tk.Frame(self.root) control_frame.pack(fill=tk.X, padx=10, pady=5) self.start_btn = ttk.Button(control_frame, text="开始下载", command=self.start_download) self.start_btn.pack(side=tk.LEFT, padx=5) self.log_text = tk.Text(self.root, height=8, state=tk.DISABLED) self.log_text.pack(fill=tk.BOTH, padx=10, pady=5) def browse_txt(self): path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")]) if path: self.txt_entry.delete(0, tk.END) self.txt_entry.insert(0, path) self.load_txt_to_tree(path) def browse_save(self): path = filedialog.askdirectory() if path: self.save_entry.delete(0, tk.END) self.save_entry.insert(0, path) def log(self, msg): self.log_text.config(state=tk.NORMAL) self.log_text.insert(tk.END, msg + "\n") self.log_text.see(tk.END) self.log_text.config(state=tk.DISABLED) self.root.update_idletasks() def parse_txt(self, filepath): paths = [] path_stack = {} with open(filepath, 'r', encoding='utf-8') as f: for line in f: line = line.strip('\n').strip('\r') # 过滤无用信息 if not line or line.startswith('='): continue if line.startswith("文件夹清单:") or line.startswith("位置:") or \ line.startswith("总文件数:") or line.startswith("总大小:"): continue # 读取平铺格式(如读取"未下载完.txt") if "├──" not in line and "└──" not in line and "/" in line and not line.startswith("│"): paths.append(line.split(' (')[0].strip()) continue # 解析树状结构 match = re.search(r'([├└]──\s*)(.*)', line) if match: prefix_len = len(line[:match.start()]) name_size = match.group(2) else: name_size = line.lstrip('│ ').strip() if not name_size: continue prefix_len = -1 name = re.sub(r'\s*\(\d+(\.\d+)?\s*[KMGTP]?B,\s*[\d,]+\s*bytes\)$', '', name_size).strip() is_dir = name.endswith('/') if is_dir: name = name[:-1] keys_to_remove = [k for k in path_stack.keys() if k >= prefix_len] for k in keys_to_remove: del path_stack[k] path_stack[prefix_len] = name valid_keys = sorted(path_stack.keys()) current_parts = [path_stack[k] for k in valid_keys] current_path = "/".join(current_parts) if not is_dir: paths.append(current_path) return paths def load_txt_to_tree(self, filepath): self.tree.delete(*self.tree.get_children()) try: self.all_paths = self.parse_txt(filepath) self.selected_paths = set(self.all_paths) nodes = {} for path in self.all_paths: parts = path.split('/') current_id = "" for i, part in enumerate(parts): parent_id = current_id current_id = f"{parent_id}/{part}" if parent_id else part if current_id not in nodes: text = f"[☑] {part}" node = self.tree.insert(parent_id, "end", current_id, text=text, open=True) nodes[current_id] = node except Exception as e: messagebox.showerror("解析错误", f"解析 TXT 文件失败: {str(e)}") def toggle_check(self, event): item = self.tree.identify_row(event.y) if not item: return text = self.tree.item(item, "text") new_state = "[☐]" if "[☑]" in text else "[☑]" self.tree.item(item, text=text.replace("[☑]", "[x]").replace("[☐]", "[☑]").replace("[x]", "[☐]")) def toggle_children(node_id, state): for child in self.tree.get_children(node_id): c_text = self.tree.item(child, "text") self.tree.item(child, text=c_text.replace("[☑]", "[x]").replace("[☐]", state).replace("[x]", state)) toggle_children(child, state) toggle_children(item, new_state) self.update_selected_paths() def update_selected_paths(self): self.selected_paths.clear() for path in self.all_paths: node_id = path if self.tree.exists(node_id) and "[☑]" in self.tree.item(node_id, "text"): self.selected_paths.add(path) def write_unfinished(self): txt_dir = os.path.dirname(self.txt_entry.get()) unfinish_file = os.path.join(txt_dir, "未下载完.txt") with open(unfinish_file, 'w', encoding='utf-8') as f: for p in self.pending_files: f.write(p + "\n") def download_worker(self, file_path, cid, gateways, save_dir): # 处理本地路径 local_path = os.path.join(save_dir, file_path.replace('/', os.sep)) os.makedirs(os.path.dirname(local_path), exist_ok=True) # 检查是否已完成下载 if os.path.exists(local_path): self.log(f"已存在,跳过: {file_path}") return True temp_path = local_path + ".downloading" # 将带有 Emoji/日文的路径进行 URL 编码,避免 HTTP 请求报错 encoded_file_path = urllib.parse.quote(file_path) for gw in gateways: if os.path.exists(temp_path): try: os.remove(temp_path) except Exception: pass url = f"{gw.rstrip('/')}/{cid}/{encoded_file_path}" try: # 伪装成浏览器请求 req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) # 开始流式下载 (超时设为30秒) with urllib.request.urlopen(req, timeout=30) as response, open(temp_path, 'wb') as out_file: # 分块写入,防止将大文件全部读入内存 while True: chunk = response.read(8192) if not chunk: break out_file.write(chunk) # 下载成功,去除 .downloading 后缀 os.rename(temp_path, local_path) self.log(f"下载成功: {file_path}") return True except Exception as e: # 当前网关失败,自动切换下一个 continue # 如果所有网关都失败,清理残留的临时文件 if os.path.exists(temp_path): try: os.remove(temp_path) except: pass self.log(f"下载失败 (已重试所有网关): {file_path}") return False def start_download(self): if self.downloading: return cid = self.cid_entry.get().strip() save_dir = self.save_entry.get().strip() if not cid or not self.selected_paths: messagebox.showwarning("提示", "请填写 CID 并选择要下载的文件!") return gateways = [] custom_gw = self.gw_entry.get().strip() if custom_gw: gateways.append(custom_gw) gateways.extend(DEFAULT_GATEWAYS) try: max_workers = int(self.threads_entry.get().strip()) except: max_workers = 3 self.pending_files = list(self.selected_paths) self.write_unfinished() self.downloading = True self.start_btn.config(state=tk.DISABLED) self.log("开始原生批量下载...") def run_downloads(): with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(self.download_worker, p, cid, gateways, save_dir): p for p in self.pending_files.copy()} for future in futures: p = futures[future] if future.result(): if p in self.pending_files: self.pending_files.remove(p) self.write_unfinished() self.downloading = False self.root.after(0, lambda: self.start_btn.config(state=tk.NORMAL)) self.root.after(0, lambda: self.log("所有下载任务执行完毕!")) threading.Thread(target=run_downloads, daemon=True).start() if __name__ == "__main__": root = tk.Tk() app = IPFSDownloaderApp(root) root.mainloop()