690 lines
31 KiB
Python
690 lines
31 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
RTU-over-TCP Modbus Master server
|
|
- TCP(8181): device connects
|
|
- HTTP(8182): /ui, /clients, /push_raw, /modbus/read, /modbus/write, /modbus/read_bulk8
|
|
|
|
PPT 8페이지(일괄 읽기) 최신 매핑(사용자 제공):
|
|
온도 : 1~3 (3워드)
|
|
습도 : 4~6 (3워드)
|
|
UV 상태 : 7 (1워드, uint16)
|
|
FAN 상태 : 8 (1워드, uint16)
|
|
잠김 상태 : 9 (1워드, uint16)
|
|
충전 중 상태 : 10 (1워드, uint16)
|
|
충전 완료 상태 : 11 (1워드, uint16)
|
|
안전모 유무 상태 : 12 (1워드, uint16)
|
|
태양광 전압 : 13~15 (3워드)
|
|
태양광 전류 : 16~18 (3워드)
|
|
GPS 위도 : 19~21 (3워드)
|
|
GPS 경도 : 22~24 (3워드)
|
|
배터리 잔량 % : 25~27 (3워드)
|
|
|
|
메모:
|
|
- 3워드 필드는 장치 펌웨어에 따라 ASCII(예: "26.4"), BCD/정수스케일, 또는 32bit float(+예약워드)일 수 있음.
|
|
- 아래 디코더는 우선순위로 [ASCII -> 24/48bit 정수스케일 -> 32bit float(word-swap/normal)]을 시도한다.
|
|
- 값의 합리성 체크(클램프)를 통해 가장 그럴듯한 해석을 최종 선택한다.
|
|
"""
|
|
|
|
import os, asyncio, struct, base64, re, socket
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
TCP_HOST = os.getenv("TCP_HOST", "0.0.0.0")
|
|
TCP_PORT = int(os.getenv("TCP_PORT", "8181"))
|
|
CTRL_HOST = os.getenv("CTRL_HOST", "0.0.0.0")
|
|
CTRL_PORT = int(os.getenv("CTRL_PORT", "8182"))
|
|
IDLE_TIMEOUT = int(os.getenv("IDLE_TIMEOUT", "120"))
|
|
READ_CHUNK = int(os.getenv("READ_CHUNK", "65536"))
|
|
LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", "4096"))
|
|
|
|
CLIENTS: Dict[str, asyncio.StreamWriter] = {}
|
|
META: Dict[str, Dict] = {}
|
|
INBOX: Dict[str, asyncio.Queue] = {}
|
|
|
|
from collections import deque
|
|
|
|
# ===== In-memory time-series buffers =====
|
|
TS_BUFFERS: Dict[str, deque] = {}
|
|
TS_MAX = int(os.getenv("TS_MAX", "500")) # keep up to 500 samples per device
|
|
TS_PERIOD_SEC = float(os.getenv("TS_PERIOD_SEC", "2.0")) # poll every 2 seconds
|
|
|
|
async def _poll_devices_task():
|
|
"""Periodically read bulk8 (1..30) from all connected devices and store last TS_MAX samples."""
|
|
while True:
|
|
keys = list(CLIENTS.keys())
|
|
for key in keys:
|
|
try:
|
|
start_addr, qty = 0x0001, 30
|
|
frame = build_read_holding(1, start_addr, qty)
|
|
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=2.0))
|
|
regs = parsed.get("registers_be", [])
|
|
if not regs:
|
|
continue
|
|
fields = _decode_bulk_fields_v2(regs)
|
|
def g(k):
|
|
for f in fields:
|
|
if f.get("key")==k:
|
|
return f.get("value")
|
|
return None
|
|
sample = {
|
|
"ts": datetime.now(timezone.utc).isoformat(),
|
|
"key": key,
|
|
"temp": g("temp"),
|
|
"humi": g("humi"),
|
|
"pv_volt": g("pv_volt"),
|
|
"pv_curr": g("pv_curr"),
|
|
"batt_pct": g("batt_pct"),
|
|
"uv_state": g("uv_state"),
|
|
"fan_state": g("fan_state"),
|
|
"lock": g("lock"),
|
|
"chg_ing": g("chg_ing"),
|
|
"chg_done": g("chg_done"),
|
|
"helmet": g("helmet"),
|
|
}
|
|
dq = TS_BUFFERS.get(key)
|
|
if dq is None:
|
|
dq = deque(maxlen=TS_MAX)
|
|
TS_BUFFERS[key] = dq
|
|
dq.append(sample)
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(TS_PERIOD_SEC)
|
|
|
|
|
|
|
|
def hexdump(b: bytes, width=16) -> str:
|
|
def p(x): return chr(x) if 32 <= x < 127 else "."
|
|
lines = []
|
|
for i in range(0, len(b), width):
|
|
chunk = b[i:i+width]
|
|
hexpart = " ".join(f"{c:02x}" for c in chunk)
|
|
lines.append(f"{i:04x} {hexpart:<{width*3}} |{''.join(p(c) for c in chunk)}|")
|
|
return "\n".join(lines)
|
|
|
|
def log_payload(peer: str, data: bytes):
|
|
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%fZ")
|
|
size = len(data)
|
|
print(f"[tcp] {ts} {peer} -> {size} bytes")
|
|
sample = data[:LOG_MAX_BYTES]
|
|
print(hexdump(sample))
|
|
if size > LOG_MAX_BYTES:
|
|
print(f"... (truncated, showed first {LOG_MAX_BYTES} of {size} bytes)")
|
|
|
|
# ----- MODBUS RTU helpers -----
|
|
def crc16_modbus(data: bytes) -> int:
|
|
crc = 0xFFFF
|
|
for b in data:
|
|
crc ^= b
|
|
for _ in range(8):
|
|
if crc & 1: crc = (crc >> 1) ^ 0xA001
|
|
else: crc >>= 1
|
|
return crc & 0xFFFF
|
|
|
|
def add_crc(frame_wo_crc: bytes) -> bytes:
|
|
c = crc16_modbus(frame_wo_crc)
|
|
return frame_wo_crc + bytes([c & 0xFF, (c >> 8) & 0xFF])
|
|
|
|
def build_read_holding(unit: int, addr: int, qty: int) -> bytes:
|
|
return add_crc(struct.pack(">B B H H", unit, 0x03, addr, qty))
|
|
|
|
def build_write_single(unit: int, addr: int, value: int) -> bytes:
|
|
return add_crc(struct.pack(">B B H H", unit, 0x06, addr, value))
|
|
|
|
def clean_hex(s: str) -> str:
|
|
s = s.strip().replace("0x","").replace("0X","")
|
|
return re.sub(r"[^0-9A-Fa-f]", "", s).rjust((len(s)+1)//2*2, "0")
|
|
|
|
def parse_modbus_03_or_06(resp: bytes) -> dict:
|
|
buf = resp[:-2] if resp.endswith(b"\r\n") else resp
|
|
if len(buf) < 5: return {"error":"frame too short","len":len(buf),"hex":buf.hex()}
|
|
uid, fc = buf[0], buf[1]
|
|
if fc == 0x03:
|
|
bc = buf[2]; data_end = 3 + bc
|
|
if data_end + 2 > len(buf): return {"error":"byte_count mismatch","len":len(buf),"hex":buf.hex()}
|
|
data = buf[3:data_end]
|
|
pcrc = (buf[data_end+1] << 8) | buf[data_end]
|
|
ccrc = crc16_modbus(buf[:data_end])
|
|
regs = [int.from_bytes(data[i:i+2],"big") for i in range(0,len(data),2) if i+2<=len(data)]
|
|
# 2워드 단위 float(word-swap)도 병행 저장(후보 해석용)
|
|
floats_wordswap = []
|
|
if len(data) >= 4 and len(data) % 2 == 0:
|
|
for i in range(0, len(data) - 3, 4):
|
|
w1, w2 = data[i:i+2], data[i+2:i+4]
|
|
try:
|
|
floats_wordswap.append(struct.unpack(">f", w2 + w1)[0])
|
|
except Exception:
|
|
floats_wordswap.append(float("nan"))
|
|
return {"id":uid,"fc":fc,"byte_count":bc,"registers_be":regs,"floats_wordswap":floats_wordswap,
|
|
"crc_ok":(pcrc==ccrc),"crc_provided":f"{pcrc:04x}","crc_calc":f"{ccrc:04x}","hex":buf.hex()}
|
|
if fc == 0x06:
|
|
if len(buf) < 8: return {"error":"write echo too short","len":len(buf),"hex":buf.hex()}
|
|
addr = (buf[2] << 8) | buf[3]; val = (buf[4] << 8) | buf[5]
|
|
pcrc = (buf[7] << 8) | buf[6]; ccrc = crc16_modbus(buf[:6])
|
|
return {"id":uid,"fc":fc,"addr":addr,"value":val,"crc_ok":(pcrc==ccrc),
|
|
"crc_provided":f"{pcrc:04x}","crc_calc":f"{ccrc:04x}","hex":buf.hex()}
|
|
return {"id":uid,"fc":fc,"hex":buf.hex(),"note":"unsupported fc"}
|
|
|
|
# ----- TCP accept -----
|
|
async def handle_conn(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
addr = writer.get_extra_info("peername")
|
|
key = f"{addr[0]}:{addr[1]}"
|
|
CLIENTS[key] = writer
|
|
META[key] = {"addr": addr, "last": datetime.now(timezone.utc)}
|
|
INBOX[key] = asyncio.Queue()
|
|
print(f"[tcp] connected: {key}")
|
|
try:
|
|
while True:
|
|
try:
|
|
chunk = await asyncio.wait_for(reader.read(READ_CHUNK), timeout=1.0)
|
|
except asyncio.TimeoutError:
|
|
if (datetime.now(timezone.utc) - META[key]["last"]).total_seconds() > IDLE_TIMEOUT:
|
|
print(f"[tcp] idle timeout: {key}"); break
|
|
continue
|
|
except asyncio.IncompleteReadError:
|
|
break
|
|
if not chunk: break
|
|
META[key]["last"] = datetime.now(timezone.utc)
|
|
log_payload(key, chunk)
|
|
try: INBOX[key].put_nowait(chunk)
|
|
except Exception: pass
|
|
finally:
|
|
try: writer.close(); await writer.wait_closed()
|
|
except Exception: pass
|
|
CLIENTS.pop(key, None); INBOX.pop(key, None); META.pop(key, None)
|
|
print(f"[tcp] disconnected: {key}")
|
|
|
|
async def run_tcp():
|
|
kwargs = {}
|
|
if os.name != "nt" and hasattr(socket, "SO_REUSEPORT"):
|
|
kwargs["reuse_port"] = True
|
|
server = await asyncio.start_server(handle_conn, TCP_HOST, TCP_PORT, **kwargs)
|
|
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
|
|
print(f"[tcp] listening on {addrs}")
|
|
async with server:
|
|
await server.serve_forever()
|
|
|
|
# ----- FastAPI -----
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import HTMLResponse
|
|
try:
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
except Exception:
|
|
CORSMiddleware = None
|
|
from pydantic import BaseModel, Field
|
|
|
|
app = FastAPI(title="RTU-over-TCP Modbus Master (device connects here)")
|
|
if CORSMiddleware:
|
|
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
|
|
|
@app.get("/clients")
|
|
def list_clients():
|
|
return {"count": len(META), "items": [{"key":k, "last":v["last"].isoformat()} for k,v in META.items()]}
|
|
|
|
def _resolve_target(target: str) -> str:
|
|
# Accept patterns:
|
|
# - "ip:port" exact
|
|
# - "ip:*" pick latest connection for that IP
|
|
# - "*" or "" pick latest connection regardless of IP (for DHCP devices)
|
|
if target in CLIENTS:
|
|
return target
|
|
# wildcard or empty -> most recent
|
|
if not target or str(target).strip() in ("*", "*:*"):
|
|
if not META:
|
|
raise HTTPException(404, "no active connections")
|
|
return max(META.items(), key=lambda kv: kv[1]["last"])[0]
|
|
if ":" not in target or target.endswith(":*"):
|
|
ip = target.split(":")[0].strip()
|
|
cand = [(k, v["last"]) for k, v in META.items() if k.split(":")[0] == ip]
|
|
if not cand:
|
|
raise HTTPException(404, f"no active connection for ip {ip}")
|
|
cand.sort(key=lambda kv: kv[1], reverse=True)
|
|
return cand[0][0]
|
|
raise HTTPException(404, "target not connected")
|
|
|
|
|
|
async def _send_and_wait(key: str, payload: bytes, timeout: float=3.0) -> bytes:
|
|
w = CLIENTS.get(key)
|
|
if not w: raise HTTPException(404, "target writer missing")
|
|
q = INBOX.get(key)
|
|
if q is None: raise HTTPException(500, "inbox missing")
|
|
while not q.empty():
|
|
try: q.get_nowait()
|
|
except Exception: break
|
|
w.write(payload); await w.drain()
|
|
try:
|
|
resp = await asyncio.wait_for(q.get(), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
raise HTTPException(504, "device response timeout")
|
|
return resp
|
|
|
|
class PushReq(BaseModel):
|
|
target: str; data: str
|
|
encoding: str = "hex"; append_newline: bool = False
|
|
await_response: bool = True; response_timeout_ms: int = 3000
|
|
|
|
@app.post("/push_raw")
|
|
async def push_raw(req: PushReq):
|
|
key = _resolve_target(req.target)
|
|
enc = (req.encoding or "hex").lower()
|
|
if enc == "hex": payload = bytes.fromhex(clean_hex(req.data))
|
|
elif enc == "text": payload = req.data.encode() + (b"\n" if req.append_newline else b"")
|
|
elif enc == "base64": payload = base64.b64decode(req.data)
|
|
else: raise HTTPException(400, "encoding must be hex|text|base64")
|
|
if req.await_response:
|
|
resp = await _send_and_wait(key, payload, timeout=max(0.001, req.response_timeout_ms/1000.0))
|
|
return {"ok":True,"sent_hex":payload.hex(),"resp_hex":resp.hex(),"resp_preview":resp[:128].decode("utf-8","replace")}
|
|
w = CLIENTS[key]; w.write(payload); await w.drain()
|
|
return {"ok":True,"sent_hex":payload.hex(),"resp":None}
|
|
|
|
class MBReadReq(BaseModel):
|
|
target: str = Field(..., description="ip:port")
|
|
unit: int = Field(1, ge=0, le=247)
|
|
addr: int = Field(..., ge=0, le=0xFFFF)
|
|
qty: int = Field(..., ge=1, le=125)
|
|
|
|
class MBWriteReq(BaseModel):
|
|
target: str; unit: int = Field(1, ge=0, le=247)
|
|
addr: int = Field(..., ge=0, le=0xFFFF)
|
|
value: int = Field(..., ge=0, le=0xFFFF)
|
|
|
|
@app.post("/modbus/read")
|
|
async def mb_read(req: MBReadReq):
|
|
key = _resolve_target(req.target)
|
|
frame = build_read_holding(req.unit, req.addr, req.qty)
|
|
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
|
|
return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed}
|
|
|
|
@app.post("/modbus/write")
|
|
async def mb_write(req: MBWriteReq):
|
|
key = _resolve_target(req.target)
|
|
frame = build_write_single(req.unit, req.addr, req.value)
|
|
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
|
|
return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed}
|
|
|
|
# ====== Bulk8 (addr=1 qty=30) with NEW mapping ======
|
|
class MBReadBulk8Req(BaseModel):
|
|
target: str = Field(..., description="ip:port")
|
|
unit: int = Field(1, ge=0, le=247)
|
|
|
|
def _words(regs: List[int], a: int, n: int) -> List[int]:
|
|
i0 = a - 1
|
|
out = []
|
|
for i in range(n):
|
|
j = i0 + i
|
|
out.append(regs[j] if 0 <= j < len(regs) else None)
|
|
return out
|
|
|
|
def _words_hex(regs: List[int], a: int, n: int) -> str:
|
|
ws = _words(regs, a, n)
|
|
bytes_list = []
|
|
for w in ws:
|
|
if w is None: bytes_list += ["..",".."]
|
|
else: bytes_list += [f"{(w>>8)&0xFF:02X}", f"{w&0xFF:02X}"]
|
|
return " ".join(bytes_list)
|
|
|
|
def _triplet_bytes(regs: List[int], a: int) -> Optional[bytes]:
|
|
ws = _words(regs, a, 3)
|
|
if any(w is None for w in ws): return None
|
|
b = bytes([(ws[0]>>8)&0xFF, ws[0]&0xFF, (ws[1]>>8)&0xFF, ws[1]&0xFF, (ws[2]>>8)&0xFF, ws[2]&0xFF])
|
|
return b
|
|
|
|
def _plausible(v: float, kind: str) -> bool:
|
|
if v is None or v != v: return False
|
|
if kind == "temp": return -40.0 <= v <= 120.0
|
|
if kind == "humi": return 0.0 <= v <= 100.0
|
|
if kind == "batt": return 0.0 <= v <= 100.0
|
|
if kind == "volt": return 0.0 <= v <= 60.0 # 태양광 전압 대략 범위(필요시 조정)
|
|
if kind == "curr": return -20.0 <= v <= 20.0
|
|
if kind == "gps": return True # 위경도는 후처리에서 포맷
|
|
return True
|
|
|
|
def _try_ascii_float(b: bytes) -> Optional[float]:
|
|
s = "".join(chr(x) for x in b if 32 <= x <= 126).strip()
|
|
if not s: return None
|
|
# 허용 문자만
|
|
if not re.fullmatch(r"[0-9\.\-\+\sA-Za-z]*", s): return None
|
|
# 숫자/기호만 추출
|
|
m = re.search(r"[-+]?\d+(\.\d+)?", s)
|
|
if not m: return None
|
|
try: return float(m.group(0))
|
|
except: return None
|
|
|
|
def _try_24_or_48bit_scaled(b: bytes, scale: float=100.0) -> Optional[float]:
|
|
# 24비트 정수(상위 3바이트) 또는 48비트 정수→scale로 나눔
|
|
if len(b) < 3: return None
|
|
i24 = int.from_bytes(b[0:3], "big", signed=False)
|
|
v24 = i24/scale
|
|
# 48비트도 후보
|
|
i48 = int.from_bytes(b[:6], "big", signed=False)
|
|
v48 = i48/scale if i48 < 10**9 else None # 너무 크면 버림
|
|
# 선택
|
|
cand = [v for v in [v24, v48] if v is not None]
|
|
return cand[0] if cand else None
|
|
|
|
def _try_float32_from_words(w1: int, w2: int, swap: bool) -> Optional[float]:
|
|
try:
|
|
if swap:
|
|
b = bytes([(w2>>8)&0xFF, w2&0xFF, (w1>>8)&0xFF, w1&0xFF])
|
|
else:
|
|
b = bytes([(w1>>8)&0xFF, w1&0xFF, (w2>>8)&0xFF, w2&0xFF])
|
|
return struct.unpack(">f", b)[0]
|
|
except Exception:
|
|
return None
|
|
|
|
def _pair_bytes(regs: List[int], a: int) -> Optional[bytes]:
|
|
ws = _words(regs, a, 2)
|
|
if any(w is None for w in ws): return None
|
|
return bytes([(ws[0]>>8)&0xFF, ws[0]&0xFF, (ws[1]>>8)&0xFF, ws[1]&0xFF])
|
|
|
|
def _try_ascii_float4(b: bytes) -> Optional[float]:
|
|
s = "".join(chr(x) for x in b if 32 <= x <= 126).strip()
|
|
if not s: return None
|
|
m = re.search(r"[-+]?\d+(\.\d+)?", s)
|
|
if not m: return None
|
|
try: return float(m.group(0))
|
|
except: return None
|
|
|
|
def _try_int32_scaled(b: bytes, scale: float=100.0) -> Optional[float]:
|
|
if len(b) < 4: return None
|
|
i32 = int.from_bytes(b[:4], "big", signed=False)
|
|
return i32/scale
|
|
|
|
def _decode_pair_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]:
|
|
b = _pair_bytes(regs, a)
|
|
if not b: return (None, "no-bytes")
|
|
# 1) ASCII
|
|
v = _try_ascii_float4(b)
|
|
if v is not None and _plausible(v, kind): return (v, "ascii4")
|
|
# 2) float32 swap/be
|
|
ws = _words(regs, a, 2)
|
|
w1, w2 = ws[0], ws[1]
|
|
for swap in (True, False):
|
|
v = _try_float32_from_words(w1, w2, swap=swap)
|
|
if v is not None and _plausible(v, kind):
|
|
return (v, "f32-"+("swap" if swap else "be"))
|
|
# 3) int32 /100 then /1000
|
|
for sc, tag in [(100.0,"int32/100"), (1000.0,"int32/1000")]:
|
|
v = _try_int32_scaled(b, scale=sc)
|
|
if v is not None and _plausible(v, kind): return (v, tag)
|
|
return (None, "unknown")
|
|
|
|
|
|
def _decode_triplet_numeric(regs: List[int], a: int, kind: str) -> Tuple[Optional[float], str]:
|
|
"""3워드 필드 디코드. 반환: (value, how)"""
|
|
b = _triplet_bytes(regs, a)
|
|
if not b: return (None, "no-bytes")
|
|
# 1) ASCII 우선
|
|
v = _try_ascii_float(b)
|
|
if v is not None and _plausible(v, kind): return (v, "ascii")
|
|
# 2) 24/48비트 정수 스케일(기본 /100)
|
|
v = _try_24_or_48bit_scaled(b, scale=100.0)
|
|
if v is not None and _plausible(v, kind): return (v, "int/100")
|
|
# 3) 32비트 float (w1,w2), word-swap/normal 모두 시도
|
|
ws = _words(regs, a, 3)
|
|
w1, w2 = ws[0], ws[1]
|
|
for swap in (True, False):
|
|
v = _try_float32_from_words(w1, w2, swap=swap)
|
|
if v is not None and _plausible(v, kind):
|
|
return (v, "f32-"+("swap" if swap else "be"))
|
|
# 실패
|
|
return (None, "unknown")
|
|
|
|
def _decode_bulk_fields_v2(regs: List[int]) -> List[Dict[str, Any]]:
|
|
out: List[Dict[str, Any]] = []
|
|
|
|
def add_triplet(addr: int, key: str, name: str, unit: str="", digits:int=2, kind:str="generic", clamp:Tuple[float,float]=None):
|
|
v, how = _decode_triplet_numeric(regs, addr, kind)
|
|
if clamp and v is not None:
|
|
v = max(clamp[0], min(clamp[1], v))
|
|
out.append({
|
|
"addr": addr, "len":3, "key": key, "name": name, "type":"triplet",
|
|
"value": v, "unit": unit, "digits": digits, "how": how,
|
|
"raw_words": _words_hex(regs, addr, 3)
|
|
})
|
|
|
|
def add_u16(addr: int, key: str, name: str):
|
|
w = _words(regs, addr, 1)[0]
|
|
out.append({"addr": addr, "len":1, "key": key, "name": name, "type":"u16",
|
|
"value": w, "unit":"", "raw_words": _words_hex(regs, addr, 1)})
|
|
|
|
# 매핑 적용
|
|
add_triplet(1, "temp", "온도", unit="℃", digits=2, kind="temp", clamp=(-50.0, 120.0))
|
|
add_triplet(4, "humi", "습도", unit="%", digits=2, kind="humi", clamp=(0.0, 100.0))
|
|
add_u16(7, "uv_state", "UV 상태")
|
|
add_u16(8, "fan_state", "FAN 상태")
|
|
add_u16(9, "lock", "잠김 상태")
|
|
add_u16(10, "chg_ing", "충전 중 상태")
|
|
add_u16(11, "chg_done", "충전 완료 상태")
|
|
add_u16(12, "helmet", "안전모 유무 상태")
|
|
add_triplet(13, "pv_volt", "태양광 전압", unit="V", digits=2, kind="volt", clamp=(0.0, 80.0))
|
|
add_triplet(16, "pv_curr", "태양광 전류", unit="A", digits=2, kind="curr", clamp=(-50.0, 50.0))
|
|
lat_v, lat_how = _decode_pair_numeric(regs, 19, "gps")
|
|
out.append({"addr":19, "len":2, "key":"gps_lat", "name":"GPS 위도", "type":"pair", "value":lat_v, "unit":"", "digits":6, "how":lat_how, "raw_words": _words_hex(regs, 19, 2)})
|
|
add_u16(21, "gps_stat_lat", "GPS 센서 상태(위도)")
|
|
lon_v, lon_how = _decode_pair_numeric(regs, 22, "gps")
|
|
out.append({"addr":22, "len":2, "key":"gps_lon", "name":"GPS 경도", "type":"pair", "value":lon_v, "unit":"", "digits":6, "how":lon_how, "raw_words": _words_hex(regs, 22, 2)})
|
|
add_u16(24, "gps_stat_lon", "GPS 센서 상태(경도)")
|
|
add_triplet(25, "batt_pct","배터리 잔량", unit="%", digits=1, kind="batt", clamp=(0.0, 100.0))
|
|
|
|
# 참고용 RAW 덤프(미포함 영역만)
|
|
covered = set()
|
|
for base in (1,4,13,16,25):
|
|
covered.update([base, base+1, base+2])
|
|
covered.update([19,20,21,22,23,24])
|
|
covered.update([7,8,9,10,11,12])
|
|
|
|
for a in range(1, 31):
|
|
if a in covered: continue
|
|
out.append({"addr": a, "len":1, "key": f"raw_{a}", "name": f"RAW@{a}", "type":"u16",
|
|
"value": _words(regs, a, 1)[0], "raw_words": _words_hex(regs, a, 1)})
|
|
|
|
out.sort(key=lambda x: x["addr"])
|
|
return out
|
|
|
|
@app.post("/modbus/read_bulk8")
|
|
async def mb_read_bulk8(req: MBReadBulk8Req):
|
|
key = _resolve_target(req.target)
|
|
start_addr = 0x0001
|
|
qty = 30 # 슬라이드 8: 1~30
|
|
frame = build_read_holding(req.unit, start_addr, qty)
|
|
parsed = parse_modbus_03_or_06(await _send_and_wait(key, frame, timeout=3.0))
|
|
if not parsed or "registers_be" not in parsed:
|
|
raise HTTPException(502, "invalid device response")
|
|
regs = parsed.get("registers_be", [])
|
|
fields = _decode_bulk_fields_v2(regs)
|
|
return {"ok": True, "target": key, "tx_hex": frame.hex(), "rx": parsed, "fields": fields}
|
|
|
|
|
|
|
|
# ===== Time-series endpoints =====
|
|
@app.get("/timeseries/keys")
|
|
def ts_keys():
|
|
return {"items": [{"key": k, "count": len(v)} for k, v in TS_BUFFERS.items()]}
|
|
|
|
class TSSamplesReq(BaseModel):
|
|
target: str
|
|
limit: int = Field(100, ge=1, le=1000)
|
|
|
|
@app.post("/timeseries/get")
|
|
def ts_get(req: TSSamplesReq):
|
|
key = _resolve_target(req.target)
|
|
dq = TS_BUFFERS.get(key)
|
|
if not dq:
|
|
return {"ok": True, "target": key, "count": 0, "samples": []}
|
|
n = min(len(dq), req.limit)
|
|
samples = list(dq)[-n:]
|
|
return {"ok": True, "target": key, "count": n, "samples": samples}
|
|
|
|
# ----- UI -----
|
|
@app.get("/ui", response_class=HTMLResponse)
|
|
def ui():
|
|
return """<!doctype html>
|
|
<html lang='ko'><head><meta charset='utf-8'/><title>RTU-over-TCP Modbus Master</title>
|
|
<meta name='viewport' content='width=device-width, initial-scale=1'/>
|
|
<style>
|
|
body{font-family:system-ui,-apple-system,Segoe UI,Roboto,Arial,'Noto Sans KR',sans-serif;max-width:1024px;margin:24px auto;padding:0 16px}
|
|
.card{background:#fff;border:1px solid #e5e7eb;border-radius:12px;padding:14px;box-shadow:0 1px 2px rgba(0,0,0,.03);margin-bottom:12px}
|
|
input{padding:6px 8px;border:1px solid #d1d5db;border-radius:8px;font-size:14px}
|
|
button.btn{display:inline-flex;align-items:center;gap:6px;padding:6px 10px;border:1px solid #d1d5db;border-radius:8px;background:#111827;color:#fff;font-size:14px;cursor:pointer}
|
|
button.btn.ghost{background:#fff;color:#111827}
|
|
table{border-collapse:collapse;width:100%} th,td{border:1px solid #e5e7eb;padding:6px 8px;text-align:left;font-size:14px}
|
|
.row{display:flex;gap:8px;flex-wrap:wrap;align-items:center}
|
|
.muted{color:#6b7280;font-size:12px}
|
|
.pill{display:inline-block;padding:4px 8px;border-radius:999px;background:#eef2ff;color:#1e3a8a;font-weight:600}
|
|
.controls .btn{margin:4px 4px 0 0}
|
|
.dash{display:grid;grid-template-columns:repeat(3, minmax(180px, 1fr));gap:10px}
|
|
.stat{background:#0f172a;color:#e5e7eb;border-radius:14px;padding:14px;box-shadow:0 2px 8px rgba(0,0,0,.12)}
|
|
.stat .label{font-size:13px;color:#94a3b8;margin-bottom:6px}
|
|
.stat .value{font-size:40px;line-height:1.1;font-weight:800;letter-spacing:-0.5px}
|
|
.stat .unit{font-size:18px;margin-left:4px;font-weight:600;opacity:.9}
|
|
.stat .sub{font-size:12px;color:#94a3b8;margin-top:6px}
|
|
.kvs td:nth-child(1){width:70px;color:#64748b;font-weight:600}
|
|
.kvs td:nth-child(2){width:220px}
|
|
.kvs td:nth-child(3){width:180px;color:#111827;font-weight:700}
|
|
.kvs td:nth-child(4){color:#6b7280;font-family:ui-monospace, SFMono-Regular, Menlo, Consolas, 'Courier New', monospace}
|
|
</style></head>
|
|
<body>
|
|
<h1>RTU-over-TCP Modbus Master</h1>
|
|
|
|
<div class='card'>
|
|
<div class='row'>
|
|
<span class="pill">1) 연결 선택</span>
|
|
<button class="btn ghost" id='refresh'>연결 목록 새로고침</button>
|
|
<span class="muted">표에서 행 클릭 → 선택행 적용</span>
|
|
</div>
|
|
<table id='tbl'><thead><tr><th>key</th><th>last</th></tr></thead><tbody></tbody></table>
|
|
<div class="row" style="margin-top:8px">
|
|
<input id='target' placeholder='ip 또는 ip:*' style="min-width:260px">
|
|
<input id='unit' type='number' value='1' min='0' max='247' style="width:90px"><label class="row" style="gap:6px;margin-left:8px"><input id="followIp" type="checkbox"> <span>IP로 자동추적</span></label>
|
|
<button class="btn" id='useSel'>선택행 적용</button>
|
|
</div>
|
|
</div>
|
|
|
|
<div class='card'>
|
|
<div class='row'><span class="pill">2) 단말 제어</span><span class="muted">작은 버튼</span></div>
|
|
<div class="controls">
|
|
<button class="btn" onclick="actionWrite(0x0016,1)">잠금해제</button>
|
|
<button class="btn" onclick="actionWrite(0x0014,1)">UV ON</button>
|
|
<button class="btn" onclick="actionWrite(0x0014,0)">UV OFF</button>
|
|
<button class="btn" onclick="actionWrite(0x0015,1)">FAN ON</button>
|
|
<button class="btn" onclick="actionWrite(0x0015,0)">FAN OFF</button>
|
|
</div>
|
|
</div>
|
|
|
|
<div class='card'>
|
|
<div class='row'>
|
|
<span class="pill">3) 대시보드</span>
|
|
<label class="row" style="gap:6px"><input id="auto" type="checkbox" checked> <span>자동 새로고침</span></label>
|
|
<span class="muted">주기(초)</span><input id="period" type="number" value="2" min="1" max="60" style="width:80px">
|
|
<button class="btn ghost" onclick="refreshDashboard()">즉시 갱신</button>
|
|
</div>
|
|
<div class="dash" style="margin-top:10px">
|
|
<div class="stat"><div class="label">온도</div><div><span id="tempVal" class="value">—</span><span class="unit">℃</span></div><div id="tempSub" class="sub">대기중…</div></div>
|
|
<div class="stat"><div class="label">습도</div><div><span id="humiVal" class="value">—</span><span class="unit">%</span></div><div id="humiSub" class="sub">대기중…</div></div>
|
|
<div class="stat"><div class="label">배터리</div><div><span id="battVal" class="value">—</span><span class="unit">%</span></div><div id="battSub" class="sub">대기중…</div></div>
|
|
</div>
|
|
</div>
|
|
|
|
<div class='card'>
|
|
<div class='row'>
|
|
<span class="pill">4) 일괄 상태 (PPT 8p 최신 매핑)</span>
|
|
<button class="btn ghost" onclick="refreshBulk8()">일괄 갱신</button>
|
|
<span class="muted">/modbus/read_bulk8: addr=1, qty=30</span>
|
|
</div>
|
|
<table id="tbl8" class="kvs">
|
|
<thead><tr><th>주소</th><th>항목</th><th>값</th><th>Raw Words</th></tr></thead>
|
|
<tbody></tbody>
|
|
</table>
|
|
</div>
|
|
|
|
<div class='card'><h2>결과</h2><pre id='out'>대기중…</pre></div>
|
|
|
|
<script>
|
|
let selectedKey = ""; let timer = null;
|
|
async function loadClients(){
|
|
const res = await fetch('/clients'); const j = await res.json();
|
|
const tbody = document.querySelector('#tbl tbody'); tbody.innerHTML='';
|
|
for(const it of (j.items||[])){
|
|
const tr = document.createElement('tr');
|
|
tr.innerHTML = `<td>${it.key}</td><td>${it.last}</td>`;
|
|
tr.onclick = () => { [...tbody.children].forEach(x => x.style.background=''); tr.style.background = '#f1f5f9'; selectedKey = it.key; const ip = it.key.split(':')[0]; if(document.getElementById('followIp').checked){ document.getElementById('target').value = ip + ':*'; } else { document.getElementById('target').value = it.key; } };
|
|
tbody.appendChild(tr);
|
|
}
|
|
}
|
|
function parseNum(v){ const s=(v||'').toString(); return s.startsWith('0x')||s.startsWith('0X') ? parseInt(s,16) : parseInt(s,10); }
|
|
function getTU(){
|
|
const t = document.getElementById('target').value.trim() || selectedKey;
|
|
const u = parseNum(document.getElementById('unit').value||'1');
|
|
if(!t) throw new Error('target을 선택/입력하세요'); return {target:t, unit:u};
|
|
}
|
|
document.getElementById('refresh').onclick=loadClients;
|
|
document.getElementById('useSel').onclick=()=>{ if(!selectedKey){ alert('표에서 연결을 선택하세요'); return; } const ip = selectedKey.split(':')[0]; document.getElementById('target').value = document.getElementById('followIp').checked ? (ip+':*') : selectedKey; };
|
|
|
|
async function actionWrite(addr, value){
|
|
try{
|
|
const {target, unit} = getTU();
|
|
const res=await fetch('/modbus/write',{method:'POST',headers:{'Content-Type':'application/json'}, body:JSON.stringify({target,unit,addr,value})});
|
|
document.getElementById('out').textContent=await res.text();
|
|
}catch(e){ alert(e.message); }
|
|
}
|
|
|
|
function fmt(v, digits=1){ if(v===null||v===undefined||isNaN(v)) return '—'; return Number(v).toFixed(digits); }
|
|
function nowStr(){ return new Date().toLocaleString(); }
|
|
|
|
function renderBulkTable(fields){
|
|
const tbody = document.querySelector('#tbl8 tbody'); tbody.innerHTML = "";
|
|
for(const f of fields.sort((a,b)=>a.addr-b.addr)){
|
|
const valStr = (typeof f.value==='number') ? fmt(f.value, f.digits||2) + (f.unit||'') : '—';
|
|
const tr = document.createElement('tr');
|
|
tr.innerHTML = `<td>${f.addr}${f.len===3?'~'+(f.addr+2):''}</td><td>${f.name}</td><td>${valStr}</td><td>${f.raw_words}</td>`;
|
|
tbody.appendChild(tr);
|
|
}
|
|
}
|
|
|
|
async function refreshBulk8(){
|
|
try{
|
|
const {target, unit} = getTU();
|
|
const res=await fetch('/modbus/read_bulk8',{method:'POST',headers:{'Content-Type':'application/json'}, body:JSON.stringify({target,unit})});
|
|
const j = await res.json(); if(!j.ok) throw new Error('read_bulk8 failed');
|
|
renderBulkTable(j.fields||[]);
|
|
const find = (k)=> (j.fields||[]).find(x=>x.key===k);
|
|
const t = find('temp'); const h = find('humi'); const b = find('batt_pct');
|
|
if(t){ document.getElementById('tempVal').textContent = fmt(t.value, t.digits||1); document.getElementById('tempSub').textContent = "갱신: "+nowStr(); }
|
|
if(h){ document.getElementById('humiVal').textContent = fmt(h.value, h.digits||1); document.getElementById('humiSub').textContent = "갱신: "+nowStr(); }
|
|
if(b){ document.getElementById('battVal').textContent = fmt(b.value, b.digits||0); document.getElementById('battSub').textContent = "갱신: "+nowStr(); }
|
|
document.getElementById('out').textContent = JSON.stringify(j, null, 2);
|
|
}catch(e){
|
|
document.getElementById('tempSub').textContent = "에러: " + e.message;
|
|
document.getElementById('humiSub').textContent = "에러: " + e.message;
|
|
document.getElementById('battSub').textContent = "에러: " + e.message;
|
|
}
|
|
}
|
|
|
|
async function refreshDashboard(){ await refreshBulk8(); }
|
|
function setupAuto(){ const auto=document.getElementById('auto'), period=document.getElementById('period'); if(timer){clearInterval(timer); timer=null;} if(auto.checked){ const ms=Math.max(1, parseInt(period.value||"2",10))*1000; timer=setInterval(refreshDashboard, ms);} }
|
|
document.getElementById('auto').addEventListener('change', setupAuto);
|
|
document.getElementById('period').addEventListener('change', setupAuto);
|
|
|
|
document.getElementById('followIp').checked = (localStorage.getItem('followIp')||'0')==='1';document.getElementById('followIp').addEventListener('change', e=>localStorage.setItem('followIp', e.target.checked?'1':'0'));loadClients(); setupAuto(); setTimeout(refreshDashboard, 200);
|
|
</script>
|
|
</body></html>
|
|
"""
|
|
|
|
# ----- entrypoint -----
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def _start_poller():
|
|
asyncio.create_task(_poll_devices_task())
|
|
|
|
async def run_http():
|
|
import uvicorn
|
|
server = uvicorn.Server(uvicorn.Config(app, host=CTRL_HOST, port=CTRL_PORT, log_level="info", reload=False))
|
|
await server.serve()
|
|
|
|
async def main():
|
|
await asyncio.gather(run_tcp(), run_http())
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|