boxmoe_header_banner_img

欢迎来到烨的世界~

加载中

文章导读

逆向组线下实习week1


avatar
liuye 2026年1月23日 468

核心:通过哈希或bindiff比对确定几个或多个资本类APP是否存在侵权行为

day1~4:爬取 GitHub 开源项目 top1000 的源码及其许可证,编译后构建比对库,再将其中非商用许可的开源项目与对应的主流 APP 进行比对,确定是否存在侵权。

后续优化了一下整个爬虫脚本,整个流程下来大概用了4天彻底爬完了GitHub的top1000个项目源码(未编译)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
C++ 项目并发克隆脚本(服务器版)
使用镜像站加速下载 GitHub 项目
"""

import json
import os
import subprocess
import sys
import re
import time
import threading
import signal
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

# ==================== Configuration ====================

# Mirror site prefix that is put in front of every clone URL
MIRROR_PREFIX = "https://ghfast.top/"

# Base directories on the server
BASE_DIR = Path("/data/ly/c++_crawl")
SAVE_DIR = Path("/data/ly/save")

# Path of the JSON file that lists the projects
JSON_FILE = BASE_DIR / "CPP_urls_20260120_114754.json"

# Number of concurrent downloads (1 means serial download)
MAX_WORKERS = 1

# Clone timeout in seconds
CLONE_TIMEOUT = 1800  # 30 minutes

# Speed limit (applied through trickle)
MAX_SPEED_MBPS = 15  # 15 MB/s

# Disk space thresholds
MIN_DISK_GB = 10  # minimum free space (GB)
MIN_DISK_PERCENT = 5  # minimum free space (percent)

# Progress update interval in seconds
PROGRESS_UPDATE_INTERVAL = 1.5

# ==================== Global state ====================

# Print lock (keeps output from several threads from interleaving)
print_lock = threading.Lock()

# Statistics (guarded by stats_lock)
stats = {
    'total': 0,
    'success': 0,
    'failed': 0,
    'skipped': 0,
    'in_progress': 0,
    'start_time': None,
    'last_update_time': None
}
stats_lock = threading.Lock()

# Disk monitoring flag
disk_check_enabled = True
disk_check_lock = threading.Lock()

# Stop flag (set by the Ctrl+C handler)
stop_requested = False

# ==================== Utility functions ====================

def load_json(json_file):
    """Load a JSON file and return its parsed contents.

    Args:
        json_file: Path of the file to read; it is decoded as UTF-8.

    Returns:
        The deserialized JSON document.
    """
    with open(json_file, 'r', encoding='utf-8') as f:
        return json.load(f)

def get_mirror_url(clone_url):
    """Return the mirror address: MIRROR_PREFIX followed by *clone_url*."""
    return f"{MIRROR_PREFIX}{clone_url}"

def check_disk_space():
    """Check the free space of the filesystem that holds SAVE_DIR.

    Runs ``df -BG`` on SAVE_DIR and parses the first data row.

    Returns:
        A tuple ``(enough_space, available_gb, available_percent)``.
        ``enough_space`` is True when the free space meets both MIN_DISK_GB
        and MIN_DISK_PERCENT.  If ``df`` fails or its output cannot be parsed
        the check fails open and returns ``(True, 0, 0)``.
    """
    try:
        result = subprocess.run(
            ['df', '-BG', str(SAVE_DIR)],
            capture_output=True,
            text=True,
            timeout=5
        )

        if result.returncode != 0:
            print(f"[警告] 无法获取磁盘信息: {result.stderr}")
            return True, 0, 0

        # Row layout: Filesystem 1G-blocks Used Available Use% Mounted on
        lines = result.stdout.strip().split('\n')
        if len(lines) < 2:
            return True, 0, 0

        parts = lines[1].split()
        if len(parts) < 5:
            return True, 0, 0

        # -BG makes df report whole gigabytes with a trailing "G".  Only that
        # suffix is stripped; any other unit raises ValueError and lands in
        # the fail-open handler instead of being misread as gigabytes.
        available_gb = int(parts[3].rstrip('G'))
        used_percent = int(parts[4].replace('%', ''))

        available_percent = 100 - used_percent

        enough_space = (available_gb >= MIN_DISK_GB and available_percent >= MIN_DISK_PERCENT)

        return enough_space, available_gb, available_percent

    except Exception as e:
        # Best effort by design: a broken check must not stop the downloads.
        print(f"[警告] 磁盘检查异常: {e}")
        return True, 0, 0

def format_size(bytes_size):
    """Format a byte count as a string with two decimals and a unit.

    Args:
        bytes_size: Size in bytes.

    Returns:
        A string such as ``"1.50 KB"``.  The value is divided by 1024 per
        unit step through B, KB, MB and GB; anything larger is shown in TB.
    """
    for unit in ['B', 'KB', 'MB', 'GB']:
        if bytes_size < 1024.0:
            return f"{bytes_size:.2f} {unit}"
        bytes_size /= 1024.0
    return f"{bytes_size:.2f} TB"

def format_duration(seconds):
    """Format a duration in seconds as ``"Xh Ym Zs"``.

    Leading zero units are omitted: under a minute only seconds are shown,
    under an hour minutes and seconds.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The formatted duration string.
    """
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)

    if hours > 0:
        return f"{hours}h {minutes}m {secs}s"
    elif minutes > 0:
        return f"{minutes}m {secs}s"
    else:
        return f"{secs}s"

def print_progress_bar(current, total, prefix="", suffix="", length=50):
    """Print a one-line progress bar that overwrites the current line.

    Args:
        current: Number of finished items.
        total: Total number of items; 0 is rendered as 0%.
        prefix: Text printed before the bar.
        suffix: Text printed after the counters.
        length: Width of the bar in characters.
    """
    percent = current / total if total > 0 else 0
    filled_length = int(length * percent)
    bar = '█' * filled_length + '░' * (length - filled_length)

    # The leading "\r" and end='' keep the bar on a single terminal line.
    print(f'\r{prefix} [{bar}] {percent*100:.1f}% ({current}/{total}){suffix}', end='', flush=True)

def print_statistics():
    """Print a summary of the download counters, elapsed time and ETA.

    The counters are read from ``stats`` under ``stats_lock``.  The ETA is
    the average time per finished project (success + failed + skipped)
    multiplied by the number of projects still outstanding.
    """
    with stats_lock:
        total = stats['total']
        success = stats['success']
        failed = stats['failed']
        skipped = stats['skipped']
        in_progress = stats['in_progress']

        if stats['start_time']:
            elapsed = (datetime.now() - stats['start_time']).total_seconds()
            completed = success + failed + skipped
            if completed > 0:
                avg_time = elapsed / completed
                eta = avg_time * (total - completed)
            else:
                eta = 0
        else:
            elapsed = 0
            eta = 0

    print(f"\n{'='*80}")
    print(f"📊 下载统计")
    print(f"{'='*80}")
    print(f"总数: {total} | ✅ 成功: {success} | ❌ 失败: {failed} | ⏭️  跳过: {skipped} | 🔄 进行中: {in_progress}")
    print(f"⏱️  已运行: {format_duration(elapsed)} | ⏳ 预计剩余: {format_duration(eta)}")
    print(f"{'='*80}")

# ==================== Clone functions ====================

def _abort_clone(process, project_path):
    """Stop the clone process if still running, reap it, delete the checkout."""
    import shutil  # local import keeps this helper self-contained

    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # The child ignored SIGTERM; kill it so it does not linger.
            process.kill()
            process.wait()
    # A leftover directory would make the next run report the project as
    # already existing and skip it, so remove whatever was written.
    shutil.rmtree(project_path, ignore_errors=True)


def _pump_output(stream, sink):
    """Forward every line of *stream* to the queue *sink*, then a None marker."""
    try:
        for raw in stream:
            sink.put(raw)
    finally:
        sink.put(None)


def git_clone_project(name, clone_url, save_dir):
    """Clone one project through the mirror while echoing git's progress.

    The clone runs as ``trickle ... git clone --progress``.  Its output is
    read by a helper thread so that the stop flag, the timeout and the disk
    check are evaluated even while git prints nothing.

    Args:
        name: Project name; every "/" becomes "_" in the directory name.
        clone_url: Original clone URL; the mirror prefix is added to it.
        save_dir: Directory (a Path) under which the project is cloned.

    Returns:
        A tuple ``(ok, status)``.  ``status`` is one of "skipped" (target
        directory already exists), "success", "failed" (non-zero exit code),
        "stopped", "timeout", "disk_full" or "error".  ``ok`` is True only
        for "skipped" and "success".
    """
    import queue  # local import: only this function needs it

    project_name = name.replace("/", "_")
    project_path = save_dir / project_name

    if project_path.exists():
        with print_lock:
            print(f"[⏭️  跳过] {name} 已存在")
        return True, "skipped"

    mirror_url = get_mirror_url(clone_url)

    with print_lock:
        print(f"\n[🔄 开始克隆] {name}")
        print(f"  原地址: {clone_url}")
        print(f"  镜像: {mirror_url}")

    # trickle takes its limits in KB/s: -d is download, -u is upload.
    rate_kbps = str(int(MAX_SPEED_MBPS * 1024))
    process = None

    try:
        process = subprocess.Popen(
            ["trickle", "-d", rate_kbps, "-u", rate_kbps,
             "git", "clone", "--progress", mirror_url, str(project_path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding='utf-8',
            errors='ignore',
            text=True,
            bufsize=1
        )

        output = queue.Queue()
        threading.Thread(
            target=_pump_output, args=(process.stdout, output), daemon=True
        ).start()

        start_time = time.time()
        last_disk_check = None
        progress_shown = False

        while True:
            if stop_requested:
                _abort_clone(process, project_path)
                with print_lock:
                    print(f"\n[⚠️  中止] {name} - 用户请求停止")
                return False, "stopped"

            now = time.time()
            if now - start_time > CLONE_TIMEOUT:
                _abort_clone(process, project_path)
                with print_lock:
                    print(f"\n[⏰ 超时] {name} - 超过 {CLONE_TIMEOUT} 秒")
                return False, "timeout"

            # Disk check once at the start and then every 10 seconds.
            if last_disk_check is None or now - last_disk_check >= 10:
                last_disk_check = now
                enough = True
                with disk_check_lock:
                    if disk_check_enabled:
                        enough, avail_gb, avail_percent = check_disk_space()
                if not enough:
                    _abort_clone(process, project_path)
                    with print_lock:
                        print(f"\n[💾 磁盘不足] {name} - 剩余 {avail_gb}GB ({avail_percent}%)")
                    return False, "disk_full"

            # Wait briefly for output so the checks above keep running.
            try:
                line = output.get(timeout=0.5)
            except queue.Empty:
                continue
            if line is None:
                break  # end of output: the process is finishing

            line = line.rstrip()

            progress_match = re.search(r'Receiving objects:\s+(\d+)%', line)
            if progress_match:
                percent = progress_match.group(1)

                speed_match = re.search(r'([\d.]+\s*[KMGT]?iB/s)', line)
                speed = speed_match.group(1) if speed_match else "0 B/s"

                # "(received/total)" object counters printed by git
                size_match = re.search(r'\((\d+)/(\d+)\)', line)
                if size_match:
                    received = size_match.group(1)
                    total_size = size_match.group(2)
                    with print_lock:
                        print(f"\r  📥 [{name}] {percent}% | {received}/{total_size} | ⚡ {speed}", end='', flush=True)
                    progress_shown = True

            # Echo server messages and errors other than progress lines.
            if any(key in line for key in ['remote:', 'fatal:', 'error:']):
                if 'Receiving objects' not in line:
                    with print_lock:
                        print(f"\n  📡 [{name}] {line}")

        returncode = process.wait()

        # Finish the in-place progress line.
        if progress_shown:
            with print_lock:
                print()

        if returncode == 0:
            with print_lock:
                duration = time.time() - start_time
                print(f"[✅ 成功] {name} - 耗时 {format_duration(duration)}")
            return True, "success"

        # Remove any partial checkout so the next run retries this project.
        _abort_clone(process, project_path)
        with print_lock:
            print(f"[❌ 失败] {name} - 返回码: {returncode}")
        return False, "failed"

    except Exception as e:
        if process is not None:
            _abort_clone(process, project_path)
        with print_lock:
            print(f"\n[❌ 异常] {name} - {e}")
        return False, "error"

def clone_wrapper(index, project, total):
    """Clone one project entry and update the shared statistics.

    Args:
        index: 1-based position of the project, used in messages.
        project: Mapping with the keys "name" and "clone_url".
        total: Total number of projects (accepted for the caller; unused).

    Returns:
        The ``(ok, status)`` tuple of ``git_clone_project``, or
        ``(False, "invalid")`` when the entry lacks a name or a clone URL.
    """
    with stats_lock:
        stats['in_progress'] += 1

    name = project.get("name")
    clone_url = project.get("clone_url")

    if not name or not clone_url:
        with print_lock:
            print(f"[⏭️  跳过] 第{index}个项目数据不完整")
        with stats_lock:
            stats['skipped'] += 1
            stats['in_progress'] -= 1
        return False, "invalid"

    try:
        result, status = git_clone_project(name, clone_url, SAVE_DIR)
    finally:
        # Always release the in-progress slot, even if the clone raised.
        with stats_lock:
            stats['in_progress'] -= 1

    with stats_lock:
        if status == "skipped":
            # An existing project returns ok=True; count it as skipped.
            stats['skipped'] += 1
        elif result:
            stats['success'] += 1
        else:
            stats['failed'] += 1

    return result, status

# ==================== Monitor threads ====================

def disk_monitor_thread():
    """Check the disk every 30 seconds and request a stop when it runs low.

    Loops until ``stop_requested`` is set.  Each round is skipped while
    ``disk_check_enabled`` is False.  When ``check_disk_space`` reports too
    little space, ``stop_requested`` is set to True.
    """
    global stop_requested, disk_check_enabled

    while not stop_requested:
        try:
            time.sleep(30)

            with disk_check_lock:
                if not disk_check_enabled:
                    continue

                enough, avail_gb, avail_percent = check_disk_space()

                with print_lock:
                    if enough:
                        print(f"\n[💾 磁盘检查] 剩余 {avail_gb}GB ({avail_percent}%) - ✅ 正常")
                    else:
                        print(f"\n[💾 磁盘不足] 剩余 {avail_gb}GB ({avail_percent}%) - ❌ 停止下载!")
                        # f-string so the thresholds are shown, not the placeholders
                        print(f"[💾 磁盘不足] 触发阈值: < {MIN_DISK_GB}GB 或 < {MIN_DISK_PERCENT}%")
                        stop_requested = True
                        print("[💾 磁盘不足] 正在停止所有下载任务...")

        except Exception as e:
            with print_lock:
                print(f"\n[⚠️  磁盘监控异常] {e}")

def progress_monitor_thread():
    """Redraw the overall progress bar until a stop is requested.

    Sleeps PROGRESS_UPDATE_INTERVAL seconds per round, snapshots the
    counters under ``stats_lock`` and prints the bar under ``print_lock``.
    """
    global stop_requested

    last_print_time = time.time()

    while not stop_requested:
        try:
            time.sleep(PROGRESS_UPDATE_INTERVAL)

            with stats_lock:
                total = stats['total']
                success = stats['success']
                failed = stats['failed']
                skipped = stats['skipped']
                in_progress = stats['in_progress']
                completed = success + failed + skipped

            # Print at most once per second.
            current_time = time.time()
            if current_time - last_print_time >= 1.0:
                with print_lock:
                    suffix = f" | ✅{success} ❌{failed} ⏭️{skipped} 🔄{in_progress}"
                    print_progress_bar(completed, total, "📥 总进度", suffix, length=60)
                last_print_time = current_time

        except Exception as e:
            with print_lock:
                print(f"\n[⚠️  进度监控异常] {e}")

# ==================== Main ====================

def signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM by requesting a stop.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    global stop_requested
    stop_requested = True

    # Python runs signal handlers in the main thread, which may already hold
    # print_lock.  The lock is not reentrant, so a blocking acquire here
    # could deadlock; take it only if it is free.
    locked = print_lock.acquire(blocking=False)
    try:
        print(f"\n\n[⚠️  收到停止信号] 正在安全退出...")
        print("[⚠️  提示] 等待当前任务完成后退出(最多30秒)")
    finally:
        if locked:
            print_lock.release()

def main():
    """Run the batch clone: set up, clone each project in order, report.

    Steps: register the signal handlers, print the configuration, check that
    the JSON file exists and the disk has room, load the project list, start
    the monitor threads, clone the projects one after another, then print
    the final statistics and disk state.  Exits with status 1 when the JSON
    file is missing or unreadable or the disk is too full at start.
    """
    global stop_requested, disk_check_enabled

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print(f"\n{'='*80}")
    print(f"🚀 C++ 项目批量克隆脚本(服务器版)")
    print(f"{'='*80}")
    print(f"📁 保存目录: {SAVE_DIR}")
    print(f"📄 JSON文件: {JSON_FILE}")
    print(f"🔗 镜像源: {MIRROR_PREFIX}")
    print(f"⚡ 并发数: {MAX_WORKERS}")
    print(f"🚀 速度限制: {MAX_SPEED_MBPS}MB/s(通过 trickle 限制)")
    print(f"⏱️  超时时间: {CLONE_TIMEOUT}秒")
    print(f"💾 磁盘阈值: {MIN_DISK_GB}GB 或 {MIN_DISK_PERCENT}%")
    print(f"{'='*80}\n")

    if not JSON_FILE.exists():
        print(f"[❌ 错误] JSON文件不存在: {JSON_FILE}")
        sys.exit(1)

    SAVE_DIR.mkdir(parents=True, exist_ok=True)

    # Initial disk check
    enough, avail_gb, avail_percent = check_disk_space()
    print(f"[💾 初始磁盘] 剩余 {avail_gb}GB ({avail_percent}%)")

    if not enough:
        print(f"[❌ 错误] 磁盘空间不足!")
        print(f"[❌ 错误] 当前: {avail_gb}GB ({avail_percent}%)")
        print(f"[❌ 错误] 要求: >={MIN_DISK_GB}GB 且 >={MIN_DISK_PERCENT}%")
        sys.exit(1)

    print()

    try:
        projects = load_json(JSON_FILE)
        total = len(projects)
        print(f"[📋 加载] 共 {total} 个项目")
        print(f"[📋 下载策略] 正序下载(从第1个到第{total}个)\n")
    except Exception as e:
        print(f"[❌ 错误] 加载JSON失败: {e}")
        sys.exit(1)

    with stats_lock:
        stats['total'] = total
        stats['start_time'] = datetime.now()

    # Daemon threads: they cannot keep the interpreter alive at exit.
    disk_thread = threading.Thread(target=disk_monitor_thread, daemon=True)
    progress_thread = threading.Thread(target=progress_monitor_thread, daemon=True)

    disk_thread.start()
    progress_thread.start()

    # Serial clone, one project after another
    try:
        for i, project in enumerate(projects, 1):
            if stop_requested:
                print("\n[⚠️  停止请求] 停止下载...")
                break

            try:
                clone_wrapper(i, project, total)
            except Exception as e:
                with print_lock:
                    print(f"\n[❌ 异常] 第{i}个任务出错: {e}")
                with stats_lock:
                    stats['failed'] += 1

    except KeyboardInterrupt:
        stop_requested = True
        print("\n[⚠️  中断] 用户中断")

    # Remember whether the run was aborted, then tell the monitor threads to
    # stop so the progress bar does not print into the final report.
    aborted = stop_requested
    stop_requested = True
    progress_thread.join(timeout=PROGRESS_UPDATE_INTERVAL + 1)

    print()
    print_statistics()

    # Final disk state
    enough, avail_gb, avail_percent = check_disk_space()
    print(f"\n[💾 最终磁盘] 剩余 {avail_gb}GB ({avail_percent}%)")

    print(f"\n{'='*80}")
    if aborted:
        print("⚠️  脚本已中止(部分项目可能未完成)")
    else:
        print("✅ 所有任务已完成")
    print(f"{'='*80}\n")

# Run main() only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()

day5:经商议调整路线,改为爬取小米应用商店各类目下的 top100 应用(我负责影视类目)

寻找爬取小米top100思路

手机下载然后pull拉取

后面用jadxcli批量处理解包即可!



评论(0)

查看评论列表

暂无评论


发表评论