# -*- coding: utf-8 -*-
"""
RTU-over-TCP Modbus Master server.

- TCP (8181): the device connects to this server.
- HTTP (8182): /ui, /clients, /push_raw, /modbus/read, /modbus/write,
  /modbus/read_bulk8

Latest register mapping for the bulk read (slide 8 of the spec deck, as
supplied by the user):

    Temperature            : 1~3   (3 words)
    Humidity               : 4~6   (3 words)
    UV state               : 7     (1 word, uint16)
    FAN state              : 8     (1 word, uint16)
    Lock state             : 9     (1 word, uint16)
    Charging state         : 10    (1 word, uint16)
    Charge-complete state  : 11    (1 word, uint16)
    Helmet-present state   : 12    (1 word, uint16)
    Solar voltage          : 13~15 (3 words)
    Solar current          : 16~18 (3 words)
    GPS latitude           : 19~21 (3 words)
    GPS longitude          : 22~24 (3 words)
    Battery level %        : 25~27 (3 words)

Notes:
- Depending on the device firmware a 3-word field may be ASCII
  (e.g. "26.4"), BCD / scaled integer, or a 32-bit float (+ reserved word).
- The decoders try, in priority order,
  [ASCII -> 24/48-bit scaled integer -> 32-bit float (word-swap/normal)].
- A plausibility (range) check picks the most likely interpretation.
"""
import asyncio
import base64
import math
import os
import re
import socket
import struct
from collections import deque
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import pymysql
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, Field

try:
    from fastapi.middleware.cors import CORSMiddleware
except Exception:
    CORSMiddleware = None

TCP_HOST = os.getenv("TCP_HOST", "0.0.0.0")
TCP_PORT = int(os.getenv("TCP_PORT", "8181"))
CTRL_HOST = os.getenv("CTRL_HOST", "0.0.0.0")
CTRL_PORT = int(os.getenv("CTRL_PORT", "8182"))
IDLE_TIMEOUT = int(os.getenv("IDLE_TIMEOUT", "120"))
READ_CHUNK = int(os.getenv("READ_CHUNK", "65536"))
LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", "4096"))

# Per-connection state, all keyed by "ip:port" of the connected device.
CLIENTS: Dict[str, asyncio.StreamWriter] = {}
META: Dict[str, Dict] = {}
INBOX: Dict[str, asyncio.Queue] = {}

# ===== In-memory time-series buffers =====
# NOTE(review): entries are never removed when a device disconnects, so a
# device that reconnects from a new source port leaves its old buffer behind.
TS_BUFFERS: Dict[str, deque] = {}
TS_MAX = int(os.getenv("TS_MAX", "500"))  # keep up to 500 samples per device
TS_PERIOD_SEC = float(os.getenv("TS_PERIOD_SEC", "2.0"))  # poll every 2 seconds

# Database connection.
# NOTE(security): the settings can be overridden through the environment; the
# defaults only exist so current deployments keep working. Credentials should
# be moved out of the source tree.
conn = pymysql.connect(
    host=os.getenv("DB_HOST", "49.238.167.71"),
    user=os.getenv("DB_USER", "helmet"),
    password=os.getenv("DB_PASSWORD", "helmet2824"),
    db=os.getenv("DB_NAME", "helmet"),
    charset="utf8",
)
cur = conn.cursor()

# Decoded field keys copied into every time-series sample, in output order.
_SAMPLE_KEYS = (
    "temp", "humi", "pv_volt", "pv_curr", "batt_pct",
    "uv_state", "fan_state", "lock", "chg_ing", "chg_done", "helmet",
)


async def _poll_devices_task():
    """Periodically bulk-read registers 1..30 from every connected device.

    Each successful read is decoded and appended to the device's ring buffer
    in ``TS_BUFFERS`` (at most ``TS_MAX`` samples). A failure on one device is
    logged and does not stop the polling of the others.
    """
    while True:
        # Snapshot the keys: connections come and go while we await.
        for key in list(CLIENTS):
            try:
                frame = build_read_holding(1, 0x0001, 30)
                parsed = parse_modbus_03_or_06(
                    await _send_and_wait(key, frame, timeout=2.0)
                )
                regs = parsed.get("registers_be", [])
                # Never record a sample decoded from a corrupted frame.
                if not regs or not parsed.get("crc_ok"):
                    continue
                values = {
                    f.get("key"): f.get("value")
                    for f in _decode_bulk_fields_v2(regs)
                }
                sample = {
                    "ts": datetime.now(timezone.utc).isoformat(),
                    "key": key,
                }
                sample.update((name, values.get(name)) for name in _SAMPLE_KEYS)
                TS_BUFFERS.setdefault(key, deque(maxlen=TS_MAX)).append(sample)
            except Exception as exc:
                # Best effort: report the failure and move on to the next device.
                print(f"[poll] {key}: {exc!r}")
        await asyncio.sleep(TS_PERIOD_SEC)


def hexdump(b: bytes, width=16) -> str:
    """Return a classic offset / hex / ASCII dump of *b*, *width* bytes per row."""
    def printable(x):
        return chr(x) if 32 <= x < 127 else "."

    lines = []
    for i in range(0, len(b), width):
        chunk = b[i:i + width]
        hexpart = " ".join(f"{c:02x}" for c in chunk)
        lines.append(
            f"{i:04x} {hexpart:<{width*3}} |{''.join(printable(c) for c in chunk)}|"
        )
    return "\n".join(lines)


def log_payload(peer: str, data: bytes):
    """Print a timestamped hexdump of *data*, capped at ``LOG_MAX_BYTES`` bytes."""
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%fZ")
    size = len(data)
    print(f"[tcp] {ts} {peer} -> {size} bytes")
    print(hexdump(data[:LOG_MAX_BYTES]))
    if size > LOG_MAX_BYTES:
        print(f"... (truncated, showed first {LOG_MAX_BYTES} of {size} bytes)")


# ----- MODBUS RTU helpers -----
def crc16_modbus(data: bytes) -> int:
    """Compute the CRC-16 used by Modbus RTU (init 0xFFFF, reflected poly 0xA001)."""
    crc = 0xFFFF
    for b in data:
        crc ^= b
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc & 0xFFFF


def add_crc(frame_wo_crc: bytes) -> bytes:
    """Append the CRC to a frame, low byte first."""
    c = crc16_modbus(frame_wo_crc)
    return frame_wo_crc + bytes([c & 0xFF, (c >> 8) & 0xFF])


def build_read_holding(unit: int, addr: int, qty: int) -> bytes:
    """Build a function-0x03 (read holding registers) request frame."""
    return add_crc(struct.pack(">BBHH", unit, 0x03, addr, qty))


def build_write_single(unit: int, addr: int, value: int) -> bytes:
    """Build a function-0x06 (write single register) request frame."""
    return add_crc(struct.pack(">BBHH", unit, 0x06, addr, value))


def clean_hex(s: str) -> str:
    """Normalise user-supplied hex text into an even-length hex digit string.

    ``0x``/``0X`` prefixes and every non-hex character (spaces, commas, ...)
    are removed. A single leading ``0`` is added when the digit count is odd.
    The padding is derived from the cleaned digits, so separators in the input
    never add spurious zero bytes.
    """
    digits = re.sub(
        r"[^0-9A-Fa-f]", "", s.strip().replace("0x", "").replace("0X", "")
    )
    return "0" + digits if len(digits) % 2 else digits


def _strip_line_ending(resp: bytes) -> bytes:
    """Drop a trailing CR LF unless those two bytes belong to the frame itself.

    A CRC may legitimately be 0x0D 0x0A. The terminator is therefore kept when
    removing it would leave a 0x03/0x06 frame shorter than its own declared
    length while the unstripped frame is complete.
    """
    if not resp.endswith(b"\r\n"):
        return resp
    stripped = resp[:-2]
    expected = None
    if len(resp) >= 3:
        if resp[1] == 0x03:
            expected = 3 + resp[2] + 2  # id + fc + byte count + data + CRC
        elif resp[1] == 0x06:
            expected = 8
    if expected is not None and len(stripped) < expected <= len(resp):
        return resp
    return stripped


def parse_modbus_03_or_06(resp: bytes) -> dict:
    """Parse a function 0x03 or 0x06 response frame into a JSON-friendly dict.

    Returns a dict with an ``error`` key for malformed frames, a ``note`` key
    for unsupported function codes, and otherwise the decoded fields plus the
    CRC verdict (``crc_ok``, ``crc_provided``, ``crc_calc``). The CRC is
    reported, not enforced; callers decide what to do with a bad frame.
    """
    buf = _strip_line_ending(resp)
    if len(buf) < 5:
        return {"error": "frame too short", "len": len(buf), "hex": buf.hex()}
    uid, fc = buf[0], buf[1]

    if fc == 0x03:
        bc = buf[2]
        data_end = 3 + bc
        if data_end + 2 > len(buf):
            return {"error": "byte_count mismatch", "len": len(buf), "hex": buf.hex()}
        data = buf[3:data_end]
        pcrc = (buf[data_end + 1] << 8) | buf[data_end]  # CRC arrives low byte first
        ccrc = crc16_modbus(buf[:data_end])
        regs = [
            int.from_bytes(data[i:i + 2], "big")
            for i in range(0, len(data) - 1, 2)
        ]
        # Also expose each register pair as a word-swapped float32, as a
        # candidate interpretation. NaN/inf become None because the JSON
        # encoder used for HTTP responses rejects non-finite floats.
        floats_wordswap = []
        if len(data) >= 4 and len(data) % 2 == 0:
            for i in range(0, len(data) - 3, 4):
                value = struct.unpack(">f", data[i + 2:i + 4] + data[i:i + 2])[0]
                floats_wordswap.append(value if math.isfinite(value) else None)
        return {
            "id": uid, "fc": fc, "byte_count": bc, "registers_be": regs,
            "floats_wordswap": floats_wordswap,
            "crc_ok": (pcrc == ccrc), "crc_provided": f"{pcrc:04x}",
            "crc_calc": f"{ccrc:04x}", "hex": buf.hex(),
        }

    if fc == 0x06:
        if len(buf) < 8:
            return {"error": "write echo too short", "len": len(buf), "hex": buf.hex()}
        addr = (buf[2] << 8) | buf[3]
        val = (buf[4] << 8) | buf[5]
        pcrc = (buf[7] << 8) | buf[6]
        ccrc = crc16_modbus(buf[:6])
        return {
            "id": uid, "fc": fc, "addr": addr, "value": val,
            "crc_ok": (pcrc == ccrc), "crc_provided": f"{pcrc:04x}",
            "crc_calc": f"{ccrc:04x}", "hex": buf.hex(),
        }

    return {"id": uid, "fc": fc, "hex": buf.hex(), "note": "unsupported fc"}


# ----- TCP accept -----
async def handle_conn(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Serve one device connection until EOF, a connection error or idle timeout.

    The connection is registered in ``CLIENTS``/``META``/``INBOX`` under its
    "ip:port" key. Every received chunk is logged and queued in the inbox for
    whoever is awaiting a reply. The entries are removed again on exit.
    """
    addr = writer.get_extra_info("peername")
    key = f"{addr[0]}:{addr[1]}"
    CLIENTS[key] = writer
    META[key] = {"addr": addr, "last": datetime.now(timezone.utc)}
    INBOX[key] = asyncio.Queue()
    print(f"[tcp] connected: {key}")
    try:
        while True:
            try:
                # Short read timeout so the idle check runs about once a second.
                chunk = await asyncio.wait_for(reader.read(READ_CHUNK), timeout=1.0)
            except asyncio.TimeoutError:
                idle = (datetime.now(timezone.utc) - META[key]["last"]).total_seconds()
                if idle > IDLE_TIMEOUT:
                    print(f"[tcp] idle timeout: {key}")
                    break
                continue
            except (asyncio.IncompleteReadError, ConnectionError):
                # Peer vanished (reset/abort): treat it like a normal disconnect.
                break
            if not chunk:
                break
            META[key]["last"] = datetime.now(timezone.utc)
            log_payload(key, chunk)
            INBOX[key].put_nowait(chunk)
    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            # The socket may already be dead; cleanup below must still run.
            pass
        CLIENTS.pop(key, None)
        INBOX.pop(key, None)
        META.pop(key, None)
        print(f"[tcp] disconnected: {key}")


async def run_tcp():
    """Listen on ``TCP_HOST:TCP_PORT`` and serve device connections forever."""
    kwargs = {}
    if os.name != "nt" and hasattr(socket, "SO_REUSEPORT"):
        kwargs["reuse_port"] = True
    server = await asyncio.start_server(handle_conn, TCP_HOST, TCP_PORT, **kwargs)
    addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
    print(f"[tcp] listening on {addrs}")
    async with server:
        await server.serve_forever()


# ----- FastAPI -----
app = FastAPI(title="RTU-over-TCP Modbus Master (device connects here)")
if CORSMiddleware:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )
@app.get("/clients")
def list_clients():
    """List the connected devices with the time each was last heard from."""
    items = [
        {"key": key, "last": meta["last"].isoformat()}
        for key, meta in META.items()
    ]
    return {"count": len(META), "items": items}


def _resolve_target(target: str) -> str:
    """Map a user-supplied target onto the key of an active connection.

    Accepted patterns:
      - "ip:port"       exact match
      - "ip" or "ip:*"  most recently active connection for that IP
      - "*", "*:*", ""  most recently active connection regardless of IP
                        (useful for DHCP devices)

    Raises:
        HTTPException: 404 when nothing matches.
    """
    if target in CLIENTS:
        return target
    # Wildcard or empty -> most recent connection overall.
    if not target or str(target).strip() in ("*", "*:*"):
        if not META:
            raise HTTPException(404, "no active connections")
        return max(META.items(), key=lambda kv: kv[1]["last"])[0]
    if ":" not in target or target.endswith(":*"):
        ip = target.split(":")[0].strip()
        candidates = [
            (key, meta["last"])
            for key, meta in META.items()
            if key.split(":")[0] == ip
        ]
        if not candidates:
            raise HTTPException(404, f"no active connection for ip {ip}")
        return max(candidates, key=lambda kv: kv[1])[0]
    raise HTTPException(404, "target not connected")


async def _send_and_wait(key: str, payload: bytes, timeout: float=3.0) -> bytes:
    """Send *payload* to device *key* and return the next chunk it sends back.

    Exchanges with one device are serialised through a per-connection lock.
    Without it the background poller and an HTTP request can interleave, and
    one caller's inbox flush discards the reply the other is waiting for.

    NOTE(review): the reply is the first TCP chunk received; a response that
    arrives split over several chunks is returned incomplete.

    Raises:
        HTTPException: 404 if the writer is gone, 500 if the inbox is gone,
            504 if the device does not answer within *timeout* seconds.
    """
    w = CLIENTS.get(key)
    if not w:
        raise HTTPException(404, "target writer missing")
    q = INBOX.get(key)
    if q is None:
        raise HTTPException(500, "inbox missing")

    # The lock lives in the connection's META entry so it disappears together
    # with the connection.
    meta = META.get(key)
    lock = (
        meta.setdefault("send_lock", asyncio.Lock())
        if meta is not None
        else asyncio.Lock()
    )
    async with lock:
        # Flush stale/unsolicited data so the next chunk is our reply.
        while not q.empty():
            try:
                q.get_nowait()
            except asyncio.QueueEmpty:
                break
        w.write(payload)
        await w.drain()
        try:
            return await asyncio.wait_for(q.get(), timeout=timeout)
        except asyncio.TimeoutError:
            raise HTTPException(504, "device response timeout") from None


class PushReq(BaseModel):
    """Request body for /push_raw."""
    target: str
    data: str
    encoding: str = "hex"
    append_newline: bool = False
    await_response: bool = True
    response_timeout_ms: int = 3000


@app.post("/push_raw")
async def push_raw(req: PushReq):
    """Send raw bytes (hex, text or base64 encoded) to a device.

    When ``await_response`` is set the first chunk the device returns is
    included in the result; otherwise the payload is sent fire-and-forget.
    """
    key = _resolve_target(req.target)
    enc = (req.encoding or "hex").lower()
    try:
        if enc == "hex":
            payload = bytes.fromhex(clean_hex(req.data))
        elif enc == "text":
            payload = req.data.encode() + (b"\n" if req.append_newline else b"")
        elif enc == "base64":
            payload = base64.b64decode(req.data)
        else:
            raise HTTPException(400, "encoding must be hex|text|base64")
    except ValueError as exc:
        # binascii.Error (bad base64) is a ValueError: a client error, not a 500.
        raise HTTPException(400, f"cannot decode data as {enc}: {exc}") from exc

    if req.await_response:
        resp = await _send_and_wait(
            key, payload, timeout=max(0.001, req.response_timeout_ms / 1000.0)
        )
        return {
            "ok": True,
            "sent_hex": payload.hex(),
            "resp_hex": resp.hex(),
            "resp_preview": resp[:128].decode("utf-8", "replace"),
        }
    w = CLIENTS[key]
    w.write(payload)
    await w.drain()
    return {"ok": True, "sent_hex": payload.hex(), "resp": None}


class MBReadReq(BaseModel):
    """Request body for /modbus/read (function 0x03)."""
    target: str = Field(..., description="ip:port")
    unit: int = Field(1, ge=0, le=247)
    addr: int = Field(..., ge=0, le=0xFFFF)
    qty: int = Field(..., ge=1, le=125)


class MBWriteReq(BaseModel):
    """Request body for /modbus/write (function 0x06)."""
    target: str
    unit: int = Field(1, ge=0, le=247)
    addr: int = Field(..., ge=0, le=0xFFFF)
    value: int = Field(..., ge=0, le=0xFFFF)


@app.post("/modbus/read")
async def mb_read(req: MBReadReq):
    """Read holding registers from a device and return the parsed response."""
    key = _resolve_target(req.target)
    frame = build_read_holding(req.unit, req.addr, req.qty)
    parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
    return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed}


@app.post("/modbus/write")
async def mb_write(req: MBWriteReq):
    """Write a single register on a device and return the parsed echo."""
    key = _resolve_target(req.target)
    frame = build_write_single(req.unit, req.addr, req.value)
    parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
    return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed}


# ====== Bulk8 (addr=1 qty=30) with NEW mapping ======
class MBReadBulk8Req(BaseModel):
    """Request body for /modbus/read_bulk8."""
    target: str = Field(..., description="ip:port")
    unit: int = Field(1, ge=0, le=247)


def _words(regs: List[int], a: int, n: int) -> List[int]:
    """Return *n* registers starting at 1-based address *a*; None where missing."""
    first = a - 1
    return [
        regs[j] if 0 <= j < len(regs) else None
        for j in range(first, first + n)
    ]


def _words_hex(regs: List[int], a: int, n: int) -> str:
    """Format *n* registers from address *a* as spaced hex bytes (".." if missing)."""
    parts = []
    for w in _words(regs, a, n):
        if w is None:
            parts += ["..", ".."]
        else:
            parts += [f"{(w>>8)&0xFF:02X}", f"{w&0xFF:02X}"]
    return " ".join(parts)


def _words_to_bytes(ws: List[Optional[int]]) -> Optional[bytes]:
    """Serialise registers big-endian; None if any register is missing."""
    if any(w is None for w in ws):
        return None
    out = bytearray()
    for w in ws:
        out += bytes([(w >> 8) & 0xFF, w & 0xFF])
    return bytes(out)


def _triplet_bytes(regs: List[int], a: int) -> Optional[bytes]:
    """Return the 6 bytes of the 3-word field at address *a*, or None."""
    return _words_to_bytes(_words(regs, a, 3))


# Inclusive plausibility range per kind; kinds not listed are unbounded.
_PLAUSIBLE_RANGES = {
    "temp": (-40.0, 120.0),
    "humi": (0.0, 100.0),
    "batt": (0.0, 100.0),
    "volt": (0.0, 60.0),    # rough solar-voltage range (adjust if needed)
    "curr": (-20.0, 20.0),
}


def _plausible(v: float, kind: str) -> bool:
    """Return True if *v* is a believable reading for *kind*.

    None, NaN and +/-inf are never plausible. Infinities used to pass for the
    unbounded kinds and then broke JSON serialisation of the response. "gps"
    and unknown kinds accept any finite value (lat/lon is formatted later).
    """
    if v is None or v != v or v in (float("inf"), float("-inf")):
        return False
    bounds = _PLAUSIBLE_RANGES.get(kind)
    if bounds is None:
        return True
    return bounds[0] <= v <= bounds[1]


def _try_ascii_float(b: bytes) -> Optional[float]:
    """Interpret *b* as ASCII text holding a decimal number.

    Non-printable bytes are dropped first. The remaining text may only contain
    digits, sign, dot, whitespace and letters. The first number found is
    returned, or None.
    """
    s = "".join(chr(x) for x in b if 32 <= x <= 126).strip()
    if not s:
        return None
    # Allowed characters only.
    if not re.fullmatch(r"[0-9\.\-\+\sA-Za-z]*", s):
        return None
    # Extract just the numeric part.
    m = re.search(r"[-+]?\d+(\.\d+)?", s)
    if not m:
        return None
    try:
        return float(m.group(0))
    except ValueError:
        return None


def _try_24_or_48bit_scaled(b: bytes, scale: float=100.0) -> Optional[float]:
    """Interpret the first 3 bytes of *b* as an unsigned integer divided by *scale*.

    NOTE(review): the earlier implementation also computed a 48-bit candidate
    but could never select it, because the 24-bit candidate always came first.
    The result is unchanged; only the dead computation was removed.
    """
    if len(b) < 3:
        return None
    return int.from_bytes(b[0:3], "big", signed=False) / scale


def _try_float32_from_words(w1: int, w2: int, swap: bool) -> Optional[float]:
    """Decode two registers as a big-endian float32, optionally word-swapped."""
    hi, lo = (w2, w1) if swap else (w1, w2)
    try:
        return struct.unpack(">f", struct.pack(">HH", hi & 0xFFFF, lo & 0xFFFF))[0]
    except (TypeError, struct.error):
        # Reached when a register is missing (None).
        return None


def _pair_bytes(regs: List[int], a: int) -> Optional[bytes]:
    """Return the 4 bytes of the 2-word field at address *a*, or None."""
    return _words_to_bytes(_words(regs, a, 2))


def _try_ascii_float4(b: bytes) -> Optional[float]:
    """Like ``_try_ascii_float`` but without the allowed-character check.

    NOTE(review): because non-printable bytes are dropped rather than
    rejected, binary data that happens to contain digit bytes can decode as a
    number here.
    """
    s = "".join(chr(x) for x in b if 32 <= x <= 126).strip()
    if not s:
        return None
    m = re.search(r"[-+]?\d+(\.\d+)?", s)
    if not m:
        return None
    try:
        return float(m.group(0))
    except ValueError:
        return None


def _try_int32_scaled(b: bytes, scale: float=100.0) -> Optional[float]:
    """Interpret the first 4 bytes of *b* as an unsigned integer divided by *scale*."""
    if len(b) < 4:
        return None
    return int.from_bytes(b[:4], "big", signed=False) / scale


def _decode_pair_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]:
    """Decode the 2-word field at address *a*. Returns ``(value, how)``.

    Tries ASCII, then float32 (word-swapped, then normal), then a 32-bit
    integer scaled by 100 and by 1000. The first plausible value wins.
    """
    b = _pair_bytes(regs, a)
    if not b:
        return (None, "no-bytes")
    # 1) ASCII
    v = _try_ascii_float4(b)
    if v is not None and _plausible(v, kind):
        return (v, "ascii4")
    # 2) float32 swap/be
    w1, w2 = _words(regs, a, 2)
    for swap in (True, False):
        v = _try_float32_from_words(w1, w2, swap=swap)
        if v is not None and _plausible(v, kind):
            return (v, "f32-" + ("swap" if swap else "be"))
    # 3) int32 /100 then /1000
    for sc, tag in [(100.0, "int32/100"), (1000.0, "int32/1000")]:
        v = _try_int32_scaled(b, scale=sc)
        if v is not None and _plausible(v, kind):
            return (v, tag)
    return (None, "unknown")


def _decode_triplet_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]:
    """Decode the 3-word field at address *a*. Returns ``(value, how)``.

    Tries ASCII, then a scaled integer (/100), then a float32 built from the
    first two words (word-swapped, then normal). The first plausible value
    wins.
    """
    b = _triplet_bytes(regs, a)
    if not b:
        return (None, "no-bytes")
    # 1) ASCII first
    v = _try_ascii_float(b)
    if v is not None and _plausible(v, kind):
        return (v, "ascii")
    # 2) scaled integer (default /100)
    v = _try_24_or_48bit_scaled(b, scale=100.0)
    if v is not None and _plausible(v, kind):
        return (v, "int/100")
    # 3) 32-bit float from (w1, w2), both word orders
    w1, w2, _ = _words(regs, a, 3)
    for swap in (True, False):
        v = _try_float32_from_words(w1, w2, swap=swap)
        if v is not None and _plausible(v, kind):
            return (v, "f32-" + ("swap" if swap else "be"))
    # Nothing plausible.
    return (None, "unknown")


def _decode_bulk_fields_v2(regs: List[int]) -> List[Dict[str, Any]]:
    """Decode the bulk read (registers 1..30) into field dicts sorted by address.

    Mapped fields are decoded according to their type (triplet / pair / u16).
    Every address in 1..30 not covered by the mapping is appended as a raw u16
    entry for reference.
    """
    out: List[Dict[str, Any]] = []

    def add_triplet(addr: int, key: str, name: str, unit: str="", digits: int=2,
                    kind: str="generic", clamp: Optional[Tuple[float, float]]=None):
        v, how = _decode_triplet_numeric(regs, addr, kind)
        if clamp and v is not None:
            v = max(clamp[0], min(clamp[1], v))
        out.append({
            "addr": addr, "len": 3, "key": key, "name": name, "type": "triplet",
            "value": v, "unit": unit, "digits": digits, "how": how,
            "raw_words": _words_hex(regs, addr, 3),
        })

    def add_pair(addr: int, key: str, name: str):
        v, how = _decode_pair_numeric(regs, addr, "gps")
        out.append({
            "addr": addr, "len": 2, "key": key, "name": name, "type": "pair",
            "value": v, "unit": "", "digits": 6, "how": how,
            "raw_words": _words_hex(regs, addr, 2),
        })

    def add_u16(addr: int, key: str, name: str):
        out.append({
            "addr": addr, "len": 1, "key": key, "name": name, "type": "u16",
            "value": _words(regs, addr, 1)[0], "unit": "",
            "raw_words": _words_hex(regs, addr, 1),
        })

    # Apply the mapping.
    add_triplet(1, "temp", "온도", unit="℃", digits=2, kind="temp", clamp=(-50.0, 120.0))
    add_triplet(4, "humi", "습도", unit="%", digits=2, kind="humi", clamp=(0.0, 100.0))
    add_u16(7, "uv_state", "UV 상태")
    add_u16(8, "fan_state", "FAN 상태")
    add_u16(9, "lock", "잠김 상태")
    add_u16(10, "chg_ing", "충전 중 상태")
    add_u16(11, "chg_done", "충전 완료 상태")
    add_u16(12, "helmet", "안전모 유무 상태")
    add_triplet(13, "pv_volt", "태양광 전압", unit="V", digits=2, kind="volt", clamp=(0.0, 80.0))
    add_triplet(16, "pv_curr", "태양광 전류", unit="A", digits=2, kind="curr", clamp=(-50.0, 50.0))
    # Each GPS coordinate is a 2-word value followed by a 1-word status.
    add_pair(19, "gps_lat", "GPS 위도")
    add_u16(21, "gps_stat_lat", "GPS 센서 상태(위도)")
    add_pair(22, "gps_lon", "GPS 경도")
    add_u16(24, "gps_stat_lon", "GPS 센서 상태(경도)")
    add_triplet(25, "batt_pct", "배터리 잔량", unit="%", digits=1, kind="batt", clamp=(0.0, 100.0))

    # Reference RAW dump for the addresses the mapping does not cover.
    covered = set(range(7, 13)) | set(range(19, 25))
    for base in (1, 4, 13, 16, 25):
        covered.update((base, base + 1, base + 2))
    for a in range(1, 31):
        if a in covered:
            continue
        out.append({
            "addr": a, "len": 1, "key": f"raw_{a}", "name": f"RAW@{a}",
            "type": "u16", "value": _words(regs, a, 1)[0],
            "raw_words": _words_hex(regs, a, 1),
        })

    out.sort(key=lambda x: x["addr"])
    return out


@app.post("/modbus/read_bulk8")
async def mb_read_bulk8(req: MBReadBulk8Req):
    """Bulk-read registers 1..30 and return both the raw parse and decoded fields.

    Raises:
        HTTPException: 502 when the response carries no register data.
    """
    key = _resolve_target(req.target)
    start_addr = 0x0001
    qty = 30  # slide 8: registers 1..30
    frame = build_read_holding(req.unit, start_addr, qty)
    parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
    if not parsed or "registers_be" not in parsed:
        raise HTTPException(502, "invalid device response")
    fields = _decode_bulk_fields_v2(parsed.get("registers_be", []))
    return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed, "fields": fields}


# ===== Time-series endpoints =====
@app.get("/timeseries/keys")
def ts_keys():
    """List every buffered device key with its sample count."""
    return {"items": [{"key": k, "count": len(v)} for k, v in TS_BUFFERS.items()]}


class TSSamplesReq(BaseModel):
    """Request body for /timeseries/get."""
    target: str
    limit: int = Field(100, ge=1, le=1000)


@app.post("/timeseries/get")
def ts_get(req: TSSamplesReq):
    """Return the latest ``limit`` samples for a device and persist one to the DB.

    The three inserts use parameterised queries, so device-derived values are
    escaped and None is stored as NULL. They run in a single transaction that
    is rolled back if any insert fails; the error then propagates as before.

    NOTE(review): the row written is ``samples[0]``, the oldest sample of the
    returned window, exactly as before. Confirm whether the newest sample was
    intended.
    """
    key = _resolve_target(req.target)
    dq = TS_BUFFERS.get(key)
    if not dq:
        return {"ok": True, "target": key, "count": 0, "samples": []}
    n = min(len(dq), req.limit)
    samples = list(dq)[-n:]

    # Persist to the database.
    row = samples[0]
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        cur.execute(
            "insert into sensor_data values (null, %s, %s, %s, %s, %s, %s)",
            (row["key"], now, row["lock"], row["helmet"],
             row["uv_state"], row["fan_state"]),
        )
        cur.execute(
            "insert into env_data values (null, %s, %s, %s, %s, %s, %s)",
            (row["key"], now, 1, 1, row["temp"], row["humi"]),
        )
        cur.execute(
            "insert into battery values (null, %s, %s, %s, %s, %s, %s)",
            (row["key"], now, row["batt_pct"], row["pv_volt"],
             row["pv_curr"], row["chg_done"]),
        )
        conn.commit()
    except Exception:
        # Leave no half-written sample behind.
        conn.rollback()
        raise
    return {"ok": True, "target": key, "count": n, "samples": samples}


# ----- UI -----
@app.get("/ui", response_class=HTMLResponse)
def ui():
    """Serve the built-in dashboard page."""
    return """ RTU-over-TCP Modbus Master

RTU-over-TCP Modbus Master

1) 연결 선택 표에서 행 클릭 → 선택행 적용
keylast
2) 단말 제어작은 버튼
3) 대시보드 주기(초)
온도
대기중…
습도
%
대기중…
배터리
%
대기중…
4) 일괄 상태 (PPT 8p 최신 매핑) /modbus/read_bulk8: addr=1, qty=30
주소항목Raw Words

결과

대기중…
"""


# ----- entrypoint -----
# Strong reference to the poller task: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_POLLER_TASK: Optional[asyncio.Task] = None


@app.on_event("startup")
async def _start_poller():
    """Start the background device poller when the HTTP app starts."""
    global _POLLER_TASK
    _POLLER_TASK = asyncio.create_task(_poll_devices_task())


async def run_http():
    """Run the FastAPI control plane with uvicorn on ``CTRL_HOST:CTRL_PORT``."""
    import uvicorn
    server = uvicorn.Server(
        uvicorn.Config(app, host=CTRL_HOST, port=CTRL_PORT, log_level="info", reload=False)
    )
    await server.serve()


async def main():
    """Run the device-facing TCP server and the HTTP control plane together."""
    await asyncio.gather(run_tcp(), run_http())


if __name__ == "__main__":
    asyncio.run(main())