# -*- coding: utf-8 -*-
"""
RTU-over-TCP Modbus Master server
- TCP(8181): device connects
- HTTP(8182): /ui, /clients, /push_raw, /modbus/read, /modbus/write, /modbus/read_bulk8
PPT 8페이지(일괄 읽기) 최신 매핑(사용자 제공):
온도 : 1~3 (3워드)
습도 : 4~6 (3워드)
UV 상태 : 7 (1워드, uint16)
FAN 상태 : 8 (1워드, uint16)
잠김 상태 : 9 (1워드, uint16)
충전 중 상태 : 10 (1워드, uint16)
충전 완료 상태 : 11 (1워드, uint16)
안전모 유무 상태 : 12 (1워드, uint16)
태양광 전압 : 13~15 (3워드)
태양광 전류 : 16~18 (3워드)
GPS 위도 : 19~21 (3워드)
GPS 경도 : 22~24 (3워드)
배터리 잔량 % : 25~27 (3워드)
메모:
- 3워드 필드는 장치 펌웨어에 따라 ASCII(예: "26.4"), BCD/정수스케일, 또는 32bit float(+예약워드)일 수 있음.
- 아래 디코더는 우선순위로 [ASCII -> 24/48bit 정수스케일 -> 32bit float(word-swap/normal)]을 시도한다.
- 값의 합리성 체크(클램프)를 통해 가장 그럴듯한 해석을 최종 선택한다.
"""
import os, asyncio, struct, base64, re, socket
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Tuple
TCP_HOST = os.getenv("TCP_HOST", "0.0.0.0")
TCP_PORT = int(os.getenv("TCP_PORT", "8181"))
CTRL_HOST = os.getenv("CTRL_HOST", "0.0.0.0")
CTRL_PORT = int(os.getenv("CTRL_PORT", "8182"))
IDLE_TIMEOUT = int(os.getenv("IDLE_TIMEOUT", "120"))
READ_CHUNK = int(os.getenv("READ_CHUNK", "65536"))
LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", "4096"))
CLIENTS: Dict[str, asyncio.StreamWriter] = {}
META: Dict[str, Dict] = {}
INBOX: Dict[str, asyncio.Queue] = {}
from collections import deque
# ===== In-memory time-series buffers =====
TS_BUFFERS: Dict[str, deque] = {}
TS_MAX = int(os.getenv("TS_MAX", "500")) # keep up to 500 samples per device
TS_PERIOD_SEC = float(os.getenv("TS_PERIOD_SEC", "2.0")) # poll every 2 seconds
async def _poll_devices_task():
"""Periodically read bulk8 (1..30) from all connected devices and store last TS_MAX samples."""
while True:
keys = list(CLIENTS.keys())
for key in keys:
try:
start_addr, qty = 0x0001, 30
frame = build_read_holding(1, start_addr, qty)
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=2.0))
regs = parsed.get("registers_be", [])
if not regs:
continue
fields = _decode_bulk_fields_v2(regs)
def g(k):
for f in fields:
if f.get("key")==k:
return f.get("value")
return None
sample = {
"ts": datetime.now(timezone.utc).isoformat(),
"key": key,
"temp": g("temp"),
"humi": g("humi"),
"pv_volt": g("pv_volt"),
"pv_curr": g("pv_curr"),
"batt_pct": g("batt_pct"),
"uv_state": g("uv_state"),
"fan_state": g("fan_state"),
"lock": g("lock"),
"chg_ing": g("chg_ing"),
"chg_done": g("chg_done"),
"helmet": g("helmet"),
}
dq = TS_BUFFERS.get(key)
if dq is None:
dq = deque(maxlen=TS_MAX)
TS_BUFFERS[key] = dq
dq.append(sample)
except Exception:
pass
await asyncio.sleep(TS_PERIOD_SEC)
def hexdump(b: bytes, width=16) -> str:
def p(x): return chr(x) if 32 <= x < 127 else "."
lines = []
for i in range(0, len(b), width):
chunk = b[i:i+width]
hexpart = " ".join(f"{c:02x}" for c in chunk)
lines.append(f"{i:04x} {hexpart:<{width*3}} |{''.join(p(c) for c in chunk)}|")
return "\n".join(lines)
def log_payload(peer: str, data: bytes):
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%fZ")
size = len(data)
print(f"[tcp] {ts} {peer} -> {size} bytes")
sample = data[:LOG_MAX_BYTES]
print(hexdump(sample))
if size > LOG_MAX_BYTES:
print(f"... (truncated, showed first {LOG_MAX_BYTES} of {size} bytes)")
# ----- MODBUS RTU helpers -----
def crc16_modbus(data: bytes) -> int:
crc = 0xFFFF
for b in data:
crc ^= b
for _ in range(8):
if crc & 1: crc = (crc >> 1) ^ 0xA001
else: crc >>= 1
return crc & 0xFFFF
def add_crc(frame_wo_crc: bytes) -> bytes:
c = crc16_modbus(frame_wo_crc)
return frame_wo_crc + bytes([c & 0xFF, (c >> 8) & 0xFF])
def build_read_holding(unit: int, addr: int, qty: int) -> bytes:
return add_crc(struct.pack(">B B H H", unit, 0x03, addr, qty))
def build_write_single(unit: int, addr: int, value: int) -> bytes:
return add_crc(struct.pack(">B B H H", unit, 0x06, addr, value))
def clean_hex(s: str) -> str:
s = s.strip().replace("0x","").replace("0X","")
return re.sub(r"[^0-9A-Fa-f]", "", s).rjust((len(s)+1)//2*2, "0")
def parse_modbus_03_or_06(resp: bytes) -> dict:
buf = resp[:-2] if resp.endswith(b"\r\n") else resp
if len(buf) < 5: return {"error":"frame too short","len":len(buf),"hex":buf.hex()}
uid, fc = buf[0], buf[1]
if fc == 0x03:
bc = buf[2]; data_end = 3 + bc
if data_end + 2 > len(buf): return {"error":"byte_count mismatch","len":len(buf),"hex":buf.hex()}
data = buf[3:data_end]
pcrc = (buf[data_end+1] << 8) | buf[data_end]
ccrc = crc16_modbus(buf[:data_end])
regs = [int.from_bytes(data[i:i+2],"big") for i in range(0,len(data),2) if i+2<=len(data)]
# 2워드 단위 float(word-swap)도 병행 저장(후보 해석용)
floats_wordswap = []
if len(data) >= 4 and len(data) % 2 == 0:
for i in range(0, len(data) - 3, 4):
w1, w2 = data[i:i+2], data[i+2:i+4]
try:
floats_wordswap.append(struct.unpack(">f", w2 + w1)[0])
except Exception:
floats_wordswap.append(float("nan"))
return {"id":uid,"fc":fc,"byte_count":bc,"registers_be":regs,"floats_wordswap":floats_wordswap,
"crc_ok":(pcrc==ccrc),"crc_provided":f"{pcrc:04x}","crc_calc":f"{ccrc:04x}","hex":buf.hex()}
if fc == 0x06:
if len(buf) < 8: return {"error":"write echo too short","len":len(buf),"hex":buf.hex()}
addr = (buf[2] << 8) | buf[3]; val = (buf[4] << 8) | buf[5]
pcrc = (buf[7] << 8) | buf[6]; ccrc = crc16_modbus(buf[:6])
return {"id":uid,"fc":fc,"addr":addr,"value":val,"crc_ok":(pcrc==ccrc),
"crc_provided":f"{pcrc:04x}","crc_calc":f"{ccrc:04x}","hex":buf.hex()}
return {"id":uid,"fc":fc,"hex":buf.hex(),"note":"unsupported fc"}
# ----- TCP accept -----
async def handle_conn(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info("peername")
key = f"{addr[0]}:{addr[1]}"
CLIENTS[key] = writer
META[key] = {"addr": addr, "last": datetime.now(timezone.utc)}
INBOX[key] = asyncio.Queue()
print(f"[tcp] connected: {key}")
try:
while True:
try:
chunk = await asyncio.wait_for(reader.read(READ_CHUNK), timeout=1.0)
except asyncio.TimeoutError:
if (datetime.now(timezone.utc) - META[key]["last"]).total_seconds() > IDLE_TIMEOUT:
print(f"[tcp] idle timeout: {key}"); break
continue
except asyncio.IncompleteReadError:
break
if not chunk: break
META[key]["last"] = datetime.now(timezone.utc)
log_payload(key, chunk)
try: INBOX[key].put_nowait(chunk)
except Exception: pass
finally:
try: writer.close(); await writer.wait_closed()
except Exception: pass
CLIENTS.pop(key, None); INBOX.pop(key, None); META.pop(key, None)
print(f"[tcp] disconnected: {key}")
async def run_tcp():
kwargs = {}
if os.name != "nt" and hasattr(socket, "SO_REUSEPORT"):
kwargs["reuse_port"] = True
server = await asyncio.start_server(handle_conn, TCP_HOST, TCP_PORT, **kwargs)
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
print(f"[tcp] listening on {addrs}")
async with server:
await server.serve_forever()
# ----- FastAPI -----
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
try:
from fastapi.middleware.cors import CORSMiddleware
except Exception:
CORSMiddleware = None
from pydantic import BaseModel, Field
app = FastAPI(title="RTU-over-TCP Modbus Master (device connects here)")
if CORSMiddleware:
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
@app.get("/clients")
def list_clients():
return {"count": len(META), "items": [{"key":k, "last":v["last"].isoformat()} for k,v in META.items()]}
def _resolve_target(target: str) -> str:
# Accept patterns:
# - "ip:port" exact
# - "ip:*" pick latest connection for that IP
# - "*" or "" pick latest connection regardless of IP (for DHCP devices)
if target in CLIENTS:
return target
# wildcard or empty -> most recent
if not target or str(target).strip() in ("*", "*:*"):
if not META:
raise HTTPException(404, "no active connections")
return max(META.items(), key=lambda kv: kv[1]["last"])[0]
if ":" not in target or target.endswith(":*"):
ip = target.split(":")[0].strip()
cand = [(k, v["last"]) for k, v in META.items() if k.split(":")[0] == ip]
if not cand:
raise HTTPException(404, f"no active connection for ip {ip}")
cand.sort(key=lambda kv: kv[1], reverse=True)
return cand[0][0]
raise HTTPException(404, "target not connected")
async def _send_and_wait(key: str, payload: bytes, timeout: float=3.0) -> bytes:
w = CLIENTS.get(key)
if not w: raise HTTPException(404, "target writer missing")
q = INBOX.get(key)
if q is None: raise HTTPException(500, "inbox missing")
while not q.empty():
try: q.get_nowait()
except Exception: break
w.write(payload); await w.drain()
try:
resp = await asyncio.wait_for(q.get(), timeout=timeout)
except asyncio.TimeoutError:
raise HTTPException(504, "device response timeout")
return resp
class PushReq(BaseModel):
target: str; data: str
encoding: str = "hex"; append_newline: bool = False
await_response: bool = True; response_timeout_ms: int = 3000
@app.post("/push_raw")
async def push_raw(req: PushReq):
key = _resolve_target(req.target)
enc = (req.encoding or "hex").lower()
if enc == "hex": payload = bytes.fromhex(clean_hex(req.data))
elif enc == "text": payload = req.data.encode() + (b"\n" if req.append_newline else b"")
elif enc == "base64": payload = base64.b64decode(req.data)
else: raise HTTPException(400, "encoding must be hex|text|base64")
if req.await_response:
resp = await _send_and_wait(key, payload, timeout=max(0.001, req.response_timeout_ms/1000.0))
return {"ok":True,"sent_hex":payload.hex(),"resp_hex":resp.hex(),"resp_preview":resp[:128].decode("utf-8","replace")}
w = CLIENTS[key]; w.write(payload); await w.drain()
return {"ok":True,"sent_hex":payload.hex(),"resp":None}
class MBReadReq(BaseModel):
target: str = Field(..., description="ip:port")
unit: int = Field(1, ge=0, le=247)
addr: int = Field(..., ge=0, le=0xFFFF)
qty: int = Field(..., ge=1, le=125)
class MBWriteReq(BaseModel):
target: str; unit: int = Field(1, ge=0, le=247)
addr: int = Field(..., ge=0, le=0xFFFF)
value: int = Field(..., ge=0, le=0xFFFF)
@app.post("/modbus/read")
async def mb_read(req: MBReadReq):
key = _resolve_target(req.target)
frame = build_read_holding(req.unit, req.addr, req.qty)
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed}
@app.post("/modbus/write")
async def mb_write(req: MBWriteReq):
key = _resolve_target(req.target)
frame = build_write_single(req.unit, req.addr, req.value)
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed}
# ====== Bulk8 (addr=1 qty=30) with NEW mapping ======
class MBReadBulk8Req(BaseModel):
target: str = Field(..., description="ip:port")
unit: int = Field(1, ge=0, le=247)
def _words(regs: List[int], a: int, n: int) -> List[int]:
i0 = a - 1
out = []
for i in range(n):
j = i0 + i
out.append(regs[j] if 0 <= j < len(regs) else None)
return out
def _words_hex(regs: List[int], a: int, n: int) -> str:
ws = _words(regs, a, n)
bytes_list = []
for w in ws:
if w is None: bytes_list += ["..",".."]
else: bytes_list += [f"{(w>>8)&0xFF:02X}", f"{w&0xFF:02X}"]
return " ".join(bytes_list)
def _triplet_bytes(regs: List[int], a: int) -> Optional[bytes]:
ws = _words(regs, a, 3)
if any(w is None for w in ws): return None
b = bytes([(ws[0]>>8)&0xFF, ws[0]&0xFF, (ws[1]>>8)&0xFF, ws[1]&0xFF, (ws[2]>>8)&0xFF, ws[2]&0xFF])
return b
def _plausible(v: float, kind: str) -> bool:
if v is None or v != v: return False
if kind == "temp": return -40.0 <= v <= 120.0
if kind == "humi": return 0.0 <= v <= 100.0
if kind == "batt": return 0.0 <= v <= 100.0
if kind == "volt": return 0.0 <= v <= 60.0 # 태양광 전압 대략 범위(필요시 조정)
if kind == "curr": return -20.0 <= v <= 20.0
if kind == "gps": return True # 위경도는 후처리에서 포맷
return True
def _try_ascii_float(b: bytes) -> Optional[float]:
s = "".join(chr(x) for x in b if 32 <= x <= 126).strip()
if not s: return None
# 허용 문자만
if not re.fullmatch(r"[0-9\.\-\+\sA-Za-z]*", s): return None
# 숫자/기호만 추출
m = re.search(r"[-+]?\d+(\.\d+)?", s)
if not m: return None
try: return float(m.group(0))
except: return None
def _try_24_or_48bit_scaled(b: bytes, scale: float=100.0) -> Optional[float]:
# 24비트 정수(상위 3바이트) 또는 48비트 정수→scale로 나눔
if len(b) < 3: return None
i24 = int.from_bytes(b[0:3], "big", signed=False)
v24 = i24/scale
# 48비트도 후보
i48 = int.from_bytes(b[:6], "big", signed=False)
v48 = i48/scale if i48 < 10**9 else None # 너무 크면 버림
# 선택
cand = [v for v in [v24, v48] if v is not None]
return cand[0] if cand else None
def _try_float32_from_words(w1: int, w2: int, swap: bool) -> Optional[float]:
try:
if swap:
b = bytes([(w2>>8)&0xFF, w2&0xFF, (w1>>8)&0xFF, w1&0xFF])
else:
b = bytes([(w1>>8)&0xFF, w1&0xFF, (w2>>8)&0xFF, w2&0xFF])
return struct.unpack(">f", b)[0]
except Exception:
return None
def _pair_bytes(regs: List[int], a: int) -> Optional[bytes]:
ws = _words(regs, a, 2)
if any(w is None for w in ws): return None
return bytes([(ws[0]>>8)&0xFF, ws[0]&0xFF, (ws[1]>>8)&0xFF, ws[1]&0xFF])
def _try_ascii_float4(b: bytes) -> Optional[float]:
s = "".join(chr(x) for x in b if 32 <= x <= 126).strip()
if not s: return None
m = re.search(r"[-+]?\d+(\.\d+)?", s)
if not m: return None
try: return float(m.group(0))
except: return None
def _try_int32_scaled(b: bytes, scale: float=100.0) -> Optional[float]:
if len(b) < 4: return None
i32 = int.from_bytes(b[:4], "big", signed=False)
return i32/scale
def _decode_pair_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]:
b = _pair_bytes(regs, a)
if not b: return (None, "no-bytes")
# 1) ASCII
v = _try_ascii_float4(b)
if v is not None and _plausible(v, kind): return (v, "ascii4")
# 2) float32 swap/be
ws = _words(regs, a, 2)
w1, w2 = ws[0], ws[1]
for swap in (True, False):
v = _try_float32_from_words(w1, w2, swap=swap)
if v is not None and _plausible(v, kind):
return (v, "f32-"+("swap" if swap else "be"))
# 3) int32 /100 then /1000
for sc, tag in [(100.0,"int32/100"), (1000.0,"int32/1000")]:
v = _try_int32_scaled(b, scale=sc)
if v is not None and _plausible(v, kind): return (v, tag)
return (None, "unknown")
def _decode_triplet_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]:
"""3워드 필드 디코드. 반환: (value, how)"""
b = _triplet_bytes(regs, a)
if not b: return (None, "no-bytes")
# 1) ASCII 우선
v = _try_ascii_float(b)
if v is not None and _plausible(v, kind): return (v, "ascii")
# 2) 24/48비트 정수 스케일(기본 /100)
v = _try_24_or_48bit_scaled(b, scale=100.0)
if v is not None and _plausible(v, kind): return (v, "int/100")
# 3) 32비트 float (w1,w2), word-swap/normal 모두 시도
ws = _words(regs, a, 3)
w1, w2 = ws[0], ws[1]
for swap in (True, False):
v = _try_float32_from_words(w1, w2, swap=swap)
if v is not None and _plausible(v, kind):
return (v, "f32-"+("swap" if swap else "be"))
# 실패
return (None, "unknown")
def _decode_bulk_fields_v2(regs: List[int]) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
def add_triplet(addr: int, key: str, name: str, unit: str="", digits:int=2, kind:str="generic", clamp:Tuple[float,float]=None):
v, how = _decode_triplet_numeric(regs, addr, kind)
if clamp and v is not None:
v = max(clamp[0], min(clamp[1], v))
out.append({
"addr": addr, "len":3, "key": key, "name": name, "type":"triplet",
"value": v, "unit": unit, "digits": digits, "how": how,
"raw_words": _words_hex(regs, addr, 3)
})
def add_u16(addr: int, key: str, name: str):
w = _words(regs, addr, 1)[0]
out.append({"addr": addr, "len":1, "key": key, "name": name, "type":"u16",
"value": w, "unit":"", "raw_words": _words_hex(regs, addr, 1)})
# 매핑 적용
add_triplet(1, "temp", "온도", unit="℃", digits=2, kind="temp", clamp=(-50.0, 120.0))
add_triplet(4, "humi", "습도", unit="%", digits=2, kind="humi", clamp=(0.0, 100.0))
add_u16(7, "uv_state", "UV 상태")
add_u16(8, "fan_state", "FAN 상태")
add_u16(9, "lock", "잠김 상태")
add_u16(10, "chg_ing", "충전 중 상태")
add_u16(11, "chg_done", "충전 완료 상태")
add_u16(12, "helmet", "안전모 유무 상태")
add_triplet(13, "pv_volt", "태양광 전압", unit="V", digits=2, kind="volt", clamp=(0.0, 80.0))
add_triplet(16, "pv_curr", "태양광 전류", unit="A", digits=2, kind="curr", clamp=(-50.0, 50.0))
lat_v, lat_how = _decode_pair_numeric(regs, 19, "gps")
out.append({"addr":19, "len":2, "key":"gps_lat", "name":"GPS 위도", "type":"pair", "value":lat_v, "unit":"", "digits":6, "how":lat_how, "raw_words": _words_hex(regs, 19, 2)})
add_u16(21, "gps_stat_lat", "GPS 센서 상태(위도)")
lon_v, lon_how = _decode_pair_numeric(regs, 22, "gps")
out.append({"addr":22, "len":2, "key":"gps_lon", "name":"GPS 경도", "type":"pair", "value":lon_v, "unit":"", "digits":6, "how":lon_how, "raw_words": _words_hex(regs, 22, 2)})
add_u16(24, "gps_stat_lon", "GPS 센서 상태(경도)")
add_triplet(25, "batt_pct","배터리 잔량", unit="%", digits=1, kind="batt", clamp=(0.0, 100.0))
# 참고용 RAW 덤프(미포함 영역만)
covered = set()
for base in (1,4,13,16,25):
covered.update([base, base+1, base+2])
covered.update([19,20,21,22,23,24])
covered.update([7,8,9,10,11,12])
for a in range(1, 31):
if a in covered: continue
out.append({"addr": a, "len":1, "key": f"raw_{a}", "name": f"RAW@{a}", "type":"u16",
"value": _words(regs, a, 1)[0], "raw_words": _words_hex(regs, a, 1)})
out.sort(key=lambda x: x["addr"])
return out
@app.post("/modbus/read_bulk8")
async def mb_read_bulk8(req: MBReadBulk8Req):
key = _resolve_target(req.target)
start_addr = 0x0001
qty = 30 # 슬라이드 8: 1~30
frame = build_read_holding(req.unit, start_addr, qty)
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
if not parsed or "registers_be" not in parsed:
raise HTTPException(502, "invalid device response")
regs = parsed.get("registers_be", [])
fields = _decode_bulk_fields_v2(regs)
return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed, "fields": fields}
# ===== Time-series endpoints =====
@app.get("/timeseries/keys")
def ts_keys():
return {"items": [{"key": k, "count": len(v)} for k, v in TS_BUFFERS.items()]}
class TSSamplesReq(BaseModel):
target: str
limit: int = Field(100, ge=1, le=1000)
@app.post("/timeseries/get")
def ts_get(req: TSSamplesReq):
key = _resolve_target(req.target)
dq = TS_BUFFERS.get(key)
if not dq:
return {"ok": True, "target": key, "count": 0, "samples": []}
n = min(len(dq), req.limit)
samples = list(dq)[-n:]
return {"ok": True, "target": key, "count": n, "samples": samples}
# ----- UI -----
@app.get("/ui", response_class=HTMLResponse)
def ui():
return """
RTU-over-TCP Modbus Master
RTU-over-TCP Modbus Master
2) 단말 제어작은 버튼
4) 일괄 상태 (PPT 8p 최신 매핑)
/modbus/read_bulk8: addr=1, qty=30
"""
# ----- entrypoint -----
@app.on_event("startup")
async def _start_poller():
asyncio.create_task(_poll_devices_task())
async def run_http():
import uvicorn
server = uvicorn.Server(uvicorn.Config(app, host=CTRL_HOST, port=CTRL_PORT, log_level="info", reload=False))
await server.serve()
async def main():
await asyncio.gather(run_tcp(), run_http())
if __name__ == "__main__":
asyncio.run(main())