We use a three-state design: at any moment the elevator is moving up, moving down, or idle.
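The snippets in this article refer to `Direction.UP`, `Direction.DOWN`, and `Direction.IDLE` without showing the definition. A minimal sketch of what that type might look like, assuming a plain `Enum`; the `opposite` helper is hypothetical and only illustrates how a reversal could be expressed:

```python
from enum import Enum, auto


class Direction(Enum):
    """The three travel states assumed by the snippets in this article."""
    UP = auto()
    DOWN = auto()
    IDLE = auto()


def opposite(direction: Direction) -> Direction:
    """Hypothetical helper: reverse a moving direction; IDLE stays IDLE."""
    if direction is Direction.UP:
        return Direction.DOWN
    if direction is Direction.DOWN:
        return Direction.UP
    return Direction.IDLE
```

Using an enum instead of bare strings or integers means a mistyped state name fails immediately with an `AttributeError` instead of silently introducing a fourth state.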
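State transitions are triggered as follows:

    def update_state(self):
        # No pending requests: stop and wait
        if not self.has_requests():
            self.direction = Direction.IDLE
        # At the bottom floor the only way to go is up
        elif self.current_floor == 1:
            self.direction = Direction.UP
        # At the top floor the only way to go is down
        elif self.current_floor == self.total_floors:
            self.direction = Direction.DOWN
        # On any floor in between, the current direction is kept

The request store is optimized with a two-queue structure: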

    class RequestQueue:
        def __init__(self):
            self.up_requests = set()    # floors requested for upward travel
            self.down_requests = set()  # floors requested for downward travel

The elevator sweeps back and forth like a scanner:
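
    def scan_algorithm(self):
        # Candidate floors ahead of the car in its direction of travel
        if self.direction == Direction.UP:
            next_floors = range(self.current_floor + 1, self.total_floors + 1)
        else:
            next_floors = range(self.current_floor - 1, 0, -1)
        # Return the first floor ahead that has a request to serve
        for floor in next_floors:
            if self.should_stop(floor):
                return floor
        return None  # nothing to serve in this direction

Consider three key factors: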
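
| Algorithm | Best case | Worst case |
|-----------|-----------|------------|
| FCFS | O(1) | O(n²) |
| SCAN | O(n) | O(n) |
| SSTF | O(n log n) | O(n²) |
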
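To make the three policies in the table concrete, here is a minimal sketch that computes the order in which each one would serve the same batch of requests, plus the total floors travelled. It is an illustration under assumptions, not the simulator's implementation: the function names are hypothetical, all requests are known up front, and `scan_order` is a LOOK-style simplification of SCAN that reverses at the last request instead of running to the end of the shaft. It is not meant to reproduce the complexity figures above.

```python
def fcfs_order(start, requests):
    """Serve requests strictly in arrival order."""
    return list(requests)


def sstf_order(start, requests):
    """Repeatedly serve the pending floor closest to the current one."""
    pending, current, order = list(requests), start, []
    while pending:
        nearest = min(pending, key=lambda f: abs(f - current))
        pending.remove(nearest)
        order.append(nearest)
        current = nearest
    return order


def scan_order(start, requests):
    """Sweep upward first, then reverse and sweep downward."""
    up = sorted(f for f in requests if f >= start)
    down = sorted((f for f in requests if f < start), reverse=True)
    return up + down


def travel(start, order):
    """Total number of floors moved when visiting `order` from `start`."""
    total, current = 0, start
    for floor in order:
        total += abs(floor - current)
        current = floor
    return total


requests = [9, 4, 7, 3, 11]  # made-up sample data
for name, policy in [("FCFS", fcfs_order), ("SSTF", sstf_order), ("SCAN", scan_order)]:
    order = policy(5, requests)
    print(name, order, travel(5, order))
```

On this sample SSTF travels the fewest floors, but with requests arriving continuously it can starve distant floors, which is the usual argument for a sweep-based policy.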
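Request records are compressed into a bitmap:

    # In __init__: one bit per floor, packed into a single integer (fits in 32 bits)
    self.request_bitmap = 0

    def add_request(self, floor):
        # Set the bit that corresponds to this floor
        self.request_bitmap |= 1 << floor

Layered drawing strategy: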
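
    import curses

    # Color pair numbers (names are illustrative); register them once with curses.init_pair()
    ELEVATOR_PAIR, FLOOR_PAIR = 1, 2

    def draw_floor(stdscr, floor, elevator_pos):
        # color_pair() expects a pair number, not a COLOR_* constant
        pair = ELEVATOR_PAIR if floor == elevator_pos else FLOOR_PAIR
        stdscr.addstr(12 - floor, 0, f"[{floor:2d}]", curses.color_pair(pair))

Key metrics displayed: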

    class ElevatorGroup:
        def __init__(self, num):
            self.elevators = [Elevator() for _ in range(num)]

        def dispatch(self, floor):
            # Pick the elevator currently closest to the requesting floor
            return min(self.elevators, key=lambda e: abs(e.current_floor - floor))

Integration over the MQTT protocol:
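
    import paho.mqtt.client as mqtt

    def on_message(client, userdata, msg):
        # The payload is expected to be a floor number encoded as text, e.g. b"5"
        floor = int(msg.payload.decode())
        elevator.add_request(floor)  # `elevator` is assumed to be a module-level Elevator instance

    import unittest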

    class TestElevator(unittest.TestCase):
        def test_emergency_stop(self):
            e = Elevator()
            e.add_request(5, Direction.UP)
            e.handle_emergency()
            # An emergency stop must leave the elevator idle
            self.assertEqual(e.direction, Direction.IDLE)

Development environment: Python 3.10+
Production deployment:

    nohup python elevator_sim.py --floors 12 --speed 1.5 > log.txt &

Monitoring command:

    watch -n 1 'tail -n 20 log.txt'
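
The deployment command passes `--floors` and `--speed` to `elevator_sim.py`, but the article does not show how the script reads them. A minimal sketch of one way to parse these two flags with `argparse`; the function name, the default values, the help text, and the validation rule are all assumptions, not taken from the actual script:

```python
import argparse


def parse_args(argv=None):
    """Parse the --floors and --speed flags used in the deployment command."""
    parser = argparse.ArgumentParser(description="Elevator simulator")
    # Defaults below are assumptions that mirror the example command line
    parser.add_argument("--floors", type=int, default=12,
                        help="number of floors in the building")
    parser.add_argument("--speed", type=float, default=1.5,
                        help="simulation speed (unit is an assumption)")
    args = parser.parse_args(argv)
    if args.floors < 2:
        # Assumed rule: an elevator needs at least two floors to move between
        parser.error("--floors must be at least 2")
    return args


# Passing an explicit list keeps the example independent of sys.argv
args = parse_args(["--floors", "12", "--speed", "1.5"])
print(f"floors={args.floors} speed={args.speed}")
```

Rejecting bad values at startup matters here because the process runs detached under `nohup`, so a misconfiguration would otherwise only show up later in `log.txt`.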