首先我们要了解一点基础知识

线程与进程

进程(Process)

进程是操作系统中资源分配的最小单位,简单来说,一个进程就是一个正在运行的程序的实例,操作系统会为每个进程分配独立的内存空间、文件句柄、CPU 时间、网络端口等资源。不同的进程之间相互独立,一个进程崩了不会影响另一个进程

在 Python 中,一个multiprocessing.Process() 启动的任务就是一个新的子进程,它拥有独立的 Python 解释器和内存空间

线程(Thread)

线程是进程中的执行单元,是操作系统中能进行调度的最小单位

一个进程可以有多个线程,多个线程共享同一个进程的内存和资源,每个线程只负责执行一部分任务。

在 Python 里,threading.Thread()ThreadPoolExecutor() 启动的就是线程

Python 中的区别和限制

Python 中有一个叫 GIL(全局解释器锁) 的机制,这个机制让同一时刻只能有一个线程在执行 Python 字节码,所以对于 CPU 密集型任务,多线程的效果并不大,但是对于 I/O 密集型任务(网络请求、文件操作等),多线程仍能提高效率。

因此如果是网络扫描、爬虫就用多线程,而如果是数据计算、加密、视频渲染则用多进程

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import requests
import concurrent.futures
import queue
import random

# 获取 URL
url = input("请输入 URL:").strip()

headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"
}

print("Ciallo~ (∠・ω< )⌒★")

# 代理池(可以自己添加更多代理)
proxy_pool = [
"http://123.456.789.1:8080",
"http://234.567.890.2:8080",
"http://345.678.901.3:8080",
"http://456.789.012.4:8080",
]

def read_file(file1):
"""尝试以 UTF-8 读取文件,失败后尝试 GBK"""
try:
with open(file1, encoding='utf-8') as f:
return f.readlines()
except UnicodeDecodeError:
with open(file1, encoding='gbk', errors='ignore') as f:
return f.readlines()

def get_random_proxy():
"""随机获取一个代理"""
return random.choice(proxy_pool)

def scan_path(path):
"""扫描单个路径"""
full_url = url.rstrip("/") + "/" + path.strip() # 拼接完整 URL
proxy = get_random_proxy() # 随机选一个代理
proxies = {"http": proxy, "https": proxy} # 设置代理

try:
response = requests.get(full_url, headers=headers, proxies=proxies, timeout=5) # 发送请求
if response.status_code == 200:
red_text = f"\033[91m[+] {path.strip()} 存在 (代理: {proxy})\033[0m" # 红色高亮
print(red_text)
return f"[+] {path.strip()} 存在 (代理: {proxy})" # 返回结果
else:
print(f"[-] {path.strip()} 不存在 (代理: {proxy})") # 打印但不保存
except requests.exceptions.RequestException as e:
print(f"[!] 访问 {full_url} 失败 (代理: {proxy}): {e}") # 处理错误

return None # 失败返回 None

def run():
"""使用多线程执行扫描"""
paths = read_file("PHP.txt") # 读取字典文件
results = []
q = queue.Queue() # 任务队列

for path in paths:
q.put(path.strip()) # 加入队列

# 设置最大线程数
max_threads = 10 # 线程池大小(可调整)

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
# 提交所有任务
future_to_path = {executor.submit(scan_path, q.get()): path for path in paths}

for future in concurrent.futures.as_completed(future_to_path):
result = future.result()
if result:
results.append(result)

return "\n".join(results)

# 执行扫描并保存结果
with open("output.txt", 'w', encoding='utf-8') as f:
result = run()
if result:
f.write(result)

这是一个用 ai 生成的简易的用字典撞目录的 python 脚本,其中包含多线程并发和代理池,撞目录的逻辑挺简单的,我们来分析一下多线程和代理池

这里的代理池我为了方便随便列了几个,但是事实上我们可以从网络接口自动抓取,比如如下代码

1
2
3
4
5
6
7
8
9
10
import requests, random

def get_proxy_list():
url = "https://www.proxy-list.download/api/v1/get?type=http"
resp = requests.get(url)
proxies = [f"http://{p.strip()}" for p in resp.text.split("\n") if p.strip()]
return proxies

proxies_list = get_proxy_list()
print(f"已获取 {len(proxies_list)} 个代理")

把这个改一改就可以直接套进上面的代码

下面是实现多线程的核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
q = queue.Queue()  # 任务队列

for path in paths:
q.put(path.strip()) # 加入队列

# 设置最大线程数
max_threads = 10 # 线程池大小(可调整)

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
# 提交所有任务
future_to_path = {executor.submit(scan_path, q.get()): path for path in paths}

for future in concurrent.futures.as_completed(future_to_path):
result = future.result()
if result:
results.append(result)

return "\n".join(results)

在 python 中,ThreadPoolExecutor 是标准库 concurrent.futures 提供的一个线程池管理器

1
2
max_threads = 10
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:

这就表示创建一个最多同时运行 10 个线程的线程池,这里的 10 个线程是主进程的子线程,他们不会立即退出吗,而是等待分配任务, 当 executor.submit() 被调用时,任务会进入内部队列,线程池中的线程会从队列中取出任务执行,在所有的任务完成后,这些线程会随着线程池的关闭而关闭

而代码中

1
2
3
q = queue.Queue()
for path in paths:
q.put(path.strip())

这里就是在创建一个线程安全的线程,把所有要扫描的路径放进去

1
future_to_path = {executor.submit(scan_path, q.get()): path for path in paths}

这一行中我们做了两件事

  • 每次循环从q队列中取出一个任务
  • 将这个任务交给线程池中一个空闲的线程去执行
1
2
3
4
for future in concurrent.futures.as_completed(future_to_path):
result = future.result()
if result:
results.append(result)

而通过这个 for 循环可以监听每个任务是否完成

  • future 表示每个任务的执行状态
  • 当任务完成时,它会进入 as_completed
  • 主线程再从 future.result() 获取返回值
  • 再保存到结果列表

那么这时候一定有人会问,为什么不手动创建线程呢,就像下面的代码

1
2
3
for path in paths:
t = threading.Thread(target=scan_path, args=(path,))
t.start()

这个虽然也可以实现并发,但是有几个问题:

  • 线程太多会爆内存,然而这个写法不好控制线程数
  • 难以回收线程与收集结果

在上面代码中还有一段从代理池中随机选择代理的逻辑,每个线程在执行scan_path()时都会独立选择一个代理,互不影响

1
2
3
proxy = get_random_proxy()
proxies = {"http": proxy, "https": proxy}
response = requests.get(full_url, headers=headers, proxies=proxies, timeout=5)