Reverse-Engineering a Proprietary Industrial Protocol on a Raspberry Pi
How we connected to optical sorters using a proprietary binary protocol — no manufacturer rep, no SDK, just docs and trial and error.
Most “AI for manufacturing” pitches skip the hardest part. They show dashboards, they show anomaly detection, they show predictive maintenance curves. What they don’t show is how the data gets from the machine to the model. That integration layer — the part where you’re staring at hex dumps on a factory floor, wondering why the third sorter sends bzip2 and the other two send raw JSON — is where the real work lives.
This is the story of how we connected to three industrial optical sorters using a proprietary binary protocol called NWS, running on a Raspberry Pi as a TCP listener. No manufacturer representative. No SDK. Just a 20-page PDF and a willingness to be wrong a lot.
The Setup
The factory runs three optical sorting machines. Each one inspects individual products at high speed — 3D scanning via three cameras at 120-degree angles, measuring length, diameter, area, volume, shape, roundness, ovality, defect scores, and more. About 840 items per 30-second batch, 20+ measurements per item. Serious industrial equipment.
These machines have a built-in feature called NWS-Client: they can push their data over TCP to an external server. The documentation describes the protocol. What the documentation doesn’t describe is what the machines actually do.
The Protocol (On Paper)
NWS-Client v1.2.0 looks straightforward:
- Machine initiates TCP connection to your server
- Sends a 4-byte big-endian length header
- Sends
lengthbytes of bzip2-compressed JSON - You send back a single byte
A(0x41) as acknowledgment - Connection stays open — machine keeps pushing
Simple enough to write the core in under 50 lines:
async def handle_connection(reader, writer):
while True:
header = await recv_exact(reader, 4)
payload_length = struct.unpack(">I", header)[0]
payload = await recv_exact(reader, payload_length)
json_str = decompress_payload(payload)
packet = json.loads(json_str)
writer.write(b"A")
await writer.drain()
# Route packet to handler...
What could go wrong?
What the Documentation Doesn’t Tell You
Compression is optional. The machines have a “Don’t Compress Message” toggle buried in the touchscreen config. One machine had it enabled, two didn’t. The documentation describes bzip2 compression as the format. Our listener tries bzip2, and on failure falls back to raw JSON:
def decompress_payload(data: bytes) -> str:
try:
return bz2.decompress(data).decode("utf-8")
except Exception:
try:
text = data.decode("utf-8")
json.loads(text) # validate it's JSON
return text
except (UnicodeDecodeError, json.JSONDecodeError):
raise ValueError("Neither bzip2 nor valid JSON")
Defensive? Yes. Necessary? Absolutely. You don’t get to ask the machine to retry.
Packet type naming is inconsistent. The documentation shows "productList" in one section and "product_list" in another. Different firmware versions on different machines sent different variants. Same for "UtilizationInfo" vs "utilizationInfo", "programPacket" vs "ProgramPacket". We ended up with an explicit normalization map:
PACKET_TYPE_MAP = {
"productList": "product_list",
"product_list": "product_list",
"UtilizationInfo": "UtilizationInfo",
"utilizationInfo": "UtilizationInfo",
"programPacket": "programPacket",
"ProgramPacket": "programPacket",
# ...
}
Not elegant. But it works at 3am when nobody’s watching.
The outlet/channel mapping is the real puzzle. Each machine classifies items into classes (1-7) and routes them to physical outlets (1-5). The documentation uses “class” and “outlet” and “channel” somewhat interchangeably. In reality, they are three distinct concepts:
- Class is the machine’s quality classification (1=A, 2=B, … 6=F, 7=Waste)
- Outlet is the physical chute the item exits through
- Channel is the logical grouping that ties to downstream processing
Multiple classes can route to the same outlet. Outlet 1 gets both Class 6 (lower quality) and Class 7 (waste). Outlets 3 and 4 get Classes A, B, and C. The mapping is set by the operator’s sorting program and can change between shifts.
We only figured this out by capturing programPacket messages — 80+ field configuration dumps that the machines send on boot. These contain classOutletNo arrays that reveal the actual routing:
def _build_full_config(packet: dict) -> dict | None:
meta_names = packet.get("classMetaName", [])
if not meta_names:
return None # Only full packets have classMetaName
outlet_no = packet.get("classOutletNo", [])
labels = {}
for idx in range(len(meta_names)):
nws_class = idx + 1
labels[nws_class] = default_labels[idx]
# ...
There’s a catch: full programPacket messages are rare. Mostly you get partial ones — single-field updates sent when an operator tweaks a setting on the touchscreen. Changed the diameter range for Class C? You get a programPacket with just classDiameterMin. No classMetaName, no outlet assignments, nothing else. Our listener merges these partials into the last known full config, building up a picture of what the machine is currently doing.
Diameter ranges are fluid. One machine might define Class A as 22-28mm. Another machine of the same model uses 20-30mm. The operator changes them. Any system that hardcodes thresholds will produce wrong data within a week. We persist the latest config per machine and use it for classification on every incoming packet.
The Weigher Surprise
Halfway through the project, we discovered that packing machines downstream of the sorters — multihead weighers from the same manufacturer — speak the exact same NWS protocol. Same 4-byte header, same JSON structure, same acknowledgment byte. Different packet types though: keyfigureList, errorLog, tareInfo, recipeParameters.
The weigher integration was supposed to require a manufacturer representative visit. Turned out the NWS client was already built into the machine firmware. Two minutes of touchscreen configuration — set the network address and enable three packet types — and data started flowing to our existing listener. We added a prefix check to route weigher packets to a separate handler:
def handle_product_list(packet, raw_json):
store_raw_packet(packet, raw_json, "product_list")
machine_id = packet.get("machine_id", "unknown")
if machine_id.startswith("WGH_"):
weigher_handle_product_list(machine_id, packet, db_path)
return
# Sorter aggregation path...
Same TCP listener, same port, same protocol, different data. The weigher sends per-bag weights with a pansUsed array showing which of the 15 combination heads contributed to each bag. We compute giveaway (actual weight minus target), track underweight/overweight counts, and aggregate per minute — all from a data source we didn’t even plan to capture.
Why This Matters
The gap between “we have AI models” and “we have AI that understands your factory” is almost entirely in the integration layer. It’s TCP sockets and byte-order headers and figuring out that outlet 1 != Channel A. It’s handling three machines with three different firmware versions and subtly different behaviors. It’s knowing that a partial programPacket at 14:37 means the operator just changed the size thresholds, and every product classified after that point uses the new ranges.
None of this is intellectually glamorous. But it’s the reason our system captures 20+ measurements per item across multiple machine types, aggregated per minute, synced to a cloud API, running on a $80 single-board computer with zero data loss. The total external dependency list: requests. That’s it. Python standard library does the rest — asyncio for the TCP server, struct for binary parsing, bz2 for decompression, sqlite3 for storage.
The manufacturer’s own documentation was a starting point, not the truth. The truth was in the packets. And the only way to find it was to listen.