"""Tkinter GUI that batch-downloads files of an IPFS directory via HTTP gateways.

The file list comes from an exported directory-listing TXT file; every selected
file is fetched as ``<gateway>/ipfs/<cid>/<path>`` by a pool of worker threads.
Gateways that repeatedly time out (or are cut off by the speed monitor) are
blacklisted for the rest of the run and appended to a blacklist file.
"""

import os
import queue
import random
import re
import socket
import threading
import time
import tkinter as tk
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from tkinter import filedialog, messagebox, ttk

# Default pool of public IPFS gateways.
DEFAULT_GATEWAYS = [
    "https://ipfs.io",
    "https://dweb.link",
    "https://w3s.link",
    "https://ipfs.filebase.io",
    "https://ipfs.orbitor.dev",
    "https://gateway.pinata.cloud",
    "https://ipfs.hypha.coop",
    "https://gw.ipfs-lens.dev",
    "https://i0.img2ipfs.com",
    "https://ipfs.interface.social",
    "https://trustless-gateway.link",
    "https://eu.orbitor.dev",
    "https://apac.orbitor.dev",
    "https://latam.orbitor.dev",
    "https://ipfs.storry.tv",
    "https://ipfs.runfission.com",
    "https://ipfs.dget.top",
    "https://ipfs.ecolatam.com",
    "https://ipfs.raribleuserdata.com",
    "https://eth.sucks",
    "https://ipfs.ninja",
    "https://ipfs.aleph.cloud",
    "https://gw.crust-gateway.xyz",
    "https://4everland.io",
]

# Check-box markers prepended to every tree node label.
_CHECKED = "[☑]"
_UNCHECKED = "[☐]"

# A gateway is blacklisted once it has accumulated this many timeout failures.
_FAILURES_BEFORE_BLACKLIST = 2

# How often (ms) the Tk thread drains work queued by background threads.
_UI_POLL_MS = 100

# Header lines of the listing file that carry no path information. Both the
# ASCII and the full-width colon are accepted.
_HEADER_PREFIXES = tuple(
    key + colon
    for key in ("文件夹清单", "位置", "总文件数", "总大小")
    for colon in (":", "\uff1a")
)

_TREE_BRANCH_RE = re.compile(r'([├└]──\s*)(.*)')
_SIZE_SUFFIX_RE = re.compile(r'\s*\(\d+(\.\d+)?\s*[KMGTP]?B,\s*[\d,]+\s*bytes\)$')


class IPFSDownloaderApp:
    """Main window and download controller of the IPFS batch downloader."""

    def __init__(self, root):
        """Build the window on *root* and initialise all download state."""
        self.root = root
        self.root.title("IPFS 批量下载器")
        self.root.geometry("800x780")

        # File-selection state.
        self.all_paths = []
        self.selected_paths = set()
        self.downloading = False
        self.pending_files = []

        # Locks guarding the gateway bookkeeping and the speed statistics.
        self.gateway_lock = threading.Lock()
        self.speed_lock = threading.Lock()

        # Gateway monitoring data.
        self.gateway_fail_counts = {}      # gateway -> accumulated timeout failures
        self.blacklisted_gateways = set()  # gateways blacklisted during this run
        self.active_downloads = {}         # file path -> progress info of the running transfer

        # Callables queued by worker threads for execution on the Tk thread.
        self._ui_queue = queue.Queue()

        self._setup_ui()

        # Closing the window terminates the whole process (see force_exit).
        self.root.protocol("WM_DELETE_WINDOW", self.force_exit)
        self._poll_ui_queue()

    # ------------------------------------------------------------------ UI

    def _setup_ui(self):
        """Create all widgets."""
        # Settings area.
        input_frame = ttk.LabelFrame(self.root, text="基本设置", padding=10)
        input_frame.pack(fill=tk.X, padx=10, pady=5)

        # Custom gateways.
        ttk.Label(input_frame, text="自定义网关 (可选):").grid(row=0, column=0, sticky=tk.W, pady=2)
        self.gw_entry = ttk.Entry(input_frame, width=50)
        self.gw_entry.grid(row=0, column=1, sticky=tk.W, pady=2)
        ttk.Label(input_frame, text="多个用空格或逗号分隔", foreground="gray").grid(
            row=0, column=2, sticky=tk.W, padx=5)

        # Gateway blacklist file.
        ttk.Label(input_frame, text="黑名单文件 (可选):").grid(row=1, column=0, sticky=tk.W, pady=2)
        self.blacklist_entry = ttk.Entry(input_frame, width=50)
        self.blacklist_entry.grid(row=1, column=1, sticky=tk.W, pady=2)
        ttk.Button(input_frame, text="浏览", command=self.browse_blacklist).grid(row=1, column=2, padx=5)

        # IPFS CID.
        ttk.Label(input_frame, text="文件 CID:").grid(row=2, column=0, sticky=tk.W, pady=2)
        self.cid_entry = ttk.Entry(input_frame, width=50)
        self.cid_entry.grid(row=2, column=1, sticky=tk.W, pady=2)

        # Directory-listing TXT file.
        ttk.Label(input_frame, text="TXT 文件路径:").grid(row=3, column=0, sticky=tk.W, pady=2)
        self.txt_entry = ttk.Entry(input_frame, width=50)
        self.txt_entry.grid(row=3, column=1, sticky=tk.W, pady=2)
        ttk.Button(input_frame, text="浏览", command=self.browse_txt).grid(row=3, column=2, padx=5)

        # Local save directory.
        ttk.Label(input_frame, text="保存目录:").grid(row=4, column=0, sticky=tk.W, pady=2)
        self.save_entry = ttk.Entry(input_frame, width=50)
        self.save_entry.insert(0, os.path.join(os.getcwd(), "download"))
        self.save_entry.grid(row=4, column=1, sticky=tk.W, pady=2)
        ttk.Button(input_frame, text="浏览", command=self.browse_save).grid(row=4, column=2, padx=5)

        # Number of concurrent downloads.
        ttk.Label(input_frame, text="同时下载数量:").grid(row=5, column=0, sticky=tk.W, pady=2)
        self.threads_entry = ttk.Entry(input_frame, width=10)
        self.threads_entry.insert(0, "5")
        self.threads_entry.grid(row=5, column=1, sticky=tk.W, pady=2)

        # Directory tree preview.
        tree_frame = ttk.LabelFrame(self.root, text="文件选择 (点击复选框切换)", padding=10)
        tree_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)

        self.tree = ttk.Treeview(tree_frame, selectmode='none')
        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        scrollbar = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.tree.configure(yscrollcommand=scrollbar.set)
        # The original bound an empty event sequence, so clicks never reached
        # toggle_check; a left click is what the frame title advertises.
        self.tree.bind('<Button-1>', self.toggle_check)

        # Bottom control bar.
        control_frame = tk.Frame(self.root)
        control_frame.pack(fill=tk.X, padx=10, pady=5)

        self.start_btn = ttk.Button(control_frame, text="开始下载", command=self.start_download)
        self.start_btn.pack(side=tk.LEFT, padx=5)

        # Log output.
        self.log_text = tk.Text(self.root, height=8, state=tk.DISABLED)
        self.log_text.pack(fill=tk.BOTH, padx=10, pady=5)

        # Gateway status board.
        circuit_frame = ttk.LabelFrame(self.root, text="网关状态看板 (累计2次阻塞/慢速自动拉黑)")
        circuit_frame.pack(fill=tk.X, padx=10, pady=5)

        self.blacklist_label = tk.Label(
            circuit_frame, text="暂无黑名单网关,所有通道运行良好。", fg="green",
            anchor="w", justify="left", wraplength=760)
        self.blacklist_label.pack(fill=tk.X, padx=5, pady=5)

    def force_exit(self):
        """Destroy the window and terminate the process immediately.

        ``os._exit`` is used so that worker threads and open connections
        cannot keep the process alive.
        """
        self.downloading = False
        try:
            self.root.destroy()
        except tk.TclError:
            pass
        finally:
            os._exit(0)

    def browse_txt(self):
        """Ask for a listing file, put it in the entry and load it into the tree."""
        path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
        if path:
            self.txt_entry.delete(0, tk.END)
            self.txt_entry.insert(0, path)
            self.load_txt_to_tree(path)

    def browse_save(self):
        """Ask for the save directory and put it in the entry."""
        path = filedialog.askdirectory()
        if path:
            self.save_entry.delete(0, tk.END)
            self.save_entry.insert(0, path)

    def browse_blacklist(self):
        """Ask for a gateway blacklist file and put it in the entry."""
        path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
        if path:
            self.blacklist_entry.delete(0, tk.END)
            self.blacklist_entry.insert(0, path)

    # ------------------------------------------------ thread -> UI hand-off

    def _run_on_ui(self, task, *args):
        """Queue ``task(*args)`` for execution on the Tk thread.

        Safe to call from any thread. When called on the main thread the queue
        is drained right away so messages appear immediately and in order.
        Assumes the Tk main loop runs on Python's main thread, as it does in
        this script's ``__main__`` section.
        """
        self._ui_queue.put((task, args))
        if threading.current_thread() is threading.main_thread():
            self._drain_ui_queue()

    def _drain_ui_queue(self):
        """Run every queued UI task; must be called on the Tk thread."""
        while True:
            try:
                task, args = self._ui_queue.get_nowait()
            except queue.Empty:
                return
            try:
                task(*args)
            except tk.TclError:
                # The widget is gone (window closing); nothing left to update.
                pass

    def _poll_ui_queue(self):
        """Drain the UI queue and re-arm the timer that calls this again."""
        self._drain_ui_queue()
        try:
            self.root.after(_UI_POLL_MS, self._poll_ui_queue)
        except tk.TclError:
            pass

    def log(self, msg):
        """Append *msg* to the log box; may be called from any thread."""
        self._run_on_ui(self._append_log, msg)

    def _append_log(self, msg):
        """Write one line into the log widget (Tk thread only)."""
        self.log_text.config(state=tk.NORMAL)
        self.log_text.insert(tk.END, msg + "\n")
        self.log_text.see(tk.END)
        self.log_text.config(state=tk.DISABLED)
        self.root.update_idletasks()

    def update_blacklist_ui(self):
        """Refresh the gateway status board (Tk thread only)."""
        # Copy under the lock, touch widgets outside it, so a worker waiting
        # for the lock is never blocked behind UI work.
        with self.gateway_lock:
            blacklisted = sorted(self.blacklisted_gateways)

        if not blacklisted:
            self.blacklist_label.config(text="暂无黑名单网关,所有通道运行良好。", fg="green")
        else:
            bad_gws = "\n".join(f"⚠️ 已自动拉黑网关: {gw}" for gw in blacklisted)
            self.blacklist_label.config(text=bad_gws, fg="red")
        try:
            self.root.update_idletasks()
        except tk.TclError:
            pass

    # ------------------------------------------------------ listing parser

    def parse_txt(self, filepath):
        """Parse an exported directory listing and return the file paths in it.

        Two line shapes are understood: tree lines drawn with ``├──``/``└──``
        (nesting is derived from the column of the branch marker) and flat
        ``a/b/c`` path lines. A trailing ``(<size>, <n> bytes)`` annotation is
        removed from names. Directories (names ending in ``/``) are not
        returned themselves.

        Raises whatever ``open`` / decoding raises for an unreadable file.
        """
        paths = []
        path_stack = {}  # column of the branch marker -> name at that depth

        with open(filepath, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip('\n').strip('\r')
                if not line or line.startswith('='):
                    continue
                if line.startswith(_HEADER_PREFIXES):
                    continue

                # Flat "dir/sub/file (size)" line without tree drawing.
                if ("├──" not in line and "└──" not in line
                        and "/" in line and not line.startswith("│")):
                    path_str = line.split(' (')[0].strip()
                    if not path_str.endswith('/'):
                        paths.append(path_str)
                    continue

                match = _TREE_BRANCH_RE.search(line)
                if match:
                    prefix_len = match.start()
                    name_size = match.group(2)
                else:
                    name_size = line.lstrip('│ ').strip()
                    if not name_size:
                        continue
                    prefix_len = -1  # entry above every branch level

                name = _SIZE_SUFFIX_RE.sub('', name_size).strip()
                is_dir = name.endswith('/')
                if is_dir:
                    name = name[:-1]

                # Drop everything at this depth or deeper, then push this entry.
                for key in [k for k in path_stack if k >= prefix_len]:
                    del path_stack[key]
                path_stack[prefix_len] = name

                if not is_dir:
                    paths.append("/".join(path_stack[k] for k in sorted(path_stack)))
        return paths

    def load_txt_to_tree(self, filepath):
        """Parse *filepath* and show its files, all checked, in the tree view."""
        self.tree.delete(*self.tree.get_children())
        # Reset first so a failed parse cannot leave a stale selection behind
        # an empty tree.
        self.all_paths = []
        self.selected_paths = set()
        try:
            self.all_paths = self.parse_txt(filepath)
            self.selected_paths = set(self.all_paths)

            known_nodes = set()
            for path in self.all_paths:
                current_id = ""
                for part in path.split('/'):
                    parent_id = current_id
                    current_id = f"{parent_id}/{part}" if parent_id else part
                    if current_id not in known_nodes:
                        self.tree.insert(parent_id, "end", current_id,
                                         text=f"{_CHECKED} {part}", open=True)
                        known_nodes.add(current_id)
        except Exception as e:  # UI boundary: report any failure to the user
            messagebox.showerror("解析错误", f"解析 TXT 文件失败: {str(e)}")

    # ---------------------------------------------------- check-box handling

    def _is_checked(self, item):
        """Return True when the label of tree node *item* starts with the checked marker."""
        return self.tree.item(item, "text").startswith(_CHECKED)

    def _set_mark(self, item, mark):
        """Replace only the leading marker of *item*, leaving the name untouched."""
        text = self.tree.item(item, "text")
        if text.startswith((_CHECKED, _UNCHECKED)):
            text = text[len(_CHECKED):]  # both markers have the same length
        else:
            text = " " + text
        self.tree.item(item, text=mark + text)

    def _set_mark_recursive(self, item, mark):
        """Apply *mark* to *item* and all of its descendants."""
        self._set_mark(item, mark)
        for child in self.tree.get_children(item):
            self._set_mark_recursive(child, mark)

    def toggle_check(self, event):
        """Toggle the clicked node and propagate the new state to its subtree."""
        item = self.tree.identify_row(event.y)
        if not item:
            return
        new_mark = _UNCHECKED if self._is_checked(item) else _CHECKED
        self._set_mark_recursive(item, new_mark)
        self.update_selected_paths()

    def update_selected_paths(self):
        """Rebuild ``selected_paths`` from the checked file nodes of the tree."""
        self.selected_paths.clear()
        for path in self.all_paths:
            if self.tree.exists(path) and self._is_checked(path):
                self.selected_paths.add(path)

    # ------------------------------------------------------------ gateways

    def _normalize_gateway(self, gw):
        """Return ``scheme://host/ipfs/`` for *gw*, or ``""`` if it has no host.

        A missing scheme defaults to ``https``; any path in the input is dropped.
        """
        gw = gw.strip()
        if not gw:
            return ""
        if not gw.startswith(("http://", "https://")):
            gw = "https://" + gw
        parsed = urllib.parse.urlparse(gw)
        if not parsed.netloc:
            return ""
        return f"{parsed.scheme}://{parsed.netloc}/ipfs/"

    @staticmethod
    def _is_timeout_error(exc):
        """Return True when *exc* is a timeout or speed-abort failure.

        The exception type is checked first so that timeouts whose message does
        not contain the word "timeout" (e.g. the stalled-connection error raised
        by the download loop) are recognised too.
        """
        if isinstance(exc, (TimeoutError, socket.timeout)):
            return True
        if isinstance(exc, urllib.error.URLError) and isinstance(
                exc.reason, (TimeoutError, socket.timeout)):
            return True
        message = str(exc)
        lowered = message.lower()
        return "timeout" in lowered or "timed out" in lowered or "速度过慢" in message

    def _record_gateway_failure(self, gw):
        """Count one timeout failure for *gw* and blacklist it at the threshold."""
        with self.gateway_lock:
            count = self.gateway_fail_counts.get(gw, 0) + 1
            self.gateway_fail_counts[gw] = count
            newly_blacklisted = (count >= _FAILURES_BEFORE_BLACKLIST
                                 and gw not in self.blacklisted_gateways)
            if newly_blacklisted:
                self.blacklisted_gateways.add(gw)
                # Best effort; the lock also serialises concurrent appends.
                try:
                    with open("网关黑名单.txt", "a", encoding="utf-8") as bl_f:
                        bl_f.write(gw + "\n")
                except OSError:
                    pass

        # UI work happens outside the lock: update_blacklist_ui takes the same
        # lock on the Tk thread.
        if newly_blacklisted:
            self.log(f"🚨 网关熔断: {gw} 累计失败达2次,已自动隔离并写入黑名单文件!")
            self._run_on_ui(self.update_blacklist_ui)

    # ------------------------------------------------------- speed monitor

    def speed_monitor_worker(self):
        """Background loop that aborts the slowest transfer when it is an outlier.

        Every 15 s the average speed of each transfer older than 12 s is
        computed. With at least three such transfers, the slowest one is flagged
        for abort when the gap to the second slowest exceeds 2.5x the mean of
        the remaining gaps and its speed is below 100 KiB/s.
        """
        while self.downloading:
            time.sleep(15)
            if not self.downloading:
                break

            now = time.time()
            with self.speed_lock:
                samples = [
                    (path, info['downloaded'] / (now - info['start_time']), info['gateway'])
                    for path, info in self.active_downloads.items()
                    if now - info['start_time'] > 12
                ]
            if len(samples) < 3:
                continue

            samples.sort(key=lambda sample: sample[1])
            speeds = [sample[1] for sample in samples]
            gaps = [faster - slower for slower, faster in zip(speeds, speeds[1:])]
            other_gap_avg = sum(gaps[1:]) / len(gaps[1:])

            if gaps[0] > other_gap_avg * 2.5 and speeds[0] < 100 * 1024:
                slowest_file, _, slowest_gw = samples[0]
                with self.speed_lock:
                    entry = self.active_downloads.get(slowest_file)
                    # Only abort the attempt that was measured, not a retry
                    # that has already moved on to another gateway.
                    flagged = entry is not None and entry['gateway'] == slowest_gw
                    if flagged:
                        entry['abort_flag'] = True
                if flagged:
                    self.log(f"📉 速度监控:检测到网关传输极慢,已强行终止: {slowest_file} ({slowest_gw})")

    # ------------------------------------------------------------ download

    @staticmethod
    def _remove_quietly(path):
        """Delete *path* if possible; a missing or locked file is ignored."""
        try:
            os.remove(path)
        except OSError:
            pass

    def _fetch_to_temp(self, url, file_path, temp_path):
        """Stream *url* into *temp_path*.

        Returns True when the body was read to the end, False when the run was
        cancelled mid-transfer. Raises ``ValueError`` for an HTML page served in
        place of a non-HTML file, ``TimeoutError`` for a stalled or speed-aborted
        transfer, and whatever ``urlopen`` / file I/O raise.
        """
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=10) as response:
            content_type = response.info().get_content_type()
            _, ext = os.path.splitext(file_path.lower())
            if 'text/html' in content_type and ext not in ('.html', '.htm', '.xhtml'):
                raise ValueError("软404错误页面")

            # Tighten the per-read timeout once headers have arrived. This
            # reaches into private attributes, hence the defensive getattr.
            raw = getattr(getattr(response, 'fp', None), 'raw', None)
            sock = getattr(raw, '_sock', None)
            if sock is not None:
                sock.settimeout(5.0)

            with open(temp_path, 'wb') as out_file:
                while self.downloading:
                    with self.speed_lock:
                        if self.active_downloads.get(file_path, {}).get('abort_flag', False):
                            raise TimeoutError("速度过慢,触发熔断机制")
                    try:
                        chunk = response.read(8192)
                    except (socket.timeout, TimeoutError) as exc:
                        raise TimeoutError("连接僵死:无任何字节流动") from exc
                    if not chunk:
                        break
                    out_file.write(chunk)
                    with self.speed_lock:
                        if file_path in self.active_downloads:
                            self.active_downloads[file_path]['downloaded'] += len(chunk)
        return self.downloading

    def download_worker(self, file_path, cid, custom_gateways, base_default_pool, save_dir):
        """Download one file, trying gateways in turn; return True on success.

        Custom gateways are tried first in the given order, then the default
        pool in random order. Blacklisted gateways are skipped. Data is written
        to ``<target>.downloading`` and moved into place only when complete.
        An already existing target counts as success.
        """
        if not self.downloading:
            return False

        local_path = os.path.join(save_dir, file_path.replace('/', os.sep))
        parent_dir = os.path.dirname(local_path)
        if parent_dir:  # os.makedirs('') raises
            os.makedirs(parent_dir, exist_ok=True)
        temp_path = local_path + ".downloading"

        if os.path.exists(local_path):
            self.log(f"已存在,跳过: {file_path}")
            # Clear a leftover temp file from an earlier interrupted run.
            self._remove_quietly(temp_path)
            return True

        with self.gateway_lock:
            active_customs = [g for g in custom_gateways if g not in self.blacklisted_gateways]
            active_defaults = [g for g in base_default_pool if g not in self.blacklisted_gateways]
        random.shuffle(active_defaults)
        final_gateways = active_customs + active_defaults

        if not final_gateways:
            self.log(f"❌ 错误: 无可用网关(所有网关已被拉黑): {file_path}")
            return False

        try:
            for gw in final_gateways:
                if not self.downloading:
                    return False
                # The gateway may have been blacklisted since the list was built.
                with self.gateway_lock:
                    if gw in self.blacklisted_gateways:
                        continue

                self._remove_quietly(temp_path)
                url = f"{gw.rstrip('/')}/{cid}/{urllib.parse.quote(file_path)}"

                with self.speed_lock:
                    self.active_downloads[file_path] = {
                        'downloaded': 0,
                        'start_time': time.time(),
                        'abort_flag': False,
                        'gateway': gw,
                    }

                try:
                    completed = self._fetch_to_temp(url, file_path, temp_path)
                    if completed:
                        # os.replace also succeeds when the target appeared meanwhile.
                        os.replace(temp_path, local_path)
                except Exception as e:
                    # Retry boundary: any failure moves on to the next gateway;
                    # only timeouts count against the gateway.
                    self._remove_quietly(temp_path)
                    if self._is_timeout_error(e):
                        self._record_gateway_failure(gw)
                    continue

                if not completed:
                    # Cancelled mid-transfer: do not leave a fragment behind.
                    self._remove_quietly(temp_path)
                    return False

                self.log(f"下载成功: {file_path}")
                return True
        finally:
            with self.speed_lock:
                self.active_downloads.pop(file_path, None)

        self._remove_quietly(temp_path)
        self.log(f"下载失败 (已尝试当前所有可用网关): {file_path}")
        return False

    def _load_gateway_blacklist(self, bl_file_path):
        """Return the set of normalised gateways listed in *bl_file_path*.

        An empty path or a missing file yields an empty set; a read error is
        logged and whatever was read so far is returned.
        """
        blacklisted = set()
        if not bl_file_path or not os.path.exists(bl_file_path):
            return blacklisted
        try:
            with open(bl_file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    norm_bl = self._normalize_gateway(line)
                    if norm_bl:
                        blacklisted.add(norm_bl)
            self.log(f"ℹ️ 成功加载黑名单文件,已排除 {len(blacklisted)} 个网关。")
        except (OSError, UnicodeError) as e:
            self.log(f"⚠️ 读取黑名单文件失败: {str(e)}")
        return blacklisted

    def _on_downloads_finished(self):
        """Re-enable the start button and report completion (Tk thread only)."""
        self.start_btn.config(state=tk.NORMAL)
        self.log("所有下载任务执行完毕!")

    def start_download(self):
        """Validate the inputs and start the download run in background threads."""
        if self.downloading:
            return

        cid = self.cid_entry.get().strip()
        save_dir = self.save_entry.get().strip()

        if not cid or not self.selected_paths:
            messagebox.showwarning("提示", "请填写 CID 并选择要下载的文件!")
            return

        file_blacklist_paths = self._load_gateway_blacklist(self.blacklist_entry.get().strip())

        # Separators: ASCII comma, full-width comma (U+FF0C) and whitespace.
        custom_gateways = []
        for gw in re.split(r'[,\uFF0C\s]+', self.gw_entry.get().strip()):
            norm_gw = self._normalize_gateway(gw)
            if (norm_gw and norm_gw not in file_blacklist_paths
                    and norm_gw not in custom_gateways):
                custom_gateways.append(norm_gw)

        base_default_pool = []
        for gw in DEFAULT_GATEWAYS:
            norm_gw = self._normalize_gateway(gw)
            if (norm_gw and norm_gw not in file_blacklist_paths
                    and norm_gw not in custom_gateways
                    and norm_gw not in base_default_pool):
                base_default_pool.append(norm_gw)

        try:
            max_workers = int(self.threads_entry.get().strip())
        except ValueError:
            max_workers = 5
        # ThreadPoolExecutor rejects values below 1.
        max_workers = max(1, max_workers)

        self.pending_files = list(self.selected_paths)
        self.downloading = True
        self.start_btn.config(state=tk.DISABLED)
        self.log("开始任务 (已启用速度监控、黑名单隔离与累计2次失败熔断机制)...")

        threading.Thread(target=self.speed_monitor_worker, daemon=True).start()

        def run_downloads():
            """Run all downloads, then hand control back to the UI."""
            try:
                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    futures = {
                        executor.submit(self.download_worker, path, cid, custom_gateways,
                                        base_default_pool, save_dir): path
                        for path in list(self.pending_files)
                    }
                    for future, path in futures.items():
                        try:
                            succeeded = future.result()
                        except Exception as exc:
                            # A crashed worker must not abort the whole run.
                            self.log(f"下载出错: {path} ({exc})")
                            succeeded = False
                        if succeeded and path in self.pending_files:
                            self.pending_files.remove(path)
            finally:
                # Always release the UI, even if the pool itself failed.
                self.downloading = False
                self._run_on_ui(self._on_downloads_finished)

        threading.Thread(target=run_downloads, daemon=True).start()


if __name__ == "__main__":
    root = tk.Tk()
    app = IPFSDownloaderApp(root)
    root.mainloop()