强网杯 2025 WriteUp + 复现ing
虽然逆向大失败了但是真打爽了,决定全复现
Reverse
butterfly
程序对输入文件按每 8 字节块进行如下变换(加密流程):
t = B XOR K(按位异或,B 为 8 字节明文块,K 取 key "MMXEncode2024" 的前 8 字节);u = swap16(t)(在每个 16-bit lane 内交换字节:[b0 b1 b2 b3 ...] -> [b1 b0 b3 b2 ...]);v = ROL64(u, 1)(对整个 64-bit 做循环左移 1 位);out = bytewise_add(v, K)(对每个字节做 (v_byte + K_byte) & 0xFF)
写回并对整个文件的每个 8 字节块重复此操作。程序还在内存中于 buf[file_size] 写入一个 16-bit 的 file_size(原始长度)。
因此解密(逆操作)为:
tmp = bytewise_sub(C, K);rot = ROR64(tmp, 1);swapped = swap16(rot);B = swapped XOR K
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101int __fastcall main(int argc, const char **argv, const char **envp)
{
// Decompiled (Hex-Rays style) entry point of the encoder: reads argv[1] into a
// heap buffer, transforms it in place 8 bytes at a time with MMX intrinsics,
// then writes the buffer to argv[2] and a 32-byte key blob to "<argv[2]>.key".
// NOTE(review): the variable names below were assigned by the analyst and do
// not always describe what the variable holds (e.g. input_filename is the FILE handle).
const char *input_filename_1; // r14
const char *output_filename; // r13
__int64 input_filename_2; // rax
int *input_filename; // r12
__int64 file_size; // rbx
unsigned __int64 input_file; // rax
__m64 *output_file; // rbp
__int64 file_size_1; // r15
__m64 *input_file_2; // rax
__m64 file_handle; // mm0
__m64 ___file_handle___16_bit_lanes____________; // mm2
_OWORD key_file[2]; // [rsp+0h] [rbp-158h] BYREF
unsigned int format_arg3[2]; // [rsp+20h] [rbp-138h] BYREF
// Exactly two command-line arguments are required.
if ( argc != 3 )
{
printf(2, (__int64)"Usage: %s <input_file> <output_file>\n", *argv);
printf(2, (__int64)"Example: %s plaintext.txt encoded.dat\n", *argv);
return 1;
}
input_filename_1 = argv[1];
output_filename = argv[2];
input_filename_2 = fopen(input_filename_1, "rb");
input_filename = (int *)input_filename_2;
if ( !input_filename_2 )
{
printf(2, (__int64)"Error: Cannot open file %s\n", input_filename_1);
return 1;
}
// Determine the file size: seek to the end, ftell, seek back to the start.
fseek(input_filename_2, 0LL, 2LL);
file_size = ftell(input_filename);
fseek(input_filename, 0LL, 0LL);
// The buffer is 8 bytes larger than the file.
input_file = malloc(file_size + 8);
output_file = (__m64 *)input_file;
if ( !input_file )
{
fclose(input_filename);
printf_0("Error: Memory allocation failed");
return 1;
}
file_size_1 = fread(input_file, file_size + 8, 1LL, file_size, input_filename);// analyst's reading of this call:
                                                // fread(buf, 1, file_size, f) - the 5-argument form is a decompiler artifact
fclose(input_filename);
// The read must return exactly file_size, otherwise bail out.
if ( file_size != file_size_1 )
{
free((__int64)output_file);
printf_0("Error: File read failed");
return 1;
}
*(__int16 *)((char *)output_file->m64_i16 + file_size) = file_size;// stores the low 16 bits of file_size
                                                // at buf[file_size] (two bytes, just past the file data)
// Two unaligned 16-byte loads fill the 32-byte key_file blob.
// NOTE(review): the second literal is what the decompiler resolved for that
// address; presumably it is simply the data that follows the key string - confirm.
key_file[0] = _mm_loadu_si128((const __m128i *)"MMXEncode2024");
key_file[1] = _mm_loadu_si128((const __m128i *)"coding file: %s\n");
printf(2, (__int64)"Encoding file: %s\n", input_filename_1);
printf(2, (__int64)"Original size: %zu bytes\n", file_size);
*(_QWORD *)format_arg3 = *(_QWORD *)&key_file[0];// first 8 bytes of the key become the per-block key
// Nothing is transformed for inputs shorter than 8 bytes.
if ( (int)file_size > 7 )
{
input_file_2 = output_file;
do
{
file_handle = _m_pxor((__m64)input_file_2->m64_u64, *(__m64 *)format_arg3);// step 1:
                                                // _m_pxor(a,b): 64-bit bitwise XOR of the block with the 8-byte key
___file_handle___16_bit_lanes____________ = _m_por(_m_psrlwi(file_handle, 8u), _m_psllwi(file_handle, 8u));// step 2:
                                                // (x >> 8) | (x << 8) within each 16-bit lane,
                                                // i.e. the two bytes of every 16-bit word are swapped
input_file_2->m64_u64 = (unsigned __int64)_m_paddb(
_m_por(
_m_psllqi(___file_handle___16_bit_lanes____________, 1u),
_m_psrlqi(___file_handle___16_bit_lanes____________, 0x3Fu)),
*(__m64 *)format_arg3);// step 3:
                                                // logical shift left by 1 OR logical shift right by 63
                                                // on the whole 64-bit quadword,
                                                // i.e. rotate the 64-bit value left by 1 bit
                                                // step 4:
                                                // byte-wise add of the key,
                                                // i.e. (value_byte + key_byte) & 0xFF for each of the 8 bytes
// As decompiled, the two exit tests below stop the loop once floor(file_size / 8)
// blocks have been processed, so a trailing partial block (file_size % 8 bytes)
// is left untransformed.
if ( &output_file[(unsigned int)(file_size - 1) >> 3] == input_file_2 )
break;
++input_file_2;
}
while ( &output_file[((unsigned int)(file_size - 8) >> 3) + 1] != input_file_2 );// the result overwrites the input data in place
}
_m_empty();
// Only file_size bytes of the buffer are passed on for writing; the 2-byte size
// stored past the data is not included in that length.
if ( (unsigned int)maybe_fwrite(output_filename, output_file, file_size) )
{
printf(2, (__int64)"Successfully encoded to: %s\n", output_filename);
printf(2, (__int64)"Encoded size: %zu bytes\n", file_size);
// format_arg3 is reused here as the destination for the "<output>.key" file name.
may_sprintf((__int64)format_arg3, 0x100uLL, 2, 0x100uLL, (__int64)"%s.key", output_filename);
if ( (unsigned int)maybe_fwrite((const char *)format_arg3, (__m64 *)key_file, 32LL) )
printf(2, (__int64)"Key saved to: %s\n", (const char *)format_arg3);
}
free((__int64)output_file);
return 0;
}exp
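因为解密出来的最后几位不对(结尾的 .9:. 是不确定的),就用 flag{butter_fly_mmx_encode_77781} 当输入重新加密试了一下。(补充:从上面反编译的循环条件看,程序只变换完整的 8 字节块,末尾不足 8 字节的部分应当是原样写出的,而脚本却把这段尾巴也当成密文解了一遍,所以最后几位才会出错。)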
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201from pathlib import Path
import struct
import hashlib
# Hard-coded Windows locations used by this script: the encoded input file,
# the ".key" side file, and the file the decrypted bytes are written to.
# KEY_PATH is not referenced anywhere in the visible part of the script.
ENC_PATH = Path(r"D:\LeStoreDownload\webpage\CTF\2025 1018-1019 强网杯\butterfly\encode.dat")
KEY_PATH = Path(r"D:\LeStoreDownload\webpage\CTF\2025 1018-1019 强网杯\butterfly\encode.dat.key")
OUT_PATH = Path(r"D:\LeStoreDownload\webpage\CTF\2025 1018-1019 强网杯\butterfly\decrypted_output.bin")
def ror64(x: int, n: int) -> int:
    """Rotate the 64-bit value *x* right by *n* bits.

    *x* is reduced to its low 64 bits and *n* is taken modulo 64, so any
    non-negative rotate count is accepted (the previous form raised on
    ``n > 64`` because of a negative shift count).
    """
    mask = (1 << 64) - 1
    x &= mask
    n %= 64
    # For n == 0 the left-shifted part is masked away entirely, leaving x.
    return ((x >> n) | (x << (64 - n))) & mask
def swap16_bytes(b: bytes) -> bytes:
    """Swap the two bytes of every 16-bit pair in *b*.

    ``[b0 b1 b2 b3 ...]`` becomes ``[b1 b0 b3 b2 ...]``. When the length is
    odd, the final unpaired byte is kept in place.
    """
    out = bytearray(b)
    paired = len(b) - (len(b) % 2)  # number of bytes that belong to a full pair
    out[0:paired:2] = b[1:paired:2]
    out[1:paired:2] = b[0:paired:2]
    return bytes(out)
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """Return the byte-wise XOR of *a* and *b*.

    The result is as long as the shorter input (``zip`` semantics).
    """
    return bytes(x ^ y for x, y in zip(a, b))
def sub_bytes(a: bytes, b: bytes) -> bytes:
    """Return the byte-wise difference ``(a[i] - b[i]) mod 256``.

    This is the inverse of a wrapping per-byte add. The result is as long as
    the shorter input (``zip`` semantics).
    """
    return bytes((x - y) & 0xFF for x, y in zip(a, b))
def decrypt_block(cipher_block: bytes, key8: bytes) -> bytes:
    """Invert the encoder's transform for one 8-byte block.

    The steps undo the encoder in reverse order: per-byte subtract of the
    key, rotate the little-endian 64-bit value right by 1, swap the bytes of
    each 16-bit pair, then XOR with the key.

    Raises:
        ValueError: if *cipher_block* or *key8* is not exactly 8 bytes long.
            (A short key used to be accepted and silently produced a
            truncated, wrong result because of ``zip`` truncation.)
    """
    if len(cipher_block) != 8:
        raise ValueError("decrypt_block requires 8-byte block")
    if len(key8) != 8:
        raise ValueError("decrypt_block requires 8-byte key")
    unadded = sub_bytes(cipher_block, key8)
    rotated = ror64(int.from_bytes(unadded, 'little'), 1)
    unswapped = swap16_bytes(rotated.to_bytes(8, 'little'))
    return xor_bytes(unswapped, key8)
def derive_key8_from_str(key_str: str, mode: str) -> bytes:
    """Derive an 8-byte key from *key_str* using the named *mode*.

    Modes (case-insensitive):
        raw8     first 8 UTF-8 bytes, zero-padded on the right
        xorfold  XOR of all bytes folded into 8 slots (index mod 8)
        sumfold  sum modulo 256 of all bytes folded into 8 slots
        md5      first 8 bytes of the MD5 digest

    Raises:
        ValueError: for any other mode.
    """
    kb = key_str.encode('utf-8')
    mode = mode.lower()
    if mode == 'raw8':
        return kb[:8].ljust(8, b'\x00')
    if mode in ('xorfold', 'sumfold'):
        acc = bytearray(8)
        for i, v in enumerate(kb):
            slot = i % 8
            if mode == 'xorfold':
                acc[slot] ^= v
            else:
                acc[slot] = (acc[slot] + v) & 0xFF
        return bytes(acc)
    if mode == 'md5':
        return hashlib.md5(kb).digest()[:8]
    raise ValueError('unsupported key mode: ' + mode)
# File signatures that mark a decryption attempt as an immediate best match.
COMMON_MAGIC_PREFIXES = (
    b'PK\x03\x04',
    b'\x89PNG\r\n\x1a\n',
    b'GIF87a', b'GIF89a',
    b'MZ',
    b'\x7fELF',  # ELF starts with 0x7F; a bare b'ELF' never matches a real ELF file
    b'7z\xBC\xAF\x27\x1C',
    b'BM',
    b'\xFF\xD8\xFF',
    b'ID3',
    b'%PDF-',
)


def looks_plausible(data: bytes) -> tuple:
    """Score how plausible *data* is as a correct plaintext (lower sorts first).

    Returns:
        ``(0, 0)`` when the data starts with a known file signature,
        ``(2, 1.0)`` when the data is empty, otherwise ``(1, ratio)`` where
        ``ratio`` is the fraction of non-printable bytes among the first 64
        (printable means ASCII 32..126 plus tab, LF and CR).
    """
    # bytes.startswith accepts a tuple of candidates directly.
    if data[:8].startswith(COMMON_MAGIC_PREFIXES):
        return (0, 0)
    sample = data[:64]
    if not sample:
        return (2, 1.0)
    printable = sum(1 for c in sample if 32 <= c <= 126 or c in (9, 10, 13))
    ratio = 1.0 - (printable / len(sample))
    return (1, ratio)
# Load encode.dat
if not ENC_PATH.exists():
    raise FileNotFoundError(f"{ENC_PATH} not found. Make sure the file was uploaded.")
enc_data = ENC_PATH.read_bytes()
key_text = "MMXEncode2024"

# Heuristic: look for a trailing little-endian "original size" footer, first
# as 8 bytes and then as 4 bytes. A footer is accepted only when the value is
# positive and not larger than the remaining ciphertext.
# NOTE(review): the decompiled encoder appears to write only file_size bytes
# and no footer, so this normally falls through to the fallback - confirm.
footer_len = None
orig_size = None
if len(enc_data) >= 8:
    tail8 = int.from_bytes(enc_data[-8:], 'little')
    max_cipher = len(enc_data) - 8
    if 0 < tail8 <= max_cipher:
        footer_len = 8
        orig_size = tail8
if footer_len is None and len(enc_data) >= 4:
    tail4 = int.from_bytes(enc_data[-4:], 'little')
    max_cipher = len(enc_data) - 4
    if 0 < tail4 <= max_cipher:
        footer_len = 4
        orig_size = tail4
if footer_len is None:
    # Fallback: no valid footer detected; treat whole file as cipher and use its length
    print("[!] 未检测到有效的尾部原始大小,回退为整文件解密并不截断。")
    cipher_data = enc_data
    orig_size = len(enc_data)
else:
    cipher_data = enc_data[:-footer_len]
    print(f"[+] 检测到尾部 {footer_len} 字节原始大小: {orig_size}")

# Try every key-derivation mode and keep the most plausible plaintext.
modes = ['raw8', 'xorfold', 'sumfold', 'md5']
candidates = []
# The encoder's loop only transforms whole 8-byte blocks; a trailing partial
# block is written out untouched. Zero-padding and "decrypting" that tail (as
# this script used to do) corrupts the last len % 8 bytes, so copy it verbatim.
whole_len = len(cipher_data) - (len(cipher_data) % 8)
for m in modes:
    try:
        key8 = derive_key8_from_str(key_text, m)
    except ValueError as e:
        print(f"[!] skipping key mode {m}: {e}")
        continue
    dec = bytearray(cipher_data)  # starts as a copy, so the tail stays as-is
    for pos in range(0, whole_len, 8):
        dec[pos:pos + 8] = decrypt_block(cipher_data[pos:pos + 8], key8)
    plain_try = bytes(dec[:orig_size]) if orig_size <= len(dec) else bytes(dec)
    score = looks_plausible(plain_try)
    candidates.append((score, m, key8, plain_try, bytes(dec)))
if not candidates:
    raise RuntimeError("未能使用任何派生方式解密,请检查输入或算法细节。")
candidates.sort(key=lambda x: x[0])
best = candidates[0]
_, best_mode, best_key8, _, plain_full = best
print(f"[+] 选定 key='{key_text}' 的派生方式: {best_mode} ; 8-byte key = {best_key8}")

if orig_size is not None and orig_size <= len(plain_full):
    plain_truncated = plain_full[:orig_size]
else:
    plain_truncated = plain_full
final_plain = plain_truncated

# If truncating to orig_size cut a flag short, extend the output up to the
# closing brace found in the untruncated plaintext.
# NOTE(review): indentation was lost in the published listing; the elif is
# attached to the inner if here, since at the outer level it would be dead code.
if b'flag{' in plain_truncated or b'FLAG{' in plain_truncated:
    # Check whether the closing } is missing
    if b'}' not in plain_truncated and b'}' in plain_full:
        # Find the first } and extend the output up to it
        end_pos = plain_full.find(b'}')
        if end_pos != -1:
            final_plain = plain_full[:end_pos + 1]
            print(f"[+] 检测到不完整的 flag,自动扩展到完整长度: {len(final_plain)}")
    elif len(plain_truncated) < len(plain_full) * 0.8:
        flag_start = plain_truncated.find(b'flag{')
        if flag_start == -1:
            flag_start = plain_truncated.find(b'FLAG{')
        if flag_start != -1:
            full_flag_start = plain_full.find(b'flag{', flag_start)
            if full_flag_start == -1:
                full_flag_start = plain_full.find(b'FLAG{', flag_start)
            if full_flag_start != -1:
                flag_end = plain_full.find(b'}', full_flag_start)
                if flag_end != -1:
                    final_plain = plain_full[:flag_end + 1]
                    print(f"[+] 检测到截断的 flag,扩展到完整版本: {len(final_plain)}")

OUT_PATH.write_bytes(final_plain)
print("Decrypted output written to:", OUT_PATH)
print("Saved length:", len(final_plain))
def hexdump(b: bytes, length=256):
    """Return the first *length* bytes of *b* as lowercase hex pairs separated by spaces."""
    return b[:length].hex(" ")
# Dump the head and tail of the result as hex, then a printable-ASCII preview
# in which every byte outside 0x20..0x7E is shown as a dot.
for heading, chunk in (
    ("\nFirst 256 bytes (hex):", final_plain),
    ("\nLast 256 bytes (hex):", final_plain[-256:]),
):
    print(heading)
    print(hexdump(chunk, 256))
preview_chars = []
for byte in final_plain[:512]:
    preview_chars.append(chr(byte) if 0x20 <= byte <= 0x7E else ".")
text_preview = "".join(preview_chars)
print("\nText preview (first 512 bytes):")
print(text_preview)
# flag{butter_fly_mmx_encode_77781.9:.
out.txt:
1 | 8F A3 9C B7 70 8D 8F 98 9D BF 8C 99 8C 73 E5 90 |
原本的encode.dat:
1 | 8F A3 9C B7 70 8D 8F 98 9D BF 8C 99 8C 73 E5 90 |
7D是},猜测0A先不管或者是7D0A是两位一处理的
然后随便加了1位:flag{butter_fly_mmx_encode_777812}
1 | 8F A3 9C B7 70 8D 8F 98 9D BF 8C 99 8C 73 E5 90 |
只加了一位,还是原本的输入,手动测了一下是67就出了
1 | 8F A3 9C B7 70 8D 8F 98 9D BF 8C 99 8C 73 E5 90 |
flag{butter_fly_mmx_encode_7778167}
Misc
Personal Vault
先volatility3
1 | Kernel Base 0xf8047a400000 |
1 | 3192 5316 aPersonalVault 0xc58d5492a080 6 - 1 False 2025-10-09 05:47:31.000000 UTC N/A Disabled |
- PID 3192中有:”This is my secret, touch it or you’ll die without blinking”
- PID 5408中有:”mysuperdupersecretkey”和各种恶意软件信息
然后去搜索关键词secret,顺便搜了一下flag。。没想到flag明文写里面了
1 | import struct |
The_Interrogation_Room
简单分析下源码可以知道
- 每一轮交互 `do_round()`:
  - 随机生成 8 个秘密位 `secrets = [True/False x8]`。
  - 服务允许你发送 17 个问题,每个问题是一个布尔表达式(只允许有限 token),服务会按一个预先打乱但固定长度为 17 的响应列表 `responses` 对每个问题做应答;该列表包含 15 个 truth(返回真实值)和 2 个 lie(返回相反值)。
  - 17 次回答之后会要求你 “Now reveal the true secrets (1 for true, 0 for false):” —— 你要提交 8 个 0/1。
Token 白名单(由服务在 `do_round()` 中检查)只允许:
`['==','(',')','S0'..'S7','0','1','and','or']`
因此表达式必须用空格分隔 token(例如 `S0 == 1`),并且不能使用 `not`(尽管解析器内部支持 `not`,但白名单没有它)。
解析器 `interrogate()` 会将 `S0..S7` 替换为真正的布尔值并评估表达式;随后 `interrogate_prisoner()` 使用 `responses[i]`(truth 或 lie)对结果做翻转或不翻转,再返回给客户端。
关键点(漏洞):每轮正好 17 次回答且恰有 2 次说谎,而服务器并没有限制重复问题或禁止相同问题多次出现。如果把 17 次机会全部用来重复询问同一位(比如 `S0 == 1`),由于其中只有 2 次是 lie,多数投票(majority)一定等于真实值(15 vs 2)。不过一轮总共只有 17 个问题,要同时恢复 8 个位,不可能对每一位都这样问 17 次;所以下面的脚本实际采用的做法是:询问 17 个由若干 `S_i` 异或组成的校验式(前 8 个在 GF(2) 上满秩),再枚举少量可能说谎的位置做纠错解码,从而恢复真实的 8 bit。
另外,连接建立时会有一个 PoW(工作量证明)步骤:服务发出 `sha256(XXXX+{proof[4:]}) == <digest>`,要你找出 4 字节的 `XXXX` 使得哈希匹配,`XXXX` 必须是长度为 4 的 ASCII 字符串。
- 所以只需要每轮交互随机生成问题、反复确认就行;单线程有点不稳定,于是用 AI 写了个基于 socket 的脚本,其中 PoW 部分用 multiprocessing 多进程并行爆破。
脚本如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427import socket, sys, itertools, random, re, time, select
from hashlib import sha256
from contextlib import closing
from multiprocessing import Process, Event, Queue, cpu_count
# ======== Parameters (balancing speed and robustness) ========
HOST = "8.147.135.168"
PORT = 30736
N_QUERIES = 17
ROUNDS = 25
SEED_BASE = 42
READ_TIMEOUT = 7.0 # global read timeout (seconds)
WRITE_TIMEOUT = 7.0 # write timeout (seconds)
PROMPT_TIMEOUT = 2.5 # per-question timeout while waiting for "Ask your question"
PER_Q_TIMEOUT = 3.5 # timeout for reading the answer to a single question
LINE_MAXLEN = 32768
RECV_SOFT_LIMIT_LINES = 60
MAX_ERROR_BITS = 3 # tolerate 3 wrong answers: guards against occasional misparsed / coalesced replies
RETRY_PER_ROUND = 2 # at most 2 retries per round
POW_WAIT_MAX = 200 # longest wait (seconds) for the parallel PoW
VERBOSE = False # set to True for more detailed logging
# ======================================
ALPHABET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
# ---------- Low-level networking: line-oriented I/O built on select + recv ----------
class LineIO:
    """Line-buffered reader/writer over a connected socket.

    The socket is left in blocking mode and every wait is bounded with
    ``select``. Deadlines are measured with ``time.monotonic()`` so that
    wall-clock adjustments cannot shorten or extend a timeout.
    """

    def __init__(self, sock: socket.socket):
        self.sock = sock
        self.buf = b""  # bytes received but not yet returned to the caller
        try:
            self.sock.settimeout(None)  # timeouts are enforced via select instead
        except OSError:
            pass

    def writeln(self, s: str):
        """Send *s* followed by exactly one newline.

        Raises:
            TimeoutError: if the data cannot be sent within WRITE_TIMEOUT.
            OSError: if the socket reports that nothing could be written.
        """
        data = (s.rstrip("\r\n") + "\n").encode()
        deadline = time.monotonic() + WRITE_TIMEOUT
        view = memoryview(data)
        sent = 0
        while sent < len(data):
            remain = deadline - time.monotonic()
            if remain <= 0:
                raise TimeoutError("write timed out")
            _, writable, _ = select.select([], [self.sock], [], remain)
            if not writable:
                continue
            n = self.sock.send(view[sent:])
            if n <= 0:
                raise OSError("socket closed while writing")
            sent += n

    def _recv_some(self, timeout: float):
        """Return up to 4096 bytes, or ``b""`` on timeout or end of stream."""
        if timeout is not None and timeout < 0:
            timeout = 0
        readable, _, _ = select.select([self.sock], [], [], timeout)
        if not readable:
            return b""
        try:
            return self.sock.recv(4096)
        except BlockingIOError:
            return b""

    def readline(self, timeout: float = READ_TIMEOUT) -> str:
        """Return the next line including its newline.

        When no newline arrives before the timeout (or the peer closes), the
        buffered partial data is returned instead, so prompts that do not end
        in a newline are still delivered. Returns ``""`` when nothing is
        available. A buffer with no newline that exceeds LINE_MAXLEN is
        returned in LINE_MAXLEN-sized pieces.
        """
        deadline = time.monotonic() + (timeout if timeout is not None else 1e9)
        while True:
            pos = self.buf.find(b"\n")
            if pos != -1:
                line, self.buf = self.buf[:pos + 1], self.buf[pos + 1:]
                return line.decode(errors="ignore")
            # Checked only after the newline search, so an oversized buffer
            # can no longer be cut in front of a complete line.
            if len(self.buf) > LINE_MAXLEN:
                line, self.buf = self.buf[:LINE_MAXLEN], self.buf[LINE_MAXLEN:]
                return line.decode(errors="ignore")
            if time.monotonic() >= deadline:
                return ""
            chunk = self._recv_some(deadline - time.monotonic())
            if chunk == b"":
                # Timeout or EOF: hand back whatever partial data is buffered.
                line, self.buf = self.buf, b""
                return line.decode(errors="ignore")
            self.buf += chunk

    def read_until_any(self, patterns, timeout=READ_TIMEOUT, max_lines=RECV_SOFT_LIMIT_LINES):
        """Read lines until one matches any of *patterns* (case-insensitive regexes).

        Returns:
            ``(text, index)``: all text read so far and the index of the
            matching pattern, or ``-1`` if the timeout, *max_lines*, or an
            empty read ended the loop first.
        """
        compiled = [re.compile(p, re.I) for p in patterns]
        deadline = time.monotonic() + (timeout if timeout is not None else 1e9)
        lines = []
        while len(lines) < max_lines and time.monotonic() < deadline:
            line = self.readline(timeout=deadline - time.monotonic())
            if not line:
                break
            lines.append(line)
            for i, pattern in enumerate(compiled):
                if pattern.search(line):
                    return "".join(lines), i
        return "".join(lines), -1

    def read_some_lines(self, n=1, timeout=READ_TIMEOUT):
        """Read up to *n* lines within *timeout* seconds and return them joined."""
        deadline = time.monotonic() + (timeout if timeout is not None else 1e9)
        out = []
        for _ in range(n):
            if time.monotonic() >= deadline:
                break
            line = self.readline(timeout=deadline - time.monotonic())
            if not line:
                break
            out.append(line)
        return "".join(out)
def tcp_tune(sock: socket.socket):
    """Best-effort TCP tuning: enable keepalive and disable Nagle's algorithm.

    Each option is applied independently, so an option the platform rejects
    no longer prevents the remaining ones from being set. Failures are
    ignored on purpose; the connection works without any of these options.
    """
    options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    # The keepalive fine-tuning constants do not exist on every platform.
    for name, value in (("TCP_KEEPIDLE", 60), ("TCP_KEEPINTVL", 10), ("TCP_KEEPCNT", 4)):
        if hasattr(socket, name):
            options.append((socket.IPPROTO_TCP, getattr(socket, name), value))
    options.append((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1))  # send small packets immediately
    for level, option, value in options:
        try:
            sock.setsockopt(level, option, value)
        except OSError:
            pass
# ---------- Query construction ----------
def xor_two(a, b):
    """Return an expression for ``a XOR b`` using only ``and``, ``or`` and ``== 0``.

    ``x == 0`` stands in for ``not x``. Both operands appear twice in the
    result, so nesting doubles the expression length each time.
    """
    return f"( ( {a} and ( {b} == 0 ) ) or ( ( {a} == 0 ) and {b} ) )"


def xor_expr_for_indices(indices):
    """Return an expression for the XOR of ``S<i>`` over *indices*.

    An empty list yields the constant ``"0"``.
    """
    if not indices:
        return "0"
    expr = f"S{indices[0]}"
    for idx in indices[1:]:
        expr = xor_two(expr, f"S{idx}")
    return expr
def rank_gf2(A):
    """Return the rank over GF(2) of the 0/1 matrix *A* (a list of rows).

    Works on a copy; the caller's matrix is not modified. An empty matrix
    has rank 0.
    """
    mat = [row[:] for row in A]
    n_rows = len(mat)
    n_cols = len(mat[0]) if n_rows else 0
    rank = 0
    for col in range(n_cols):
        # Find a pivot for this column among the rows not yet used.
        pivot = next((i for i in range(rank, n_rows) if mat[i][col]), None)
        if pivot is None:
            continue
        mat[rank], mat[pivot] = mat[pivot], mat[rank]
        # Clear this column in every other row.
        for i in range(n_rows):
            if i != rank and mat[i][col]:
                for j in range(col, n_cols):
                    mat[i][j] ^= mat[rank][j]
        rank += 1
        if rank == n_rows:
            break
    return rank
def build_queries(n_queries, seed=None):
    """Build the parity questions for one round.

    Each question asks whether the XOR of a random non-empty subset of
    ``S0..S7`` equals 1. The first 8 subsets are redrawn until they are
    linearly independent over GF(2); further random subsets are appended
    until there are *n_queries* of them.

    Returns:
        ``(rows, queries)``: the index subsets and the matching expression
        strings. The result is deterministic for a given *seed*.
    """
    rnd = random.Random(seed)

    def draw_mask():
        # One coin flip per secret index; an empty draw falls back to one random index.
        mask = [i for i in range(8) if rnd.choice([0, 1])]
        return mask or [rnd.randrange(8)]

    while True:
        rows = [draw_mask() for _ in range(8)]
        matrix = [[1 if j in row else 0 for j in range(8)] for row in rows]
        if rank_gf2(matrix) == 8:
            break
    while len(rows) < n_queries:
        rows.append(draw_mask())
    queries = [f"( {xor_expr_for_indices(mask)} ) == 1" for mask in rows]
    return rows, queries
# ---------- GF(2) solving + bitwise verification ----------
def solve_gf2(A, b):
    """Solve ``A x = b`` over GF(2) by Gauss-Jordan elimination.

    Args:
        A: list of 0/1 rows.
        b: list of 0/1 right-hand sides, one per row.

    Returns:
        One solution as a list of bits with free variables set to 0, ``[]``
        for an empty system, or ``None`` when the system is inconsistent.
    """
    m = len(A)
    if m == 0:
        return []
    n = len(A[0])
    aug = [row[:] + [bit] for row, bit in zip(A, b)]  # augmented matrix [A | b]
    rank = 0
    pivot_cols = []
    for col in range(n):
        sel = next((i for i in range(rank, m) if aug[i][col] == 1), None)
        if sel is None:
            continue
        aug[rank], aug[sel] = aug[sel], aug[rank]
        pivot_cols.append(col)
        for i in range(m):
            if i != rank and aug[i][col] == 1:
                for j in range(col, n + 1):
                    aug[i][j] ^= aug[rank][j]
        rank += 1
        if rank == m:
            break
    # A row reading 0 = 1 means there is no solution.
    for i in range(rank, m):
        if aug[i][n] == 1 and all(aug[i][j] == 0 for j in range(n)):
            return None
    x = [0] * n
    for i, col in enumerate(pivot_cols):
        x[col] = aug[i][n]
    return x
def rows_to_bitmasks(rows):
    """Convert each list of bit indices into an integer with those bits set."""
    masks = []
    for indices in rows:
        value = 0
        for j in indices:
            value |= 1 << j
        masks.append(value)
    return masks
def predict_all(bitmasks, sol_bits):
    """Return the parity each mask would produce for the secret *sol_bits*.

    *sol_bits* is a list of 0/1 values where index i is bit i. Entry k of
    the result is the parity of the secret bits selected by ``bitmasks[k]``.
    """
    secret = 0
    for i, bit in enumerate(sol_bits):
        if bit:
            secret |= 1 << i
    # bin().count keeps this usable on interpreters without int.bit_count().
    return [bin(mask & secret).count("1") & 1 for mask in bitmasks]
def decode_from_responses(rows, responses, max_err=3):
    """Recover the 8 secret bits from parity answers that may contain lies.

    Every one of the 256 possible secrets is scored by which answers it
    contradicts, and the secret with the fewest contradictions is returned.
    Ties on the count go to the lexicographically smallest error tuple. The
    previous approach solved a linear system on each candidate subset of
    trusted answers and could miss the nearest secret when that subset was
    rank-deficient.

    Args:
        rows: for each question, the list of secret indices XOR-ed together.
        responses: the observed 0/1 answers, one per question.
        max_err: largest number of wrong answers that is accepted.

    Returns:
        ``(bits, errs)`` where *bits* is a list of 8 ints (index i is S<i>)
        and *errs* is the ascending tuple of question indices judged to be
        lies, or ``(None, None)`` if every secret contradicts more than
        *max_err* answers.

    Raises:
        ValueError: if *rows* and *responses* differ in length.
    """
    if len(responses) != len(rows):
        raise ValueError("rows and responses must have the same length")
    masks = []
    for indices in rows:
        value = 0
        for j in indices:
            value |= 1 << j
        masks.append(value)
    observed = [1 if r else 0 for r in responses]

    best_key = None
    best_secret = None
    for secret in range(256):
        wrong = tuple(
            i for i, mask in enumerate(masks)
            if (bin(mask & secret).count("1") & 1) != observed[i]
        )
        if len(wrong) > max_err:
            continue
        key = (len(wrong), wrong)
        if best_key is None or key < best_key:
            best_key, best_secret = key, secret
    if best_key is None:
        return None, None
    bits = [(best_secret >> i) & 1 for i in range(8)]
    return bits, best_key[1]
# ---------- Parallel PoW ----------
def _pow_worker(prefixes, suffix, target, found_evt: Event, outq: Queue):
    """Search 4-character strings whose first character is in *prefixes*.

    A candidate ``xxxx`` matches when ``sha256(xxxx + suffix)`` has the hex
    digest *target* (compared case-insensitively). On a match the string is
    put on *outq* and *found_evt* is set. The worker also stops early once
    another worker has set *found_evt*.
    """
    tail = suffix.encode()
    wanted = target.lower()
    for a in prefixes:
        for b in ALPHABET:
            # Poll the shared event once per two-character prefix rather than
            # once per hash; the check is far more expensive than one hash.
            if found_evt.is_set():
                return
            head = a + b
            for c in ALPHABET:
                for d in ALPHABET:
                    xxxx = head + c + d
                    if sha256(xxxx.encode() + tail).hexdigest() == wanted:
                        outq.put(xxxx)
                        found_evt.set()
                        return
def solve_pow_parallel(suffix, target):
    """Brute-force the 4-character PoW prefix using several processes.

    The first character of the search space is split across the workers.

    Returns:
        The matching 4-character string, or ``None`` if nothing was found
        within POW_WAIT_MAX seconds or every worker exhausted its share.
    """
    from queue import Empty

    nprocs = min(max(4, cpu_count()), 32)
    chunk = (len(ALPHABET) + nprocs - 1) // nprocs
    found_evt = Event()
    outq = Queue()
    procs = []
    for i in range(nprocs):
        seg = ALPHABET[i * chunk:(i + 1) * chunk]
        if not seg:
            continue
        p = Process(target=_pow_worker, args=(seg, suffix, target, found_evt, outq))
        p.daemon = True
        p.start()
        procs.append(p)

    xxxx = None
    deadline = time.monotonic() + POW_WAIT_MAX
    try:
        # Block on the queue itself. The worker sets the event right after
        # put(), but put() hands the item to a background feeder thread, so a
        # get_nowait() issued as soon as the event is seen can still find the
        # queue empty and lose the answer.
        while xxxx is None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                xxxx = outq.get(timeout=min(0.2, remaining))
            except Empty:
                if not any(p.is_alive() for p in procs):
                    # Every worker has finished: drain once more, then give up.
                    try:
                        xxxx = outq.get(timeout=0.2)
                    except Empty:
                        break
    finally:
        found_evt.set()  # ask any remaining workers to stop
        for p in procs:
            p.terminate()
        for p in procs:
            p.join(timeout=0.1)
    return xxxx
# ---------- Answer parsing ----------
RE_PR_LINE = re.compile(r"Prisoner'?s?\s+response:\s*([A-Za-z0-9_!]+)", re.I)
RE_TRUE = re.compile(r"\btrue\b", re.I)
RE_FALSE = re.compile(r"\bfalse\b", re.I)
RE_YES = re.compile(r"\byes\b|\bon\b|\b1\b", re.I)
RE_NO = re.compile(r"\bno\b|\boff\b|\b0\b", re.I)


def parse_bool(text: str) -> int:
    """Extract a 0/1 answer from the server's reply text.

    The token after "Prisoner's response:" is tried first. If there is no
    such line, or its token is not recognised, the whole text is searched
    for true / false / yes-like / no-like words, in that order.

    Returns:
        1 or 0. Text that cannot be interpreted at all yields 0.
    """
    m = RE_PR_LINE.search(text)
    if m:
        tok = m.group(1).lower()
        # Any token starting with "t" or "f" counts, which also covers
        # "true", "t", "false", "f" and decorated forms such as "true!".
        if tok in ("1", "yes", "y", "on") or tok.startswith("t"):
            return 1
        if tok in ("0", "no", "n", "off") or tok.startswith("f"):
            return 0
    for pattern, value in ((RE_TRUE, 1), (RE_FALSE, 0), (RE_YES, 1), (RE_NO, 0)):
        if pattern.search(text):
            return value
    return 0
# ---------- Synchronisation + solving one round ----------
PROMPT_PATTERNS = [
    r"Ask your question",
    r"\byou may ask\b",
    r"\bquestion\b",
    r"\bquery\b",
    r"\bstart\b",
    r"\bS0\b",
]
SUBMIT_PATTERNS = [r"\banswer\b", r"\bsubmit\b", r"\benter\b", r"\binput\b"]
RESULT_PATTERNS = [r"correct", r"scowls", r"\bnext\b", r"\bround\b", r"gift", r"congrat", r"success"]


def complete_round(io: LineIO, round_idx: int) -> bool:
    """Play one round: ask N_QUERIES parity questions, decode, submit the bits.

    Returns:
        ``False`` if the answers could not be decoded within MAX_ERROR_BITS
        errors (nothing is submitted in that case). Otherwise ``True`` when
        the text read after submitting contains one of the result keywords.
    """
    rows, queries = build_queries(N_QUERIES, seed=SEED_BASE + round_idx)
    # Wait for the first "you may ask" style prompt.
    io.read_until_any(PROMPT_PATTERNS, timeout=READ_TIMEOUT, max_lines=40)
    responses = []
    for idx, q in enumerate(queries):
        # Wait for a prompt before every question to stay in sync.
        # NOTE(review): if the wait above already consumed the first prompt,
        # this call simply times out after PROMPT_TIMEOUT - confirm against the server.
        io.read_until_any(PROMPT_PATTERNS, timeout=PROMPT_TIMEOUT, max_lines=4)
        # Ask the question.
        io.writeln(q)
        # Read the answer.
        ans_blob, _ = io.read_until_any(
            patterns=[r"Prisoner'?s?\s+response", r"\btrue\b", r"\bfalse\b", r"\b(?:yes|no)\b", r"\b[01]\b"],
            timeout=PER_Q_TIMEOUT, max_lines=8
        )
        if not ans_blob:
            # Short extra read as a fallback.
            ans_blob = io.read_some_lines(3, timeout=PER_Q_TIMEOUT)
        bit = parse_bool(ans_blob)
        responses.append(bit)
        if VERBOSE:
            print(f"[A{idx:02d}] bit={bit} | raw={ans_blob.strip()[:120]}")
    # Decode and cross-check.
    sol, _ = decode_from_responses(rows, responses, max_err=MAX_ERROR_BITS)
    if sol is None:
        return False
    # Submit the answer.
    io.read_until_any(SUBMIT_PATTERNS, timeout=READ_TIMEOUT, max_lines=20)
    io.writeln(" ".join(str(x) for x in sol))
    # Read the verdict.
    # NOTE(review): the keyword list is very permissive ("round", "next", ...),
    # so this mostly reports that *some* reply arrived, not that it was correct.
    res, _ = io.read_until_any(RESULT_PATTERNS, timeout=READ_TIMEOUT, max_lines=40)
    low = (res or "").lower()
    ok = any(k in low for k in ["scowls", "correct", "next", "round", "gift", "success", "congrat"])
    return ok
def do_pow(io: LineIO, banner: str):
    """Solve the proof-of-work challenge announced in the banner, if any.

    Looks for ``sha256(XXXX+<suffix>) == <64 hex digits>`` in *banner* and,
    if it is not there yet, reads further lines from *io*. When no challenge
    is found the function returns without sending anything.

    Exits the process with status 2 if no solution is found and with status
    3 if the server's reply contains "failed" or "invalid".
    """
    challenge_re = re.compile(r"sha256\(XXXX\+([^\)]+)\)\s*==\s*([0-9a-fA-F]{64})")
    m = challenge_re.search(banner)
    if not m:
        extra, _ = io.read_until_any([r"sha256\(XXXX\+"], timeout=READ_TIMEOUT, max_lines=40)
        banner += extra
        m = challenge_re.search(banner)
    if not m:
        return
    suffix, target = m.group(1), m.group(2)
    print(f"[POW] suffix={suffix} target={target}")
    xxxx = solve_pow_parallel(suffix, target)
    if not xxxx:
        print("[POW] 并行未找到;建议提高 POW_WAIT_MAX 或换更快的机器。")
        sys.exit(2)
    print(f"[POW] solved: {xxxx}")
    io.writeln(xxxx)
    ack, _ = io.read_until_any([r"OK", r"pass", r"correct", r"success", r"failed", r"invalid"],
                               timeout=READ_TIMEOUT, max_lines=20)
    if re.search(r"failed|invalid", ack or "", re.I):
        print("[POW] 校验失败")
        sys.exit(3)
def interact(host, port):
    """Connect to the service, pass the PoW, and play all ROUNDS rounds.

    A failed round is retried at most RETRY_PER_ROUND times; after that the
    process exits with status 4.
    """
    try:
        socket.getaddrinfo(host, port)  # warm up DNS resolution
    except Exception:
        pass
    with closing(socket.create_connection((host, port), timeout=8)) as s:
        # Keepalive / no-delay tuning, shared with the helper instead of
        # repeating the same setsockopt sequence inline.
        tcp_tune(s)
        io = LineIO(s)
        # Initial banner
        banner = io.read_some_lines(40, timeout=READ_TIMEOUT)
        if banner:
            print("== banner ==\n" + banner)
        # PoW
        do_pow(io, banner)
        # Play the rounds
        for r in range(ROUNDS):
            attempts = 0
            while True:
                attempts += 1
                ok = complete_round(io, r)
                if ok:
                    if r == 9:
                        # Extra output is read and shown after the 10th round.
                        gift = io.read_some_lines(20, timeout=READ_TIMEOUT)
                        if gift.strip():
                            print("[Gift] " + gift.strip()[:200])
                    break
                # One initial attempt plus RETRY_PER_ROUND retries. The old
                # ">" comparison allowed one retry more than configured.
                if attempts >= (1 + RETRY_PER_ROUND):
                    print(f"[!] 第 {r+1} 轮失败过多,退出。")
                    sys.exit(4)
                # Drain leftover output before retrying.
                io.read_some_lines(40, timeout=READ_TIMEOUT)
        final = io.read_some_lines(80, timeout=READ_TIMEOUT)
        if final:
            print("== Final ==\n" + final)
# ---------- Entry point ----------
# The __main__ guard also keeps multiprocessing workers from re-running the
# client when the module is imported in a child process.
if __name__ == "__main__":
    try:
        interact(HOST, PORT)
    except KeyboardInterrupt:
        print("\n[!] 中断")
        sys.exit(130)
    except Exception as e:
        # Top-level boundary: report the error and exit non-zero.
        print(f"[!] 异常:{e}")
        sys.exit(1)




