Post

Break My Stream

N0PSctf Logo

CrypTopia is testing their next gen encryption algorithm. We believe that the way they implemented it may have a flaw…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os

class CrypTopiaSC:

    @staticmethod
    def KSA(key, n):
        S = list(range(n))
        j = 0
        for i in range(n):
            j = ((j + S[i] + key[i % len(key)]) >> 4 | (j - S[i] + key[i % len(key)]) << 4) & (n-1)
            S[i], S[j] = S[j], S[i]
        return S

    @staticmethod
    def PRGA(S, n):
        i = 0
        j = 0
        while True:
            i = (i+1) & (n-1)
            j = (j+S[i]) & (n-1)
            S[i], S[j] = S[j], S[i]
            yield S[((S[i] + S[j]) >> 4 | (S[i] - S[j]) << 4) & (n-1)]

    def __init__(self, key, n=256):
        self.KeyGenerator = self.PRGA(self.KSA(key, n), n)

    def encrypt(self, message):
        return bytes([char ^ next(self.KeyGenerator) for char in message])

def main():
    flag = b"XXX"
    key = os.urandom(256)
    encrypted_flag = CrypTopiaSC(key).encrypt(flag)
    print("Welcome to our first version of CrypTopia Stream Cipher!\nYou can here encrypt any message you want.")
    print(f"Oh, one last thing: {encrypted_flag.hex()}")
    while True:
        pt = input("Enter your message: ").encode()
        ct = CrypTopiaSC(key).encrypt(pt)
        print(ct.hex())

if __name__ == "__main__":
    main()

This challenge had a server to connect to that was running this program. The only difference is that the flag variable was obviously not b”XXX” as it gave a much longer response.

I worked with ChatGPT to get a python script for this one. I didn’t want the script to connect to the server, so I asked for the script to allow for a manually entered string. It seems my ChatGPT is overly complicating things. I think there is also a chance that my ChatGPT is avoiding using pwntools due to a previous incident.

What follows is the script and the information needed to call it:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3
import sys, argparse

def hex_to_bytes(s: str) -> bytes:
    try:
        return bytes.fromhex(s.strip())
    except ValueError:
        print(f"Error: '{s}' is not valid hex.", file=sys.stderr)
        sys.exit(1)

def recover_plaintext(flag_ct: bytes, dummy_ct: bytes, known_prefix: bytes) -> bytes:
    L = len(flag_ct)
    if len(dummy_ct) != L:
        print(
            f"Error: dummy ciphertext is {len(dummy_ct)} bytes but flag ciphertext is {L} bytes.",
            file=sys.stderr
        )
        sys.exit(1)

    # Build the keystream: keystream[i] = dummy_ct[i] ^ 0x41
    keystream = bytes(dummy_ct[i] ^ 0x41 for i in range(L))

    # Decrypt the flag: plaintext[i] = flag_ct[i] ^ keystream[i]
    plaintext = bytes(flag_ct[i] ^ keystream[i] for i in range(L))

    if not plaintext.startswith(known_prefix):
        print(
            "Warning: recovered plaintext does not start with the known prefix.\n"
            f"  Expected {known_prefix!r}, but got {plaintext[:len(known_prefix)]!r}.",
            file=sys.stderr
        )
    return plaintext

def main():
    parser = argparse.ArgumentParser(
        description="Recover a CrypTopiaSC‐encrypted flag given:\n"
                    "  1) the flag’s ciphertext (hex),\n"
                    "  2) the ciphertext of 'A'*L from the same key,\n"
                    "  3) a known prefix (e.g. N0PS{)."
    )
    parser.add_argument(
        "--flag", "-f",
        required=True,
        help="Flag’s ciphertext (hex) copied from the remote service."
    )
    parser.add_argument(
        "--dummy", "-d",
        required=True,
        help="Ciphertext (hex) you got by sending 'A'*L to the same remote service."
    )
    parser.add_argument(
        "--prefix", "-p",
        required=True,
        help="Known ASCII prefix of the flag (e.g. N0PS{)."
    )
    args = parser.parse_args()

    flag_ct   = hex_to_bytes(args.flag)
    dummy_ct  = hex_to_bytes(args.dummy)
    prefix    = args.prefix.encode("utf-8")

    recovered = recover_plaintext(flag_ct, dummy_ct, prefix)
    try:
        print("Recovered plaintext:", recovered.decode("utf-8"))
    except UnicodeDecodeError:
        print("Recovered plaintext (bytes):", recovered)

if __name__ == "__main__":
    main()
1
2
3
4
python3 decrypt_flag.py \
  --flag  9bda0f279f80ddb5c6e3d8f76f56a88f6f114300c44ea0 \
  --dummy <paste‑the‑46‑hex‑chars‑you‑got‑from‑encrypting "A"*23> \
  --prefix N0PS{

Here are some other scripts that accomplished the same thing:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pwn import *

def xor(a, b):
    return bytes([x ^ y for x, y in zip(a, b)])

conn = remote("0.cloud.chals.io", 31561)

# Read intro and get encrypted flag
line = conn.recvuntil(b"thing: ")
flag_ct_hex = conn.recvline().strip()
flag_ct = bytes.fromhex(flag_ct_hex.decode())
print("[+] Got encrypted flag:", flag_ct.hex())

# Send known plaintext of same length
known_pt = b'A' * len(flag_ct)
conn.sendlineafter(b"Enter your message: ", known_pt)

# Receive encrypted known plaintext
known_ct_hex = conn.recvline().strip()
known_ct = bytes.fromhex(known_ct_hex.decode())
print("[+] Got encrypted known plaintext:", known_ct.hex())

# Recover keystream
keystream = xor(known_ct, known_pt)

# Decrypt the flag
flag = xor(flag_ct, keystream)
print("[+] Recovered flag:", flag.decode())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import socket
import re
import sys
HOST = "0.cloud.chals.io"
PORT = 31561
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    print(f"Connecting to {HOST}:{PORT}...")
    sock.connect((HOST, PORT))
    print("Connected.")
    # Receive welcome message and encrypted flag
    initial_data = b""
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            print("Connection closed unexpectedly while reading initial data.", file=sys.stderr)
            sock.close()
            exit(1)
        initial_data += chunk
        # Wait until the prompt is fully received
        if b"Enter your message: " in initial_data:
            break
            
    initial_data_str = initial_data.decode(errors='ignore') # Fixed syntax error
    print("Received initial data:")
    print(initial_data_str)
    # Extract encrypted flag hex
    match_ef = re.search(r"Oh, one last thing: ([0-9a-f]+)", initial_data_str)
    if not match_ef:
        print("Could not find encrypted flag hex.", file=sys.stderr)
        sock.close()
        exit(1)
    encrypted_flag_hex = match_ef.group(1)
    print(f"Encrypted Flag HEX: {encrypted_flag_hex}")
    # Calculate flag length
    try:
        ef_bytes = bytes.fromhex(encrypted_flag_hex)
        flag_len = len(ef_bytes)
        print(f"Flag length: {flag_len}")
    except ValueError:
        print("Invalid hex string for encrypted flag.", file=sys.stderr)
        sock.close()
        exit(1)
    # Prepare message: Send 'A' repeated flag_len times.
    # The server reads this string and encodes it using .encode()
    message_payload_str = 'A' * flag_len
    message_payload_bytes = message_payload_str.encode("ascii") # This is what gets encrypted
    print(f"Sending message payload (string): {message_payload_str}")
    # Send message payload + newline
    sock.sendall((message_payload_str + '\n').encode())
    # Receive encrypted message hex
    response_data = b""
    while True: # Read until the prompt appears again
        try:
            sock.settimeout(10.0) # Increased timeout
            chunk = sock.recv(4096)
            if not chunk:
                print("Connection closed while waiting for response.", file=sys.stderr)
                break
            response_data += chunk
            # Check if the prompt is at the end of the accumulated data
            if response_data.strip().endswith(b"Enter your message:"):
                print("Detected prompt, stopping read.")
                break
        except socket.timeout:
            print("Socket timeout waiting for response/prompt.", file=sys.stderr)
            # Check if we received anything useful before timeout
            if response_data:
                print("Proceeding with received data despite timeout.")
                break
            else:
                print("No data received before timeout.", file=sys.stderr)
                sock.close()
                exit(1)
        except Exception as e:
            print(f"Error receiving data: {e}", file=sys.stderr)
            sock.close()
            exit(1)
            
    sock.settimeout(None) # Disable timeout
    response_str = response_data.decode(errors='ignore') # Fixed syntax error
    print(f"Raw response data: {repr(response_str)}")
    # Extract hex using regex, looking for a hex string of the correct length
    # The hex string should appear before the *next* "Enter your message:" prompt
    expected_hex_len = flag_len * 2
    # Regex to capture hex ending just before the prompt, allowing for optional whitespace
    match_em = re.search(r"([0-9a-f]{" + str(expected_hex_len) + r"})\s*Enter your message:", response_str)
    
    if not match_em:
         # Fallback: search anywhere if not found immediately before prompt
         print("Primary regex failed, trying fallback regex...")
         match_em = re.search(r"([0-9a-f]{" + str(expected_hex_len) + r"})", response_str)
    if not match_em:
        print(f"Could not find encrypted message hex of length {expected_hex_len} in response.", file=sys.stderr)
        print(f"Raw response was: {repr(response_str)}", file=sys.stderr)
        sock.close()
        exit(1)
    encrypted_message_hex = match_em.group(1)
    print(f"Extracted Encrypted Message HEX: {encrypted_message_hex}")
    # Calculate the flag: F = EF XOR EM XOR P
    try:
        em_bytes = bytes.fromhex(encrypted_message_hex)
        if len(em_bytes) != flag_len:
            print(f"Error: Encrypted message length ({len(em_bytes)}) does not match flag length ({flag_len}).", file=sys.stderr)
            sock.close()
            exit(1)
        # XOR all three components: EncryptedFlag, EncryptedMessage, PlaintextMessage
        flag_bytes = bytes(ef ^ em ^ pm for ef, em, pm in zip(ef_bytes, em_bytes, message_payload_bytes))
        try:
            flag = flag_bytes.decode()
            print(f"\nRecovered Flag: {flag}")
            # Basic validation
            if flag.startswith("flag{") and flag.endswith("}"):
                print("Flag format looks valid!")
            else:
                print("Warning: Flag format might be incorrect.")
        except UnicodeDecodeError:
            print(f"\nCould not decode flag bytes: {flag_bytes.hex()}", file=sys.stderr)
            print("The result might be binary data or incorrectly decrypted.", file=sys.stderr)
    except ValueError as e:
        print(f"Invalid hex string for encrypted message: {e}", file=sys.stderr)
        sock.close()
        exit(1)
finally:
    print("Closing connection.")
    sock.close()
This post is licensed under CC BY 4.0 by the author.