-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
299 lines (266 loc) · 12.8 KB
/
run.py
File metadata and controls
299 lines (266 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import os
import hashlib
import json
import shutil
import argparse
import time
# Preset groups of file extensions to skip (lowercase, leading dot included).
# Keys are the user-facing preset names (Chinese, selected via --skip-preset);
# they are runtime data and must not be renamed.
SKIP_EXT_PRESETS = {
    "图片": {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".ico", ".tiff", ".tif", ".heic", ".heif", ".svg", ".raw", ".cr2", ".nef", ".arw"},
    "视频": {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".m4v", ".mpeg", ".mpg", ".3gp", ".ts", ".m2ts"},
    "音频": {".mp3", ".wav", ".flac", ".aac", ".ogg", ".wma", ".m4a", ".ape", ".alac"},
    "文档": {".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".txt", ".md", ".rtf"},
    "压缩包": {".zip", ".rar", ".7z", ".tar", ".gz", ".bz2", ".xz"},
}
def get_skip_extensions(preset_names=None, extra_extensions=None):
    """Merge named presets and extra extensions into one skip set.

    Returns a set of lowercase extensions with a leading dot. Unknown preset
    names and blank extension entries are silently ignored.
    """
    skip = set()
    for preset in (preset_names or []):
        # Unknown presets contribute nothing.
        skip |= SKIP_EXT_PRESETS.get(preset.strip(), set())
    for raw in (extra_extensions or []):
        cleaned = raw.strip().lower()
        if not cleaned:
            continue
        skip.add(cleaned if cleaned.startswith(".") else "." + cleaned)
    return skip
def _should_skip_file(file_path, skip_extensions):
if not skip_extensions:
return False
_, ext = os.path.splitext(file_path)
return ext.lower() in skip_extensions
def get_fast_md5(file_path):
    """Fast content fingerprint for large files.

    Hashes the first, middle and last 1 MB of the file (small files are
    hashed in full) and mixes in the file size, so same-sample files of
    different lengths still get different digests.

    Returns the MD5 hex digest string, or None when the file cannot be read.
    """
    sample_size = 1024 * 1024  # 1 MB per sampled region
    try:
        file_size = os.path.getsize(file_path)
        hash_md = hashlib.md5()
        with open(file_path, 'rb') as f:
            if file_size <= sample_size * 3:
                # Small file: cheaper to hash the whole content.
                hash_md.update(f.read())
            else:
                # Head
                hash_md.update(f.read(sample_size))
                # Middle
                f.seek(file_size // 2)
                hash_md.update(f.read(sample_size))
                # Tail
                f.seek(file_size - sample_size)
                hash_md.update(f.read(sample_size))
        # Mix in the size in both branches so the digest scheme is uniform.
        hash_md.update(str(file_size).encode())
        return hash_md.hexdigest()
    except OSError as e:
        # Only I/O failures (missing file, permissions, ...) are expected;
        # narrowed from `except Exception` so programming errors propagate.
        print(f"[错误] 无法计算哈希 {file_path}: {e}")
        return None
def _norm_key(path_str):
"""统一为用正斜杠的路径,便于跨平台一致。"""
return os.path.normpath(path_str).replace("\\", "/")
def scan_source(source_dir, output_json, log_fn=print, progress_every=10, verbose=False, skip_extensions=None, scan_mode="hash"):
    """Scan the source drive and write a key -> relative-path mapping JSON.

    scan_mode: 'hash' keys entries by the fast-MD5 fingerprint (deep scan);
    'name' keys them by the normalized relative path itself (quick scan).
    The chosen mode is stored in the output under the reserved "_scan_mode" key.
    """
    if log_fn is None:
        log_fn = print
    if scan_mode == "name":
        log_fn("[*] 扫描模式: 简易扫描(仅比对路径/文件名,不计算 MD5)")
    else:
        log_fn("[*] 扫描模式: 深度扫描(MD5)")
    if skip_extensions:
        log_fn(f"[*] 跳过扩展名: {', '.join(sorted(skip_extensions))}")
    log_fn(f"[*] 正在扫描源目录: {source_dir}")
    start_time = time.time()
    mapping = {}
    count = 0
    for root, _, files in os.walk(source_dir):
        for file in files:
            # Skip hidden files and Windows thumbnail caches.
            if file.startswith('.') or file.lower() == 'thumbs.db':
                continue
            if _should_skip_file(file, skip_extensions):
                continue
            full_path = os.path.join(root, file)
            rel_path = os.path.relpath(full_path, source_dir)
            key = _norm_key(rel_path)
            if scan_mode == "name":
                mapping[key] = key
            else:
                f_hash = get_fast_md5(full_path)
                if f_hash is None:
                    # Unreadable file: already reported by get_fast_md5.
                    continue
                mapping[f_hash] = key
            count += 1
            # Per-file progress logging, shared by both modes
            # (previously duplicated in each branch).
            if verbose:
                log_fn(f" [文件] {rel_path}")
            elif progress_every and count % progress_every == 0:
                log_fn(f" 已处理 {count} 个文件...")
    out_data = {"_scan_mode": scan_mode, **mapping}
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(out_data, f, ensure_ascii=False, indent=4)
    end_time = time.time()
    log_fn(f"[OK] 扫描完成!共 {count} 个文件,耗时 {end_time - start_time:.2f}s")
    log_fn(f"[OK] 映射表已保存至: {output_json}")
def sync_target(target_dir, mapping_json, dry_run=False, log_fn=print, verbose=False, skip_extensions=None, scan_mode=None):
    """Reorganize the target (backup) drive to match the mapping table.

    The mapping JSON maps key -> desired relative path. Every file on the
    target whose key matches a mapping entry is moved to the mapped relative
    path, then empty directories are removed. With dry_run=True nothing is
    moved; intended operations are only logged.

    If scan_mode is None it is read from the mapping file's "_scan_mode"
    entry (defaulting to "hash" for legacy files without it).
    """
    if not os.path.exists(mapping_json):
        if log_fn is None:
            log_fn = print
        log_fn(f"[错误] 找不到映射表文件: {mapping_json}")
        return
    with open(mapping_json, 'r', encoding='utf-8') as f:
        data = json.load(f)
    # Backward-compatible with legacy mapping files lacking "_scan_mode".
    if scan_mode is None:
        scan_mode = data.pop("_scan_mode", "hash")
    else:
        data.pop("_scan_mode", None)
    mapping = data
    if log_fn is None:
        log_fn = print
    if scan_mode == "name":
        log_fn("[*] 匹配模式: 按路径/文件名(简易)")
    else:
        log_fn("[*] 匹配模式: 按 MD5(深度)")
    if skip_extensions:
        log_fn(f"[*] 跳过扩展名: {', '.join(sorted(skip_extensions))}")
    log_fn(f"[*] 正在索引目标目录 (此过程可能较慢): {target_dir}")
    # Build key -> absolute path index of the target. NOTE(review): on
    # duplicate keys (identical content in hash mode) the last file walked
    # wins; earlier duplicates are left where they are.
    target_index = {}
    for root, _, files in os.walk(target_dir):
        for file in files:
            if _should_skip_file(file, skip_extensions):
                continue
            full_path = os.path.join(root, file)
            rel_path = os.path.relpath(full_path, target_dir)
            key = _norm_key(rel_path)
            if scan_mode == "name":
                target_index[key] = full_path
                if verbose:
                    log_fn(f" [索引] {key}")
            else:
                # f_hash is None (falsy) for unreadable files; skip them.
                f_hash = get_fast_md5(full_path)
                if f_hash:
                    target_index[f_hash] = full_path
                    if verbose:
                        log_fn(f" [索引] {key}")
    log_fn(f"[*] 开始匹配与结构重组...")
    if dry_run:
        log_fn("注意:当前处于 [预览模式],不会实际移动文件。")
    moved_count = 0
    for key, rel_path in mapping.items():
        # Defensive: "_scan_mode" was popped above, so this never matches,
        # but it guards against future reserved keys leaking through.
        if key == "_scan_mode":
            continue
        if key in target_index:
            old_path = target_index[key]
            # rel_path uses forward slashes; rebuild a native path via os.path.
            new_path = os.path.join(target_dir, *rel_path.split("/"))
            # Already in the right place — nothing to do.
            if os.path.abspath(old_path) == os.path.abspath(new_path):
                continue
            if not dry_run:
                os.makedirs(os.path.dirname(new_path), exist_ok=True)
                # NOTE(review): shutil.move onto an existing file replaces
                # it on most platforms — confirm this is the intended policy.
                shutil.move(old_path, new_path)
            log_fn(f"[{'预览' if dry_run else '移动'}] {os.path.basename(old_path)} -> {rel_path}")
            moved_count += 1
    # Remove directories emptied by the moves. topdown=False visits children
    # before parents, so nested empty trees are cleaned in one pass.
    if not dry_run:
        log_fn("[*] 正在清理空文件夹...")
        for root, dirs, _ in os.walk(target_dir, topdown=False):
            for d in dirs:
                dir_path = os.path.join(root, d)
                if not os.listdir(dir_path):
                    os.rmdir(dir_path)
    log_fn(f"[OK] 整理完成!共处理 {moved_count} 个文件。")
def _build_hash_index(root_dir, log_fn=print, verbose=False, progress_every=10, label="源", skip_extensions=None, scan_mode="hash"):
    """Walk root_dir and build a key -> relative-path index.

    scan_mode='hash' keys entries by the fast-MD5 fingerprint; 'name' keys
    them by the normalized relative path itself. Hidden files and thumbs.db
    are always skipped. `label` is only used in log messages.
    """
    index = {}
    count = 0
    if skip_extensions:
        log_fn(f"[*] 跳过扩展名: {', '.join(sorted(skip_extensions))}")
    log_fn(f"[*] 正在扫描{label}目录: {root_dir}")
    start = time.time()
    for r, _, files in os.walk(root_dir):
        for f in files:
            # Skip hidden files and Windows thumbnail caches.
            if f.startswith(".") or f.lower() == "thumbs.db":
                continue
            if _should_skip_file(f, skip_extensions):
                continue
            full = os.path.join(r, f)
            rel = os.path.relpath(full, root_dir)
            key = _norm_key(rel)
            if scan_mode == "name":
                index[key] = key
            else:
                h = get_fast_md5(full)
                if h is None:
                    # Unreadable file: already reported by get_fast_md5.
                    continue
                index[h] = key
            count += 1
            # Progress logging shared by both modes
            # (previously duplicated in each branch).
            if verbose:
                log_fn(f" [{label}] {rel}")
            elif progress_every and count % progress_every == 0:
                log_fn(f" {label}已处理 {count} 个文件...")
    log_fn(f"[OK] {label}扫描完成,共 {count} 个文件,耗时 {time.time() - start:.2f}s")
    return index
def diff_dirs(source_dir, target_dir, log_fn=print, verbose=False, skip_extensions=None, scan_mode="hash"):
    """Compare two directory trees and report their differences.

    Returns {"多了": [...], "少了": [...]}: files present in the source but
    missing from the target, and vice versa. scan_mode='hash' matches by
    fast MD5; 'name' matches by normalized relative path.
    """
    if log_fn is None:
        log_fn = print
    src_index = _build_hash_index(source_dir, log_fn=log_fn, verbose=verbose, label="源", skip_extensions=skip_extensions, scan_mode=scan_mode)
    dst_index = _build_hash_index(target_dir, log_fn=log_fn, verbose=verbose, label="目标", skip_extensions=skip_extensions, scan_mode=scan_mode)
    surplus = [rel for key, rel in src_index.items() if key not in dst_index]
    missing = [rel for key, rel in dst_index.items() if key not in src_index]
    log_fn(f"[*] 差异统计:源多了 {len(surplus)} 个,源少了 {len(missing)} 个。")
    return {"多了": surplus, "少了": missing}
def main():
    """CLI entry point: parse arguments and dispatch to scan / sync / diff."""
    parser = argparse.ArgumentParser(description="File-Structure-Sync: 基于指纹的文件结构同步工具")
    parser.add_argument("mode", choices=["scan", "sync", "diff"], help="操作模式: scan / sync / diff (找差异)")
    parser.add_argument("--src", help="源目录路径 (使用盘)")
    parser.add_argument("--dst", help="目标目录路径 (备份盘)")
    parser.add_argument("--map", default="file_map.json", help="映射表文件名 (默认: file_map.json)")
    parser.add_argument("--dry-run", action="store_true", help="预览模式:仅显示将要进行的操作,不实际移动文件")
    parser.add_argument("--verbose", "-v", action="store_true", help="详细模式:逐文件输出日志")
    parser.add_argument("--skip-preset", choices=list(SKIP_EXT_PRESETS.keys()), action="append", metavar="PRESET", help="按预设跳过类型,可多次指定。可选: " + ", ".join(SKIP_EXT_PRESETS.keys()))
    parser.add_argument("--skip-ext", type=str, metavar="EXT", help="额外跳过的扩展名,逗号分隔,如: .jpg,.mp4")
    parser.add_argument("--scan-mode", choices=["hash", "name"], default="hash", help="hash=深度扫描(MD5), name=简易扫描(仅路径/文件名)")
    args = parser.parse_args()

    extra = args.skip_ext.split(",") if args.skip_ext else None
    # An empty result collapses to None so downstream checks stay cheap.
    skip_ext = get_skip_extensions(preset_names=args.skip_preset or None, extra_extensions=extra) or None

    if args.mode == "scan":
        if not args.src:
            print("错误:扫描模式需要指定 --src 参数")
            return
        scan_source(args.src, args.map, verbose=args.verbose, skip_extensions=skip_ext, scan_mode=args.scan_mode)
        return

    if args.mode == "sync":
        if not args.dst:
            print("错误:同步模式需要指定 --dst 参数")
            return
        # scan_mode is deliberately not passed: sync reads it from the map file.
        sync_target(args.dst, args.map, dry_run=args.dry_run, verbose=args.verbose, skip_extensions=skip_ext)
        return

    # diff mode
    if not (args.src and args.dst):
        print("错误:找差异模式需要同时指定 --src 与 --dst 参数")
        return
    result = diff_dirs(args.src, args.dst, verbose=args.verbose, skip_extensions=skip_ext, scan_mode=args.scan_mode)
    out = args.map if args.map != "file_map.json" else "diff_result.json"
    with open(out, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=4)
    print(f"[OK] 差异结果已保存至: {out}")


if __name__ == "__main__":
    main()