From 8734a149a62559fc839216c9d3a859d4e3f1b194 Mon Sep 17 00:00:00 2001 From: nek924 Date: Thu, 2 Oct 2025 16:49:32 +0900 Subject: [PATCH] =?UTF-8?q?=EC=B5=9C=EC=B4=88=20=EC=BB=A4=EB=B0=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- compose.yaml | 28 ++ socket/Dockerfile | 9 + socket/modbus.py | 689 ++++++++++++++++++++++++++++++++++++++++ socket/requirements.txt | 4 + socket/tcp_server.py | 564 ++++++++++++++++++++++++++++++++ 5 files changed, 1294 insertions(+) create mode 100644 compose.yaml create mode 100644 socket/Dockerfile create mode 100644 socket/modbus.py create mode 100644 socket/requirements.txt create mode 100644 socket/tcp_server.py diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..e726cb4 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,28 @@ +name: helmet + +volumes: + mysqldata: + grafana-data: + +networks: + appnet: + +services: + + # ---------- TCP socket ---------- + socket_server: + build: + context: ./socket + dockerfile: Dockerfile + container_name: socket + environment: + TCP_PORT: 8181 + MODE: line # line | length + ACK: "true" + IDLE_TIMEOUT: 90 + LENGTH_BYTES: 2 # MODE=length일 때만 의미 (2 or 4) + ports: + - "8181:8181" +# - "127.0.0.1:8182:8182" + - "8182:8182" + networks: [ appnet ] \ No newline at end of file diff --git a/socket/Dockerfile b/socket/Dockerfile new file mode 100644 index 0000000..d820e79 --- /dev/null +++ b/socket/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY modbus.py . +ENV TCP_PORT=8181 MODE=stream LOG_MODE=both +ENV CTRL_HOST=0.0.0.0 CTRL_PORT=8182 +EXPOSE 8181 8182 +CMD ["python","modbus.py"] \ No newline at end of file diff --git a/socket/modbus.py b/socket/modbus.py new file mode 100644 index 0000000..91e366e --- /dev/null +++ b/socket/modbus.py @@ -0,0 +1,689 @@ +# -*- coding: utf-8 -*- +""" +RTU-over-TCP Modbus Master server +- TCP(8181): device connects +- HTTP(8182): /ui, /clients, /push_raw, /modbus/read, /modbus/write, /modbus/read_bulk8 + +PPT 8페이지(일괄 읽기) 최신 매핑(사용자 제공): + 온도 : 1~3 (3워드) + 습도 : 4~6 (3워드) + UV 상태 : 7 (1워드, uint16) + FAN 상태 : 8 (1워드, uint16) + 잠김 상태 : 9 (1워드, uint16) + 충전 중 상태 : 10 (1워드, uint16) + 충전 완료 상태 : 11 (1워드, uint16) + 안전모 유무 상태 : 12 (1워드, uint16) + 태양광 전압 : 13~15 (3워드) + 태양광 전류 : 16~18 (3워드) + GPS 위도 : 19~21 (3워드) + GPS 경도 : 22~24 (3워드) + 배터리 잔량 % : 25~27 (3워드) + +메모: +- 3워드 필드는 장치 펌웨어에 따라 ASCII(예: "26.4"), BCD/정수스케일, 또는 32bit float(+예약워드)일 수 있음. +- 아래 디코더는 우선순위로 [ASCII -> 24/48bit 정수스케일 -> 32bit float(word-swap/normal)]을 시도한다. +- 값의 합리성 체크(클램프)를 통해 가장 그럴듯한 해석을 최종 선택한다. +""" + +import os, asyncio, struct, base64, re, socket +from datetime import datetime, timezone +from typing import Dict, List, Any, Optional, Tuple + +TCP_HOST = os.getenv("TCP_HOST", "0.0.0.0") +TCP_PORT = int(os.getenv("TCP_PORT", "8181")) +CTRL_HOST = os.getenv("CTRL_HOST", "0.0.0.0") +CTRL_PORT = int(os.getenv("CTRL_PORT", "8182")) +IDLE_TIMEOUT = int(os.getenv("IDLE_TIMEOUT", "120")) +READ_CHUNK = int(os.getenv("READ_CHUNK", "65536")) +LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", "4096")) + +CLIENTS: Dict[str, asyncio.StreamWriter] = {} +META: Dict[str, Dict] = {} +INBOX: Dict[str, asyncio.Queue] = {} + +from collections import deque + +# ===== In-memory time-series buffers ===== +TS_BUFFERS: Dict[str, deque] = {} +TS_MAX = int(os.getenv("TS_MAX", "500")) # keep up to 500 samples per device +TS_PERIOD_SEC = float(os.getenv("TS_PERIOD_SEC", "2.0")) # poll every 2 seconds + +async def _poll_devices_task(): + """Periodically read bulk8 (1..30) from all connected devices and store last TS_MAX samples.""" + while True: + keys = list(CLIENTS.keys()) + for key in keys: + try: + start_addr, qty = 0x0001, 30 + frame = build_read_holding(1, start_addr, qty) + parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=2.0)) + regs = parsed.get("registers_be", []) + if not regs: + continue + fields = _decode_bulk_fields_v2(regs) + def g(k): + for f in fields: + if f.get("key")==k: + return f.get("value") + return None + sample = { + "ts": datetime.now(timezone.utc).isoformat(), + "key": key, + "temp": g("temp"), + "humi": g("humi"), + "pv_volt": g("pv_volt"), + "pv_curr": g("pv_curr"), + "batt_pct": g("batt_pct"), + "uv_state": g("uv_state"), + "fan_state": g("fan_state"), + "lock": g("lock"), + "chg_ing": g("chg_ing"), + "chg_done": g("chg_done"), + "helmet": g("helmet"), + } + dq = TS_BUFFERS.get(key) + if dq is None: + dq = deque(maxlen=TS_MAX) + TS_BUFFERS[key] = dq + dq.append(sample) + except Exception: + pass + await asyncio.sleep(TS_PERIOD_SEC) + + + +def hexdump(b: bytes, width=16) -> str: + def p(x): return chr(x) if 32 <= x < 127 else "." + lines = [] + for i in range(0, len(b), width): + chunk = b[i:i+width] + hexpart = " ".join(f"{c:02x}" for c in chunk) + lines.append(f"{i:04x} {hexpart:<{width*3}} |{''.join(p(c) for c in chunk)}|") + return "\n".join(lines) + +def log_payload(peer: str, data: bytes): + ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%fZ") + size = len(data) + print(f"[tcp] {ts} {peer} -> {size} bytes") + sample = data[:LOG_MAX_BYTES] + print(hexdump(sample)) + if size > LOG_MAX_BYTES: + print(f"... (truncated, showed first {LOG_MAX_BYTES} of {size} bytes)") + +# ----- MODBUS RTU helpers ----- +def crc16_modbus(data: bytes) -> int: + crc = 0xFFFF + for b in data: + crc ^= b + for _ in range(8): + if crc & 1: crc = (crc >> 1) ^ 0xA001 + else: crc >>= 1 + return crc & 0xFFFF + +def add_crc(frame_wo_crc: bytes) -> bytes: + c = crc16_modbus(frame_wo_crc) + return frame_wo_crc + bytes([c & 0xFF, (c >> 8) & 0xFF]) + +def build_read_holding(unit: int, addr: int, qty: int) -> bytes: + return add_crc(struct.pack(">B B H H", unit, 0x03, addr, qty)) + +def build_write_single(unit: int, addr: int, value: int) -> bytes: + return add_crc(struct.pack(">B B H H", unit, 0x06, addr, value)) + +def clean_hex(s: str) -> str: + s = s.strip().replace("0x","").replace("0X","") + return re.sub(r"[^0-9A-Fa-f]", "", s).rjust((len(s)+1)//2*2, "0") + +def parse_modbus_03_or_06(resp: bytes) -> dict: + buf = resp[:-2] if resp.endswith(b"\r\n") else resp + if len(buf) < 5: return {"error":"frame too short","len":len(buf),"hex":buf.hex()} + uid, fc = buf[0], buf[1] + if fc == 0x03: + bc = buf[2]; data_end = 3 + bc + if data_end + 2 > len(buf): return {"error":"byte_count mismatch","len":len(buf),"hex":buf.hex()} + data = buf[3:data_end] + pcrc = (buf[data_end+1] << 8) | buf[data_end] + ccrc = crc16_modbus(buf[:data_end]) + regs = [int.from_bytes(data[i:i+2],"big") for i in range(0,len(data),2) if i+2<=len(data)] + # 2워드 단위 float(word-swap)도 병행 저장(후보 해석용) + floats_wordswap = [] + if len(data) >= 4 and len(data) % 2 == 0: + for i in range(0, len(data) - 3, 4): + w1, w2 = data[i:i+2], data[i+2:i+4] + try: + floats_wordswap.append(struct.unpack(">f", w2 + w1)[0]) + except Exception: + floats_wordswap.append(float("nan")) + return {"id":uid,"fc":fc,"byte_count":bc,"registers_be":regs,"floats_wordswap":floats_wordswap, + "crc_ok":(pcrc==ccrc),"crc_provided":f"{pcrc:04x}","crc_calc":f"{ccrc:04x}","hex":buf.hex()} + if fc == 0x06: + if len(buf) < 8: return {"error":"write echo too short","len":len(buf),"hex":buf.hex()} + addr = (buf[2] << 8) | buf[3]; val = (buf[4] << 8) | buf[5] + pcrc = (buf[7] << 8) | buf[6]; ccrc = crc16_modbus(buf[:6]) + return {"id":uid,"fc":fc,"addr":addr,"value":val,"crc_ok":(pcrc==ccrc), + "crc_provided":f"{pcrc:04x}","crc_calc":f"{ccrc:04x}","hex":buf.hex()} + return {"id":uid,"fc":fc,"hex":buf.hex(),"note":"unsupported fc"} + +# ----- TCP accept ----- +async def handle_conn(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + addr = writer.get_extra_info("peername") + key = f"{addr[0]}:{addr[1]}" + CLIENTS[key] = writer + META[key] = {"addr": addr, "last": datetime.now(timezone.utc)} + INBOX[key] = asyncio.Queue() + print(f"[tcp] connected: {key}") + try: + while True: + try: + chunk = await asyncio.wait_for(reader.read(READ_CHUNK), timeout=1.0) + except asyncio.TimeoutError: + if (datetime.now(timezone.utc) - META[key]["last"]).total_seconds() > IDLE_TIMEOUT: + print(f"[tcp] idle timeout: {key}"); break + continue + except asyncio.IncompleteReadError: + break + if not chunk: break + META[key]["last"] = datetime.now(timezone.utc) + log_payload(key, chunk) + try: INBOX[key].put_nowait(chunk) + except Exception: pass + finally: + try: writer.close(); await writer.wait_closed() + except Exception: pass + CLIENTS.pop(key, None); INBOX.pop(key, None); META.pop(key, None) + print(f"[tcp] disconnected: {key}") + +async def run_tcp(): + kwargs = {} + if os.name != "nt" and hasattr(socket, "SO_REUSEPORT"): + kwargs["reuse_port"] = True + server = await asyncio.start_server(handle_conn, TCP_HOST, TCP_PORT, **kwargs) + addrs = ", ".join(str(s.getsockname()) for s in server.sockets) + print(f"[tcp] listening on {addrs}") + async with server: + await server.serve_forever() + +# ----- FastAPI ----- +from fastapi import FastAPI, HTTPException +from fastapi.responses import HTMLResponse +try: + from fastapi.middleware.cors import CORSMiddleware +except Exception: + CORSMiddleware = None +from pydantic import BaseModel, Field + +app = FastAPI(title="RTU-over-TCP Modbus Master (device connects here)") +if CORSMiddleware: + app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) + +@app.get("/clients") +def list_clients(): + return {"count": len(META), "items": [{"key":k, "last":v["last"].isoformat()} for k,v in META.items()]} + +def _resolve_target(target: str) -> str: + # Accept patterns: + # - "ip:port" exact + # - "ip:*" pick latest connection for that IP + # - "*" or "" pick latest connection regardless of IP (for DHCP devices) + if target in CLIENTS: + return target + # wildcard or empty -> most recent + if not target or str(target).strip() in ("*", "*:*"): + if not META: + raise HTTPException(404, "no active connections") + return max(META.items(), key=lambda kv: kv[1]["last"])[0] + if ":" not in target or target.endswith(":*"): + ip = target.split(":")[0].strip() + cand = [(k, v["last"]) for k, v in META.items() if k.split(":")[0] == ip] + if not cand: + raise HTTPException(404, f"no active connection for ip {ip}") + cand.sort(key=lambda kv: kv[1], reverse=True) + return cand[0][0] + raise HTTPException(404, "target not connected") + + +async def _send_and_wait(key: str, payload: bytes, timeout: float=3.0) -> bytes: + w = CLIENTS.get(key) + if not w: raise HTTPException(404, "target writer missing") + q = INBOX.get(key) + if q is None: raise HTTPException(500, "inbox missing") + while not q.empty(): + try: q.get_nowait() + except Exception: break + w.write(payload); await w.drain() + try: + resp = await asyncio.wait_for(q.get(), timeout=timeout) + except asyncio.TimeoutError: + raise HTTPException(504, "device response timeout") + return resp + +class PushReq(BaseModel): + target: str; data: str + encoding: str = "hex"; append_newline: bool = False + await_response: bool = True; response_timeout_ms: int = 3000 + +@app.post("/push_raw") +async def push_raw(req: PushReq): + key = _resolve_target(req.target) + enc = (req.encoding or "hex").lower() + if enc == "hex": payload = bytes.fromhex(clean_hex(req.data)) + elif enc == "text": payload = req.data.encode() + (b"\n" if req.append_newline else b"") + elif enc == "base64": payload = base64.b64decode(req.data) + else: raise HTTPException(400, "encoding must be hex|text|base64") + if req.await_response: + resp = await _send_and_wait(key, payload, timeout=max(0.001, req.response_timeout_ms/1000.0)) + return {"ok":True,"sent_hex":payload.hex(),"resp_hex":resp.hex(),"resp_preview":resp[:128].decode("utf-8","replace")} + w = CLIENTS[key]; w.write(payload); await w.drain() + return {"ok":True,"sent_hex":payload.hex(),"resp":None} + +class MBReadReq(BaseModel): + target: str = Field(..., description="ip:port") + unit: int = Field(1, ge=0, le=247) + addr: int = Field(..., ge=0, le=0xFFFF) + qty: int = Field(..., ge=1, le=125) + +class MBWriteReq(BaseModel): + target: str; unit: int = Field(1, ge=0, le=247) + addr: int = Field(..., ge=0, le=0xFFFF) + value: int = Field(..., ge=0, le=0xFFFF) + +@app.post("/modbus/read") +async def mb_read(req: MBReadReq): + key = _resolve_target(req.target) + frame = build_read_holding(req.unit, req.addr, req.qty) + parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0)) + return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed} + +@app.post("/modbus/write") +async def mb_write(req: MBWriteReq): + key = _resolve_target(req.target) + frame = build_write_single(req.unit, req.addr, req.value) + parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0)) + return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed} + +# ====== Bulk8 (addr=1 qty=30) with NEW mapping ====== +class MBReadBulk8Req(BaseModel): + target: str = Field(..., description="ip:port") + unit: int = Field(1, ge=0, le=247) + +def _words(regs: List[int], a: int, n: int) -> List[int]: + i0 = a - 1 + out = [] + for i in range(n): + j = i0 + i + out.append(regs[j] if 0 <= j < len(regs) else None) + return out + +def _words_hex(regs: List[int], a: int, n: int) -> str: + ws = _words(regs, a, n) + bytes_list = [] + for w in ws: + if w is None: bytes_list += ["..",".."] + else: bytes_list += [f"{(w>>8)&0xFF:02X}", f"{w&0xFF:02X}"] + return " ".join(bytes_list) + +def _triplet_bytes(regs: List[int], a: int) -> Optional[bytes]: + ws = _words(regs, a, 3) + if any(w is None for w in ws): return None + b = bytes([(ws[0]>>8)&0xFF, ws[0]&0xFF, (ws[1]>>8)&0xFF, ws[1]&0xFF, (ws[2]>>8)&0xFF, ws[2]&0xFF]) + return b + +def _plausible(v: float, kind: str) -> bool: + if v is None or v != v: return False + if kind == "temp": return -40.0 <= v <= 120.0 + if kind == "humi": return 0.0 <= v <= 100.0 + if kind == "batt": return 0.0 <= v <= 100.0 + if kind == "volt": return 0.0 <= v <= 60.0 # 태양광 전압 대략 범위(필요시 조정) + if kind == "curr": return -20.0 <= v <= 20.0 + if kind == "gps": return True # 위경도는 후처리에서 포맷 + return True + +def _try_ascii_float(b: bytes) -> Optional[float]: + s = "".join(chr(x) for x in b if 32 <= x <= 126).strip() + if not s: return None + # 허용 문자만 + if not re.fullmatch(r"[0-9\.\-\+\sA-Za-z]*", s): return None + # 숫자/기호만 추출 + m = re.search(r"[-+]?\d+(\.\d+)?", s) + if not m: return None + try: return float(m.group(0)) + except: return None + +def _try_24_or_48bit_scaled(b: bytes, scale: float=100.0) -> Optional[float]: + # 24비트 정수(상위 3바이트) 또는 48비트 정수→scale로 나눔 + if len(b) < 3: return None + i24 = int.from_bytes(b[0:3], "big", signed=False) + v24 = i24/scale + # 48비트도 후보 + i48 = int.from_bytes(b[:6], "big", signed=False) + v48 = i48/scale if i48 < 10**9 else None # 너무 크면 버림 + # 선택 + cand = [v for v in [v24, v48] if v is not None] + return cand[0] if cand else None + +def _try_float32_from_words(w1: int, w2: int, swap: bool) -> Optional[float]: + try: + if swap: + b = bytes([(w2>>8)&0xFF, w2&0xFF, (w1>>8)&0xFF, w1&0xFF]) + else: + b = bytes([(w1>>8)&0xFF, w1&0xFF, (w2>>8)&0xFF, w2&0xFF]) + return struct.unpack(">f", b)[0] + except Exception: + return None + +def _pair_bytes(regs: List[int], a: int) -> Optional[bytes]: + ws = _words(regs, a, 2) + if any(w is None for w in ws): return None + return bytes([(ws[0]>>8)&0xFF, ws[0]&0xFF, (ws[1]>>8)&0xFF, ws[1]&0xFF]) + +def _try_ascii_float4(b: bytes) -> Optional[float]: + s = "".join(chr(x) for x in b if 32 <= x <= 126).strip() + if not s: return None + m = re.search(r"[-+]?\d+(\.\d+)?", s) + if not m: return None + try: return float(m.group(0)) + except: return None + +def _try_int32_scaled(b: bytes, scale: float=100.0) -> Optional[float]: + if len(b) < 4: return None + i32 = int.from_bytes(b[:4], "big", signed=False) + return i32/scale + +def _decode_pair_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]: + b = _pair_bytes(regs, a) + if not b: return (None, "no-bytes") + # 1) ASCII + v = _try_ascii_float4(b) + if v is not None and _plausible(v, kind): return (v, "ascii4") + # 2) float32 swap/be + ws = _words(regs, a, 2) + w1, w2 = ws[0], ws[1] + for swap in (True, False): + v = _try_float32_from_words(w1, w2, swap=swap) + if v is not None and _plausible(v, kind): + return (v, "f32-"+("swap" if swap else "be")) + # 3) int32 /100 then /1000 + for sc, tag in [(100.0,"int32/100"), (1000.0,"int32/1000")]: + v = _try_int32_scaled(b, scale=sc) + if v is not None and _plausible(v, kind): return (v, tag) + return (None, "unknown") + + +def _decode_triplet_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]: + """3워드 필드 디코드. 반환: (value, how)""" + b = _triplet_bytes(regs, a) + if not b: return (None, "no-bytes") + # 1) ASCII 우선 + v = _try_ascii_float(b) + if v is not None and _plausible(v, kind): return (v, "ascii") + # 2) 24/48비트 정수 스케일(기본 /100) + v = _try_24_or_48bit_scaled(b, scale=100.0) + if v is not None and _plausible(v, kind): return (v, "int/100") + # 3) 32비트 float (w1,w2), word-swap/normal 모두 시도 + ws = _words(regs, a, 3) + w1, w2 = ws[0], ws[1] + for swap in (True, False): + v = _try_float32_from_words(w1, w2, swap=swap) + if v is not None and _plausible(v, kind): + return (v, "f32-"+("swap" if swap else "be")) + # 실패 + return (None, "unknown") + +def _decode_bulk_fields_v2(regs: List[int]) -> List[Dict[str, Any]]: + out: List[Dict[str, Any]] = [] + + def add_triplet(addr: int, key: str, name: str, unit: str="", digits:int=2, kind:str="generic", clamp:Tuple[float,float]=None): + v, how = _decode_triplet_numeric(regs, addr, kind) + if clamp and v is not None: + v = max(clamp[0], min(clamp[1], v)) + out.append({ + "addr": addr, "len":3, "key": key, "name": name, "type":"triplet", + "value": v, "unit": unit, "digits": digits, "how": how, + "raw_words": _words_hex(regs, addr, 3) + }) + + def add_u16(addr: int, key: str, name: str): + w = _words(regs, addr, 1)[0] + out.append({"addr": addr, "len":1, "key": key, "name": name, "type":"u16", + "value": w, "unit":"", "raw_words": _words_hex(regs, addr, 1)}) + + # 매핑 적용 + add_triplet(1, "temp", "온도", unit="℃", digits=2, kind="temp", clamp=(-50.0, 120.0)) + add_triplet(4, "humi", "습도", unit="%", digits=2, kind="humi", clamp=(0.0, 100.0)) + add_u16(7, "uv_state", "UV 상태") + add_u16(8, "fan_state", "FAN 상태") + add_u16(9, "lock", "잠김 상태") + add_u16(10, "chg_ing", "충전 중 상태") + add_u16(11, "chg_done", "충전 완료 상태") + add_u16(12, "helmet", "안전모 유무 상태") + add_triplet(13, "pv_volt", "태양광 전압", unit="V", digits=2, kind="volt", clamp=(0.0, 80.0)) + add_triplet(16, "pv_curr", "태양광 전류", unit="A", digits=2, kind="curr", clamp=(-50.0, 50.0)) + lat_v, lat_how = _decode_pair_numeric(regs, 19, "gps") + out.append({"addr":19, "len":2, "key":"gps_lat", "name":"GPS 위도", "type":"pair", "value":lat_v, "unit":"", "digits":6, "how":lat_how, "raw_words": _words_hex(regs, 19, 2)}) + add_u16(21, "gps_stat_lat", "GPS 센서 상태(위도)") + lon_v, lon_how = _decode_pair_numeric(regs, 22, "gps") + out.append({"addr":22, "len":2, "key":"gps_lon", "name":"GPS 경도", "type":"pair", "value":lon_v, "unit":"", "digits":6, "how":lon_how, "raw_words": _words_hex(regs, 22, 2)}) + add_u16(24, "gps_stat_lon", "GPS 센서 상태(경도)") + add_triplet(25, "batt_pct","배터리 잔량", unit="%", digits=1, kind="batt", clamp=(0.0, 100.0)) + + # 참고용 RAW 덤프(미포함 영역만) + covered = set() + for base in (1,4,13,16,25): + covered.update([base, base+1, base+2]) + covered.update([19,20,21,22,23,24]) + covered.update([7,8,9,10,11,12]) + + for a in range(1, 31): + if a in covered: continue + out.append({"addr": a, "len":1, "key": f"raw_{a}", "name": f"RAW@{a}", "type":"u16", + "value": _words(regs, a, 1)[0], "raw_words": _words_hex(regs, a, 1)}) + + out.sort(key=lambda x: x["addr"]) + return out + +@app.post("/modbus/read_bulk8") +async def mb_read_bulk8(req: MBReadBulk8Req): + key = _resolve_target(req.target) + start_addr = 0x0001 + qty = 30 # 슬라이드 8: 1~30 + frame = build_read_holding(req.unit, start_addr, qty) + parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0)) + if not parsed or "registers_be" not in parsed: + raise HTTPException(502, "invalid device response") + regs = parsed.get("registers_be", []) + fields = _decode_bulk_fields_v2(regs) + return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed, "fields": fields} + + + +# ===== Time-series endpoints ===== +@app.get("/timeseries/keys") +def ts_keys(): + return {"items": [{"key": k, "count": len(v)} for k, v in TS_BUFFERS.items()]} + +class TSSamplesReq(BaseModel): + target: str + limit: int = Field(100, ge=1, le=1000) + +@app.post("/timeseries/get") +def ts_get(req: TSSamplesReq): + key = _resolve_target(req.target) + dq = TS_BUFFERS.get(key) + if not dq: + return {"ok": True, "target": key, "count": 0, "samples": []} + n = min(len(dq), req.limit) + samples = list(dq)[-n:] + return {"ok": True, "target": key, "count": n, "samples": samples} + +# ----- UI ----- +@app.get("/ui", response_class=HTMLResponse) +def ui(): + return """ +RTU-over-TCP Modbus Master + + + +

RTU-over-TCP Modbus Master

+ +
+
+ 1) 연결 선택 + + 표에서 행 클릭 → 선택행 적용 +
+
keylast
+
+ + + +
+
+ +
+
2) 단말 제어작은 버튼
+
+ + + + + +
+
+ +
+
+ 3) 대시보드 + + 주기(초) + +
+
+
온도
대기중…
+
습도
%
대기중…
+
배터리
%
대기중…
+
+
+ +
+
+ 4) 일괄 상태 (PPT 8p 최신 매핑) + + /modbus/read_bulk8: addr=1, qty=30 +
+ + + +
주소항목Raw Words
+
+ +

결과

대기중…
+ + + +""" + +# ----- entrypoint ----- + + +@app.on_event("startup") +async def _start_poller(): + asyncio.create_task(_poll_devices_task()) + +async def run_http(): + import uvicorn + server = uvicorn.Server(uvicorn.Config(app, host=CTRL_HOST, port=CTRL_PORT, log_level="info", reload=False)) + await server.serve() + +async def main(): + await asyncio.gather(run_tcp(), run_http()) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/socket/requirements.txt b/socket/requirements.txt new file mode 100644 index 0000000..19453a1 --- /dev/null +++ b/socket/requirements.txt @@ -0,0 +1,4 @@ +fastapi +uvicorn[standard] +python-multipart +pymodbus>=3.5,<4 \ No newline at end of file diff --git a/socket/tcp_server.py b/socket/tcp_server.py new file mode 100644 index 0000000..b523134 --- /dev/null +++ b/socket/tcp_server.py @@ -0,0 +1,564 @@ +import os +import asyncio +import base64 +import json +from datetime import datetime, timezone +from typing import Dict, Optional, List + +TCP_HOST = os.getenv("TCP_HOST", "0.0.0.0") +TCP_PORT = int(os.getenv("TCP_PORT", "8181")) +MODE = os.getenv("MODE", "stream").lower() +LENGTH_BYTES = int(os.getenv("LENGTH_BYTES", "2")) +READ_CHUNK = int(os.getenv("READ_CHUNK", "65536")) +MAX_MSG = int(os.getenv("MAX_MSG", "1048576")) +IDLE_TIMEOUT = int(os.getenv("IDLE_TIMEOUT", "90")) +ACK_JSON = b'{"ok":true}\n' + +LOG_MODE = os.getenv("LOG_MODE", "both").lower() +LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", "4096")) + +CTRL_HOST = os.getenv("CTRL_HOST", "0.0.0.0") +CTRL_PORT = int(os.getenv("CTRL_PORT", "8182")) + +CLIENTS: Dict[str, asyncio.StreamWriter] = {} +META: Dict[str, Dict] = {} +DEV_INDEX: Dict[str, str] = {} +INBOX: Dict[str, asyncio.Queue] = {} + +def bytes_to_hex(b: bytes) -> str: + return "".join(f"{x:02x}" for x in b) + +def hexdump(b: bytes, width=16): + def p(x): return chr(x) if 32 <= x < 127 else "." + lines = [] + for i in range(0, len(b), width): + chunk = b[i:i+width] + hexpart = " ".join(f"{c:02x}" for c in chunk) + lines.append(f"{i:04x} {hexpart:<{width*3}} |{''.join(p(c) for c in chunk)}|") + return "\n".join(lines) + +def print_payload(peer, data: bytes): + ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%fZ") + size = len(data) + print(f"[tcp] {ts} {peer} -> {size} bytes", flush=True) + sample = data[:LOG_MAX_BYTES] + if LOG_MODE in ("text", "both"): + print("----- TEXT (utf-8, truncated) -----", flush=True) + print(sample.decode("utf-8", errors="replace"), flush=True) + if LOG_MODE in ("hex", "both"): + print("----- HEXDUMP (truncated) -----", flush=True) + print(hexdump(sample), flush=True) + if size > LOG_MAX_BYTES: + print(f"... (truncated, showed first {LOG_MAX_BYTES} of {size} bytes)", flush=True) + +async def read_stream(r: asyncio.StreamReader): return (await r.read(READ_CHUNK)) or None +async def read_line(r: asyncio.StreamReader): return (await r.readline()) or None +async def read_length_prefixed(r: asyncio.StreamReader): + hdr = await r.readexactly(LENGTH_BYTES) + n = int.from_bytes(hdr, "big") + if n < 0 or n > MAX_MSG: return None + return await r.readexactly(n) + +async def handle_conn(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + addr = writer.get_extra_info("peername") + key = f"{addr[0]}:{addr[1]}" + CLIENTS[key] = writer + META[key] = {"addr": addr, "device_id": None, "last": datetime.now(timezone.utc)} + INBOX[key] = asyncio.Queue() + print(f"[tcp] connected: {key} (mode={MODE})", flush=True) + + read_once = read_stream if MODE == "stream" else (read_line if MODE == "line" else read_length_prefixed) + + try: + while True: + try: + chunk = await asyncio.wait_for(read_once(reader), timeout=1.0) + except asyncio.TimeoutError: + if (datetime.now(timezone.utc) - META[key]["last"]).total_seconds() > IDLE_TIMEOUT: + print(f"[tcp] idle timeout: {key}", flush=True) + break + continue + except asyncio.IncompleteReadError: + break + + if not chunk: break + + META[key]["last"] = datetime.now(timezone.utc) + print_payload(key, chunk) + + try: INBOX[key].put_nowait(chunk) + except Exception: pass + + try: + obj = json.loads(chunk.decode("utf-8", errors="ignore")) + if isinstance(obj, dict) and obj.get("device_id"): + META[key]["device_id"] = str(obj["device_id"]) + DEV_INDEX[str(obj["device_id"]) ] = key + except Exception: + pass + + try: + writer.write(ACK_JSON) + await writer.drain() + except ConnectionResetError: + break + + finally: + try: + writer.close() + await writer.wait_closed() + if key in INBOX: + q = INBOX.pop(key) + while not q.empty(): + try: q.get_nowait() + except Exception: break + except Exception: + pass + CLIENTS.pop(key, None) + did = META.get(key, {}).get("device_id") + if did and DEV_INDEX.get(did) == key: DEV_INDEX.pop(did, None) + META.pop(key, None) + print(f"[tcp] disconnected: {key}", flush=True) + +from fastapi import FastAPI, HTTPException, Body, Form +from fastapi.responses import HTMLResponse +from pydantic import BaseModel, Field + +try: + from fastapi.middleware.cors import CORSMiddleware +except Exception: + CORSMiddleware = None + +app = FastAPI(title="TCP Push Control") + +if CORSMiddleware: + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], + ) + +class PushReq(BaseModel): + target: str = Field(..., description='"*" | "ip:port" | "dev:"') + data: str = Field(..., description='payload 문자열 (encoding에 따라 해석)') + encoding: str = Field("text", description='"text" | "hex" | "base64"') + append_newline: bool = Field(True, description='text일 때 끝에 \n 추가 여부') + await_response: bool = Field(False, description="보낸 뒤 TCP 응답을 기다려 HTTP로 반환할지") + response_timeout_ms: int = Field(3000, ge=1, le=30000, description="응답 대기 타임아웃(ms)") + +def resolve_targets(target: str) -> List[str]: + if target == "*": return list(CLIENTS.keys()) + if target.startswith("dev:"): + key = DEV_INDEX.get(target[4:]) + return [key] if key else [] + return [target] if target in CLIENTS else [] + +def decode_payload(data: str, enc: str, append_newline: bool) -> bytes: + enc = enc.lower() + if enc == "text": + b = data.encode("utf-8") + return b + (b"\n" if append_newline else b"") + if enc == "hex": + h = "".join(ch for ch in data if ch in "0123456789abcdefABCDEF") + return bytes.fromhex(h) + if enc == "base64": + return base64.b64decode(data) + raise HTTPException(status_code=400, detail="encoding must be text|hex|base64") + +@app.get("/clients") +def list_clients(): + out = [{"key": k, "device_id": m.get("device_id"), "last": m.get("last").isoformat()} for k,m in META.items()] + return {"count": len(out), "items": out} + +# ★ FIX: 경로를 '/push' 로 명시 (기존의 "/" 결합 실수 수정) +@app.post("/push") +async def push(req: PushReq): + keys = resolve_targets(req.target) + if not keys: raise HTTPException(status_code=404, detail="target not found or not connected") + if req.await_response and len(keys) != 1: + raise HTTPException(status_code=400, detail="await_response는 단일 대상일 때만 지원한다") + + payload = decode_payload(req.data, req.encoding, req.append_newline) + + sent, failed, responses = 0, 0, [] + for k in keys: + w = CLIENTS.get(k) + if not w: failed += 1; continue + + if req.await_response: + q = INBOX.get(k) + if q is None: raise HTTPException(status_code=500, detail="internal inbox missing") + while not q.empty(): + try: q.get_nowait() + except Exception: break + + try: + w.write(payload); await w.drain(); sent += 1 + except Exception: + failed += 1; continue + + if req.await_response: + try: + chunk = await asyncio.wait_for(INBOX[k].get(), timeout=req.response_timeout_ms/1000.0) + responses.append({"key": k, "size": len(chunk), "hex": bytes_to_hex(chunk), "text_preview": chunk.decode("utf-8", errors="replace")}) + except asyncio.TimeoutError: + responses.append({"key": k, "timeout": True, "hex": None, "text_preview": None}) + + return {"ok": True, "sent": sent, "failed": failed, "await_response": req.await_response, "responses": responses if req.await_response else None} + +# ---- Parser endpoints ---- +import struct, re + +def crc16_modbus(data: bytes) -> int: + crc = 0xFFFF + for b in data: + crc ^= b + for _ in range(8): + if crc & 0x0001: + crc = (crc >> 1) ^ 0xA001 + else: + crc >>= 1 + return crc & 0xFFFF + +def _clean_hex(s: str) -> str: + s = s.strip().replace("0x","").replace("0X","") + s = re.sub(r"[^0-9A-Fa-f]", "", s) + if len(s) % 2 == 1: s = "0" + s + return s.lower() + +def parse_modbus_response(hex_str: str): + raw_hex = _clean_hex(hex_str) + if not raw_hex: return {"error": "no hex"} + buf = bytes.fromhex(raw_hex) + if buf.endswith(b"\r\n"): buf = buf[:-2] + if len(buf) < 5: return {"error": "frame too short", "length": len(buf), "raw_hex": raw_hex} + + slave_id, function = buf[0], buf[1] + if function in (1,2,3,4): + byte_count = buf[2] + data_start, data_end = 3, 3 + byte_count + if data_end + 2 > len(buf): return {"error": "byte_count mismatch", "length": len(buf), "raw_hex": raw_hex} + data = buf[data_start:data_end] + provided_crc_lo, provided_crc_hi = buf[data_end], buf[data_end+1] + provided_crc = (provided_crc_hi << 8) | provided_crc_lo + calc_crc = crc16_modbus(buf[:data_end]) + crc_ok = (calc_crc == provided_crc) + regs = [int.from_bytes(data[i:i+2], "big") for i in range(0, len(data), 2) if i+2<=len(data)] + floats = [] + if len(data) >= 4 and len(data) % 4 == 0: + for i in range(0, len(data), 4): + w1, w2 = data[i:i+2], data[i+2:i+4] + floats.append(struct.unpack(">f", w2 + w1)[0]) + return {"id": slave_id, "function": function, "byte_count": byte_count, "data_hex": data.hex(), "registers_be": regs, "floats_wordswap": floats, "crc_provided_hex": f"{provided_crc:04x}", "crc_calculated_hex": f"{calc_crc:04x}", "crc_ok": crc_ok, "frame_hex": buf.hex()} + return {"id": slave_id, "function": function, "frame_hex": buf.hex(), "note": "non-standard or write-response"} + +from fastapi import HTTPException + +@app.post("/test") +def test_parse(hex_form: Optional[str] = Form(None), body: Optional[dict] = Body(None)): + hex_input = hex_form or (body.get("hex") if body else None) or (body.get("data") if body else None) + if not hex_input: + raise HTTPException(status_code=400, detail="hex (또는 data) 가 필요하다") + return {"ok": True, "input": hex_input, "result": parse_modbus_response(hex_input)} + +@app.post("/test_auto") +def test_auto(push_json: dict = Body(...)): + """ + /push 의 JSON 응답을 그대로 넣으면 내부에서 hex를 뽑아 파싱한다. + """ + hexv = None + try: + if push_json and push_json.get("responses"): + for r in push_json["responses"]: + if isinstance(r, dict) and r.get("hex"): + hexv = r["hex"] + break + except Exception: + pass + if not hexv: + raise HTTPException(status_code=400, detail="push JSON에서 hex 응답을 찾지 못함 (target='*'이거나 timeout)") + return {"ok": True, "input": {"from":"test_auto","hex":hexv}, "result": parse_modbus_response(hexv)} + +@app.get("/ui", response_class=HTMLResponse) +def ui(): + return """ + + + +/push 제어 + 데이터 읽기 (auto-parse) + + + + +

/push 제어 + 데이터 읽기

+ +
+
+
+ + +
+
+ + +
특정 ip:port를 선택해야 응답을 기다려 파싱 가능. '*'는 방송용.
+
+
+
연결 조회 중…
+
+ +
+

제어

+
+ + + + + + +
+
+
+ + +
+
+
+ +
+
+ +
+
+
+ +
+
+
+ +
+

데이터 읽기

+
+ + + + + +
+
+ +

/push 응답

대기 중…
+
+

파싱 결과

+
+
CRC 유효성
-
+
레지스터(2B)
-
+
Float(워드스왑)
-
+
파생값
-
+
+
대기 중…
+
+ + + +""" + +async def run_tcp(): + server = await asyncio.start_server(handle_conn, TCP_HOST, TCP_PORT, reuse_port=True) + addrs = ", ".join(str(s.getsockname()) for s in server.sockets) + print(f"[tcp] listening on {addrs} (mode={MODE}, log={LOG_MODE})", flush=True) + async with server: await server.serve_forever() + +async def run_http(): + import uvicorn + server = uvicorn.Server(uvicorn.Config(app, host=CTRL_HOST, port=CTRL_PORT, log_level="info", reload=False)) + await server.serve() + +async def main(): await asyncio.gather(run_tcp(), run_http()) +if __name__ == "__main__": asyncio.run(main())