核心:通过哈希或 BinDiff 比对,确定若干资本类 APP 是否存在侵权行为
day1~4:爬取 GitHub 开源项目 top1000 的源码及其许可证,编译后构建比对库,再将其中禁止商用的开源项目与对应的主流 APP 进行比对,以确定是否侵权。

后续优化了一下整个爬虫脚本,整个流程下来大概用了4天彻底爬完了GitHub的top1000个项目源码(未编译)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
C++ 项目并发克隆脚本(服务器版)
使用镜像站加速下载 GitHub 项目
"""
import json
import os
import subprocess
import sys
import re
import time
import threading
import signal
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
# ==================== 配置区域 ====================
# Mirror prefix that is prepended to every clone URL.
MIRROR_PREFIX = "https://ghfast.top/"

# Base directories on the server.
BASE_DIR = Path("/data/ly/c++_crawl")
SAVE_DIR = Path("/data/ly/save")

# JSON file listing the projects to clone.
JSON_FILE = BASE_DIR / "CPP_urls_20260120_114754.json"

# Number of concurrent downloads (1 means serial download).
MAX_WORKERS = 1

# Clone timeout in seconds.
CLONE_TIMEOUT = 1800  # 30 minutes

# Bandwidth cap, enforced through trickle.
MAX_SPEED_MBPS = 15  # 15 MB/s

# Disk-space thresholds.
MIN_DISK_GB = 10  # minimum free space in GB
MIN_DISK_PERCENT = 5  # minimum free space in percent

# Interval between progress updates, in seconds.
PROGRESS_UPDATE_INTERVAL = 1.5
# ==================== 全局变量 ====================
# Lock that serialises console output across threads.
print_lock = threading.Lock()

# Run counters; guard every access with stats_lock.
stats = {
    'total': 0,
    'success': 0,
    'failed': 0,
    'skipped': 0,
    'in_progress': 0,
    'start_time': None,
    'last_update_time': None
}
stats_lock = threading.Lock()

# Flag that enables the disk-space checks, plus the lock guarding it.
disk_check_enabled = True
disk_check_lock = threading.Lock()

# Stop flag, set by the signal handler (Ctrl+C) and by the disk monitor.
stop_requested = False
# ==================== 工具函数 ====================
def load_json(json_file):
    """Load and return the parsed contents of a JSON file.

    Args:
        json_file: Path of the JSON file; it is opened as UTF-8 text.

    Returns:
        The object produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(json_file, "r", encoding="utf-8") as f:
        return json.load(f)
def get_mirror_url(clone_url, mirror_prefix=None):
    """Return the mirror address for a clone URL.

    Args:
        clone_url: The original clone URL.
        mirror_prefix: Prefix to put in front of ``clone_url``. Defaults to
            the module-level ``MIRROR_PREFIX``.

    Returns:
        ``mirror_prefix`` immediately followed by ``clone_url``.
    """
    prefix = MIRROR_PREFIX if mirror_prefix is None else mirror_prefix
    return f"{prefix}{clone_url}"
def check_disk_space(path=None, min_gb=None, min_percent=None):
    """Check whether the target filesystem still has enough free space.

    The figures come from ``shutil.disk_usage`` rather than from parsing
    ``df`` output, so no external process is spawned and wrapped or
    localised ``df`` lines cannot break the check.

    Args:
        path: Directory whose filesystem is inspected. Defaults to
            ``SAVE_DIR``.
        min_gb: Required free space in GB. Defaults to ``MIN_DISK_GB``.
        min_percent: Required free space in percent. Defaults to
            ``MIN_DISK_PERCENT``.

    Returns:
        A tuple ``(enough_space, available_gb, available_percent)``. When the
        usage cannot be determined the check fails open and returns
        ``(True, 0, 0)`` so that a monitoring glitch does not abort the run.
    """
    import shutil

    target = SAVE_DIR if path is None else path
    required_gb = MIN_DISK_GB if min_gb is None else min_gb
    required_percent = MIN_DISK_PERCENT if min_percent is None else min_percent

    try:
        usage = shutil.disk_usage(str(target))
    except OSError as e:
        print(f"[警告] 磁盘检查异常: {e}")
        return True, 0, 0

    # Like df's Use% column, measure against used + available so that blocks
    # reserved for the superuser do not count as usable space.
    usable = usage.used + usage.free
    if usable <= 0:
        return True, 0, 0

    # Both values are rounded down, which keeps the check conservative.
    available_gb = usage.free // (1024 ** 3)
    available_percent = usage.free * 100 // usable

    enough_space = (available_gb >= required_gb
                    and available_percent >= required_percent)
    return enough_space, available_gb, available_percent
def format_size(bytes_size):
    """Format a byte count as a human-readable string.

    Args:
        bytes_size: Size in bytes.

    Returns:
        The size with two decimals and a unit, using 1024-based steps from
        ``B`` up to ``TB`` (for example ``"1.50 KB"``).
    """
    for unit in ['B', 'KB', 'MB', 'GB']:
        if bytes_size < 1024.0:
            return f"{bytes_size:.2f} {unit}"
        bytes_size /= 1024.0
    # Anything that is still >= 1024 GB is reported in TB.
    return f"{bytes_size:.2f} TB"
def format_duration(seconds):
    """Format a duration in seconds as ``"1h 2m 3s"``, ``"2m 3s"`` or ``"3s"``.

    Args:
        seconds: Duration in seconds (int or float). Fractions are truncated
            and negative values are treated as zero, so a slightly negative
            estimate is shown as ``"0s"``.

    Returns:
        The formatted duration, omitting leading units that are zero.
    """
    whole_seconds = max(0, int(seconds))
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)

    if hours > 0:
        return f"{hours}h {minutes}m {secs}s"
    if minutes > 0:
        return f"{minutes}m {secs}s"
    return f"{secs}s"
def print_progress_bar(current, total, prefix="", suffix="", length=50):
    """Redraw a single-line progress bar on stdout.

    The line starts with a carriage return and has no trailing newline, so
    successive calls overwrite each other.

    Args:
        current: Number of finished items.
        total: Total number of items; a value <= 0 is shown as 0%.
        prefix: Text placed before the bar.
        suffix: Text placed after the ``(current/total)`` counter.
        length: Width of the bar in characters.
    """
    fraction = current / total if total > 0 else 0
    # Keep the bar inside its frame even if the counters overshoot.
    fraction = min(max(fraction, 0), 1)
    filled_length = int(length * fraction)
    bar = '█' * filled_length + '░' * (length - filled_length)
    print(f'\r{prefix} [{bar}] {fraction*100:.1f}% ({current}/{total}){suffix}', end='', flush=True)
def print_statistics():
    """Print a summary of the counters in ``stats``.

    Shows the totals, the elapsed time since ``stats['start_time']`` and an
    estimated remaining time extrapolated from the average time per finished
    project.
    """
    # Snapshot everything under the lock so the numbers are consistent.
    with stats_lock:
        total = stats['total']
        success = stats['success']
        failed = stats['failed']
        skipped = stats['skipped']
        in_progress = stats['in_progress']
        started_at = stats['start_time']

    elapsed = 0
    eta = 0
    if started_at:
        elapsed = (datetime.now() - started_at).total_seconds()
        completed = success + failed + skipped
        if completed > 0:
            avg_time = elapsed / completed
            # Never report a negative estimate if the counters overshoot.
            eta = max(0, avg_time * (total - completed))

    # Hold the print lock so other threads cannot interleave their output.
    with print_lock:
        print(f"\n{'='*80}")
        print(f"📊 下载统计")
        print(f"{'='*80}")
        print(f"总数: {total} | ✅ 成功: {success} | ❌ 失败: {failed} | ⏭️ 跳过: {skipped} | 🔄 进行中: {in_progress}")
        print(f"⏱️ 已运行: {format_duration(elapsed)} | ⏳ 预计剩余: {format_duration(eta)}")
        print(f"{'='*80}")
# ==================== 克隆函数 ====================
# Patterns for git's "Receiving objects" progress lines.
_RECEIVING_RE = re.compile(r'Receiving objects:\s+(\d+)%')
_SPEED_RE = re.compile(r'([\d.]+\s*[KMGT]?iB/s)')
_COUNT_RE = re.compile(r'\((\d+)/(\d+)\)')


def _pump_clone_output(stream, sink):
    """Forward every line of *stream* to the queue *sink*, then ``None``."""
    try:
        for line in stream:
            sink.put(line)
    except (OSError, ValueError):
        # The pipe went away while the clone was being aborted.
        pass
    finally:
        sink.put(None)


def _abort_clone(process, project_path):
    """Stop the clone process if it is still running and remove its output."""
    import shutil

    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
    # A partial checkout would be mistaken for a finished one on the next run.
    shutil.rmtree(project_path, ignore_errors=True)


def git_clone_project(name, clone_url, save_dir):
    """Clone one project through the mirror, rate-limited with trickle.

    The clone's output is read by a helper thread, so the stop flag, the
    timeout and the disk-space check are still evaluated while git is silent.

    Args:
        name: Project name; ``/`` is replaced by ``_`` to form the folder name.
        clone_url: Original clone URL (the mirror prefix is added here).
        save_dir: Directory (``pathlib.Path``) that receives the checkout.

    Returns:
        A tuple ``(ok, status)`` where ``status`` is one of ``"skipped"``,
        ``"success"``, ``"failed"``, ``"stopped"``, ``"timeout"``,
        ``"disk_full"`` or ``"error"``. ``ok`` is True only for ``"skipped"``
        and ``"success"``.
    """
    import queue

    # Folder name: slashes replaced by underscores.
    project_name = name.replace("/", "_")
    project_path = save_dir / project_name

    if project_path.exists():
        with print_lock:
            print(f"[⏭️ 跳过] {name} 已存在")
        return True, "skipped"

    mirror_url = get_mirror_url(clone_url)

    with print_lock:
        print(f"\n[🔄 开始克隆] {name}")
        print(f" 原地址: {clone_url}")
        print(f" 镜像: {mirror_url}")

    # trickle takes its limits in KB/s: -d is download, -u is upload.
    limit_kbps = str(int(MAX_SPEED_MBPS * 1024))
    command = [
        "trickle", "-d", limit_kbps, "-u", limit_kbps,
        "git", "clone", "--progress", mirror_url, str(project_path),
    ]

    process = None
    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding='utf-8',
            errors='ignore',
            text=True,
            bufsize=1
        )

        # readline() on the pipe blocks, so a helper thread does the reading
        # and this loop polls a queue with a short timeout instead.
        lines = queue.Queue()
        threading.Thread(
            target=_pump_clone_output,
            args=(process.stdout, lines),
            daemon=True,
        ).start()

        start_time = time.time()
        next_disk_check = start_time
        progress_shown = False
        output_done = False

        while True:
            if stop_requested:
                _abort_clone(process, project_path)
                with print_lock:
                    print(f"\n[⚠️ 中止] {name} - 用户请求停止")
                return False, "stopped"

            now = time.time()
            if now - start_time > CLONE_TIMEOUT:
                _abort_clone(process, project_path)
                with print_lock:
                    print(f"\n[⏰ 超时] {name} - 超过 {CLONE_TIMEOUT} 秒")
                return False, "timeout"

            # Check the disk once at the start and then every 10 seconds.
            if now >= next_disk_check:
                next_disk_check = now + 10
                with disk_check_lock:
                    monitoring = disk_check_enabled
                if monitoring:
                    enough, avail_gb, avail_percent = check_disk_space()
                    if not enough:
                        _abort_clone(process, project_path)
                        with print_lock:
                            print(f"\n[💾 磁盘不足] {name} - 剩余 {avail_gb}GB ({avail_percent}%)")
                        return False, "disk_full"

            if output_done:
                # Output is finished; wait for the process itself to exit.
                if process.poll() is not None:
                    break
                time.sleep(0.1)
                continue

            try:
                line = lines.get(timeout=0.5)
            except queue.Empty:
                if process.poll() is not None:
                    break
                continue

            if line is None:
                output_done = True
                continue

            line = line.rstrip()

            if _RECEIVING_RE.search(line):
                percent = _RECEIVING_RE.search(line).group(1)
                speed_match = _SPEED_RE.search(line)
                speed = speed_match.group(1) if speed_match else "0 B/s"
                count_match = _COUNT_RE.search(line)
                if count_match:
                    received = count_match.group(1)
                    total_size = count_match.group(2)
                    with print_lock:
                        print(f"\r 📥 [{name}] {percent}% | {received}/{total_size} | ⚡ {speed}", end='', flush=True)
                    progress_shown = True

            # Surface server messages and errors as separate lines.
            if any(key in line for key in ['remote:', 'fatal:', 'error:']):
                if 'Receiving objects' not in line:
                    with print_lock:
                        print(f"\n 📡 [{name}] {line}")

        returncode = process.wait()

        # Finish the in-place progress line.
        if progress_shown:
            with print_lock:
                print()

        if returncode == 0:
            with print_lock:
                duration = time.time() - start_time
                print(f"[✅ 成功] {name} - 耗时 {format_duration(duration)}")
            return True, "success"

        _abort_clone(process, project_path)
        with print_lock:
            print(f"[❌ 失败] {name} - 返回码: {returncode}")
        return False, "failed"

    except Exception as e:
        with print_lock:
            print(f"\n[❌ 异常] {name} - {e}")
        return False, "error"
    finally:
        # Never leave a clone running after an unexpected error.
        if process is not None and process.poll() is None:
            _abort_clone(process, project_path)
def clone_wrapper(index, project, total):
    """Clone one project entry and record the outcome in ``stats``.

    Args:
        index: 1-based position of the project, used in messages.
        project: Mapping with the keys ``"name"`` and ``"clone_url"``.
        total: Total number of projects (kept for interface compatibility;
            not used here).

    Returns:
        ``(False, "invalid")`` when the entry lacks a name or URL, otherwise
        the ``(ok, status)`` tuple returned by ``git_clone_project``.
    """
    with stats_lock:
        stats['in_progress'] += 1

    counter = None  # which stats counter this task ends up in
    try:
        name = project.get("name")
        clone_url = project.get("clone_url")

        if not name or not clone_url:
            with print_lock:
                print(f"[⏭️ 跳过] 第{index}个项目数据不完整")
            counter = 'skipped'
            return False, "invalid"

        result, status = git_clone_project(name, clone_url, SAVE_DIR)

        # An already-present checkout is a skip, not a fresh success.
        if status == "skipped":
            counter = 'skipped'
        elif result:
            counter = 'success'
        else:
            counter = 'failed'
        return result, status
    finally:
        # Runs on exceptions too, so in_progress can never leak; in that case
        # counter stays None and the caller accounts for the failure.
        with stats_lock:
            stats['in_progress'] -= 1
            if counter is not None:
                stats[counter] += 1
# ==================== 监控线程 ====================
def disk_monitor_thread():
    """Background loop that checks free disk space every 30 seconds.

    Sets the global ``stop_requested`` flag when ``check_disk_space`` reports
    that space is insufficient. Runs until ``stop_requested`` is set.
    """
    global stop_requested

    while not stop_requested:
        try:
            time.sleep(30)
            if stop_requested:
                break

            with disk_check_lock:
                if not disk_check_enabled:
                    continue

            enough, avail_gb, avail_percent = check_disk_space()

            with print_lock:
                if enough:
                    print(f"\n[💾 磁盘检查] 剩余 {avail_gb}GB ({avail_percent}%) - ✅ 正常")
                else:
                    print(f"\n[💾 磁盘不足] 剩余 {avail_gb}GB ({avail_percent}%) - ❌ 停止下载!")
                    # This must be an f-string so the thresholds are filled in.
                    print(f"[💾 磁盘不足] 触发阈值: < {MIN_DISK_GB}GB 或 < {MIN_DISK_PERCENT}%")
                    stop_requested = True
                    print("[💾 磁盘不足] 正在停止所有下载任务...")
        except Exception as e:
            with print_lock:
                print(f"\n[⚠️ 磁盘监控异常] {e}")
def progress_monitor_thread():
    """Background loop that redraws the overall progress bar.

    Wakes up every ``PROGRESS_UPDATE_INTERVAL`` seconds, snapshots ``stats``
    and prints the bar at most once per second. Runs until ``stop_requested``
    is set.
    """
    last_print_time = time.time()

    while not stop_requested:
        try:
            time.sleep(PROGRESS_UPDATE_INTERVAL)
            # Do not draw another bar once shutdown has begun.
            if stop_requested:
                break

            with stats_lock:
                total = stats['total']
                success = stats['success']
                failed = stats['failed']
                skipped = stats['skipped']
                in_progress = stats['in_progress']

            completed = success + failed + skipped

            # Throttle redraws to one per second.
            current_time = time.time()
            if current_time - last_print_time >= 1.0:
                with print_lock:
                    suffix = f" | ✅{success} ❌{failed} ⏭️{skipped} 🔄{in_progress}"
                    print_progress_bar(completed, total, "📥 总进度", suffix, length=60)
                last_print_time = current_time
        except Exception as e:
            with print_lock:
                print(f"\n[⚠️ 进度监控异常] {e}")
# ==================== 主函数 ====================
def signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM by requesting a cooperative shutdown.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    global stop_requested
    stop_requested = True

    # Handlers run on the main thread, which may already hold print_lock.
    # A blocking acquire of that non-reentrant lock would deadlock, so only
    # take it if it is free and print either way.
    acquired = print_lock.acquire(blocking=False)
    try:
        print(f"\n\n[⚠️ 收到停止信号] 正在安全退出...")
        print("[⚠️ 提示] 等待当前任务完成后退出(最多30秒)")
    except RuntimeError:
        # The signal interrupted another print(); the flag is already set,
        # so losing this message is harmless.
        pass
    finally:
        if acquired:
            print_lock.release()
def main():
    """Run the whole batch: validate the setup, clone every project, report.

    Exits with status 1 when the JSON file is missing or unusable, or when
    the initial disk-space check fails.
    """
    global stop_requested

    # Turn Ctrl+C / SIGTERM into a cooperative stop request.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print(f"\n{'='*80}")
    print(f"🚀 C++ 项目批量克隆脚本(服务器版)")
    print(f"{'='*80}")
    print(f"📁 保存目录: {SAVE_DIR}")
    print(f"📄 JSON文件: {JSON_FILE}")
    print(f"🔗 镜像源: {MIRROR_PREFIX}")
    print(f"⚡ 并发数: {MAX_WORKERS}")
    print(f"🚀 速度限制: {MAX_SPEED_MBPS}MB/s(通过 trickle 限制)")
    print(f"⏱️ 超时时间: {CLONE_TIMEOUT}秒")
    print(f"💾 磁盘阈值: {MIN_DISK_GB}GB 或 {MIN_DISK_PERCENT}%")
    print(f"{'='*80}\n")

    if not JSON_FILE.exists():
        print(f"[❌ 错误] JSON文件不存在: {JSON_FILE}")
        sys.exit(1)

    SAVE_DIR.mkdir(parents=True, exist_ok=True)

    # Initial disk check before anything is downloaded.
    enough, avail_gb, avail_percent = check_disk_space()
    print(f"[💾 初始磁盘] 剩余 {avail_gb}GB ({avail_percent}%)")
    if not enough:
        print(f"[❌ 错误] 磁盘空间不足!")
        print(f"[❌ 错误] 当前: {avail_gb}GB ({avail_percent}%)")
        print(f"[❌ 错误] 要求: >={MIN_DISK_GB}GB 且 >={MIN_DISK_PERCENT}%")
        sys.exit(1)
    print()

    try:
        projects = load_json(JSON_FILE)
        total = len(projects)
        print(f"[📋 加载] 共 {total} 个项目")
        print(f"[📋 下载策略] 正序下载(从第1个到第{total}个)\n")
    except Exception as e:
        print(f"[❌ 错误] 加载JSON失败: {e}")
        sys.exit(1)

    # The loop below expects a list of project mappings.
    if not isinstance(projects, list):
        print(f"[❌ 错误] JSON内容不是项目列表: {JSON_FILE}")
        sys.exit(1)

    with stats_lock:
        stats['total'] = total
        stats['start_time'] = datetime.now()

    disk_thread = threading.Thread(target=disk_monitor_thread, daemon=True)
    progress_thread = threading.Thread(target=progress_monitor_thread, daemon=True)
    disk_thread.start()
    progress_thread.start()

    # Serial cloning, one project after another.
    try:
        for i, project in enumerate(projects, 1):
            if stop_requested:
                print("\n[⚠️ 停止请求] 停止下载...")
                break

            try:
                clone_wrapper(i, project, total)
            except Exception as e:
                with print_lock:
                    print(f"\n[❌ 异常] 第{i}个任务出错: {e}")
                with stats_lock:
                    stats['failed'] += 1

    except KeyboardInterrupt:
        stop_requested = True
        print("\n[⚠️ 中断] 用户中断")

    # Remember how the run ended, then stop the monitor threads so the
    # progress bar cannot overwrite the final report.
    aborted = stop_requested
    stop_requested = True
    progress_thread.join(timeout=PROGRESS_UPDATE_INTERVAL + 1)

    print()
    print_statistics()

    enough, avail_gb, avail_percent = check_disk_space()
    print(f"\n[💾 最终磁盘] 剩余 {avail_gb}GB ({avail_percent}%)")

    print(f"\n{'='*80}")
    if aborted:
        print("⚠️ 脚本已中止(部分项目可能未完成)")
    else:
        print("✅ 所有任务已完成")
    print(f"{'='*80}\n")
# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
day5:路线商议后改为爬取小米应用商店类目下top100应用(我为影视类目)
寻找爬取小米top100思路
手机下载然后pull拉取
后面用jadxcli批量处理解包即可!
评论(0)
暂无评论