Files
openpilot/selfdrive/controls/controlsd.py

357 lines
15 KiB
Python
Raw Permalink Normal View History

2026-01-11 18:23:29 +08:00
#!/usr/bin/env python3
import math
from typing import SupportsFloat
from cereal import car, log
import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
from openpilot.common.swaglog import cloudlog
import numpy as np
from collections import deque
from opendbc.car.car_helpers import interfaces
from opendbc.car.vehicle_model import VehicleModel
from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature, get_lag_adjusted_curvature
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.common.realtime import DT_CTRL, DT_MDL
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N
from selfdrive.modeld.modeld import LAT_SMOOTH_SECONDS
from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
State = log.SelfdriveState.OpenpilotState
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
class Controls:
def __init__(self) -> None:
self.params = Params()
cloudlog.info("controlsd is waiting for CarParams")
self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams)
cloudlog.info("controlsd got CarParams")
self.CI = interfaces[self.CP.carFingerprint](self.CP)
self.disable_dm = False
self.sm = messaging.SubMaster(['liveDelay', 'liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
'carrotMan', 'lateralPlan', 'radarState',
'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState')
self.pm = messaging.PubMaster(['carControl', 'controlsState'])
self.steer_limited_by_controls = False
self.curvature = 0.0
self.desired_curvature = 0.0
self.pose_calibrator = PoseCalibrator()
self.calibrated_pose: Pose | None = None
self.side_state = {
"left": {"main": {"dRel": None, "lat": None}, "sub": {"dRel": None, "lat": None}},
"right": {"main": {"dRel": None, "lat": None}, "sub": {"dRel": None, "lat": None}},
}
self.LoC = LongControl(self.CP)
self.VM = VehicleModel(self.CP)
self.LaC: LatControl
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'pid':
self.LaC = LatControlPID(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CI)
def update(self):
self.sm.update(15)
if self.sm.updated["liveCalibration"]:
self.pose_calibrator.feed_live_calib(self.sm['liveCalibration'])
if self.sm.updated["livePose"]:
device_pose = Pose.from_live_pose(self.sm['livePose'])
self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose)
def state_control(self):
CS = self.sm['carState']
# Update VehicleModel
lp = self.sm['liveParameters']
x = max(lp.stiffnessFactor, 0.1)
sr = max(lp.steerRatio, 0.1) * self.params.get_float("SteerRatioRate") * 0.01
custom_sr = self.params.get_float("CustomSR") * 0.1
sr = max(custom_sr if custom_sr > 1.0 else sr, 0.1)
self.VM.update_params(x, sr)
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
self.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
# Update Torque Params
if self.CP.lateralTuning.which() == 'torque':
torque_params = self.sm['liveTorqueParameters']
if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:
self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
torque_params.frictionCoefficientFiltered)
long_plan = self.sm['longitudinalPlan']
model_v2 = self.sm['modelV2']
CC = car.CarControl.new_message()
CC.enabled = self.sm['selfdriveState'].enabled
# carrot
gear = car.CarState.GearShifter
driving_gear = CS.gearShifter not in (gear.neutral, gear.park, gear.reverse, gear.unknown)
alkas = self.params.get_int("AlwaysOnLKAS") != 0
lateral_enabled = driving_gear and alkas
#self.soft_hold_active = CS.softHoldActive #car.OnroadEvent.EventName.softHold in [e.name for e in self.sm['onroadEvents']]
# Check which actuators can be enabled
standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill
CC.latActive = ((self.sm['selfdriveState'].active or lateral_enabled) and CS.latEnabled and
not CS.steerFaultTemporary and not CS.steerFaultPermanent and not standstill)
CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl
actuators = CC.actuators
actuators.longControlState = self.LoC.long_control_state
# Enable blinkers while lane changing
if model_v2.meta.laneChangeState != LaneChangeState.off:
CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
if not CC.latActive:
self.LaC.reset()
if not CC.longActive:
self.LoC.reset()
# accel PID loop
pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS)
t_since_plan = (self.sm.frame - self.sm.recv_frame['longitudinalPlan']) * DT_CTRL
accel, aTarget, jerk = self.LoC.update(CC.longActive, CS, long_plan, pid_accel_limits, t_since_plan, self.sm['radarState'])
actuators.accel = float(accel)
actuators.aTarget = float(aTarget)
actuators.jerk = float(jerk)
# Steering PID loop and lateral MPC
lat_plan = self.sm['lateralPlan']
curve_speed_abs = abs(self.sm['carrotMan'].vTurnSpeed)
self.lanefull_mode_enabled = (lat_plan.useLaneLines and curve_speed_abs > self.params.get_int("UseLaneLineCurveSpeed"))
lat_smooth_seconds = self.params.get_float("LatSmoothSec") * 0.01
steer_actuator_delay = self.params.get_float("SteerActuatorDelay") * 0.01
if steer_actuator_delay == 0.0:
steer_actuator_delay = self.sm['liveDelay'].lateralDelay
if not CC.latActive:
new_desired_curvature = self.curvature
elif self.lanefull_mode_enabled:
if len(lat_plan.curvatures) == 0:
new_desired_curvature = self.curvature
else:
def smooth_value(val, prev_val, tau):
alpha = 1 - np.exp(-DT_CTRL / tau) if tau > 0 else 1
return alpha * val + (1 - alpha) * prev_val
curvature = get_lag_adjusted_curvature(self.CP, CS.vEgo, lat_plan.psis, lat_plan.curvatures, steer_actuator_delay + lat_smooth_seconds, lat_plan.distances)
new_desired_curvature = smooth_value(curvature, self.desired_curvature, lat_smooth_seconds)
else:
new_desired_curvature = model_v2.action.desiredCurvature
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
actuators.curvature = float(self.desired_curvature)
steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
self.steer_limited_by_controls, self.desired_curvature,
CC, curvature_limited,
model_data=self.sm['modelV2'])
actuators.torque = float(steer)
actuators.steeringAngleDeg = float(steeringAngleDeg)
# Ensure no NaNs/Infs
for p in ACTUATOR_FIELDS:
attr = getattr(actuators, p)
if not isinstance(attr, SupportsFloat):
continue
if not math.isfinite(attr):
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
setattr(actuators, p, 0.0)
return CC, lac_log
def _update_side(self, side: str, leads2, road_edge, bsd_state, hudControl):
def ema(prev, curr, a=0.02):
return curr if prev is None else prev * (1 - a) + curr * a
def set_hud(side_cap, name, val):
setattr(hudControl, f"lead{side_cap}{name}", float(val if val is not None else 0.0))
st = self.side_state[side]
if road_edge <= 2.0 or not leads2:
st["main"] = {"dRel": None, "lat": None}
st["sub"] = {"dRel": None, "lat": None}
if not bsd_state:
return
lead_main = leads2[0] if len(leads2) > 0 else None
side_cap = side.capitalize()
if bsd_state:
set_hud(side_cap, "Dist2", 1)
set_hud(side_cap, "Lat2", 3.2)
# 첫 번째가 10m 이내라면 sub 업데이트 + 두 번째를 main으로
elif len(leads2) > 1 and lead_main.dRel < 10:
st["sub"]["dRel"] = ema(st["sub"]["dRel"], lead_main.dRel)
st["sub"]["lat"] = ema(st["sub"]["lat"], abs(lead_main.dPath))
set_hud(side_cap, "Dist2", st["sub"]["dRel"])
set_hud(side_cap, "Lat2", st["sub"]["lat"])
lead_main = leads2[1]
if len(leads2) > 0:
st["main"]["dRel"] = ema(st["main"]["dRel"], lead_main.dRel)
st["main"]["lat"] = ema(st["main"]["lat"], abs(lead_main.dPath))
set_hud(side_cap, "Dist", st["main"]["dRel"])
set_hud(side_cap, "Lat", st["main"]["lat"])
def publish(self, CC, lac_log):
CS = self.sm['carState']
# Orientation and angle rates can be useful for carcontroller
# Only calibrated (car) frame is relevant for the carcontroller
if self.calibrated_pose is not None:
CC.orientationNED = self.calibrated_pose.orientation.xyz.tolist()
CC.angularVelocity = self.calibrated_pose.angular_velocity.xyz.tolist()
#acceleration_value = list(self.sm['liveLocationKalman'].accelerationCalibrated.value)
#if len(acceleration_value) > 2:
# if abs(acceleration_value[0]) > 16.0:
# print("Collision detected. disable openpilot, restart")
# self.params.put_bool("OpenpilotEnabledToggle", False)
# self.params.put_int("SoftRestartTriggered", 1)
CC.cruiseControl.override = CC.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise)
desired_kph = min(CS.vCruiseCluster, self.sm['carrotMan'].desiredSpeed)
setSpeed = float(desired_kph * CV.KPH_TO_MS)
speeds = self.sm['longitudinalPlan'].speeds
if len(speeds):
CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1
vCluRatio = CS.vCluRatio if CS.vCluRatio > 0.5 else 1.0
setSpeed = speeds[-1] / vCluRatio
hudControl = CC.hudControl
hudControl.activeCarrot = self.sm['carrotMan'].activeCarrot
hudControl.atcDistance = self.sm['carrotMan'].xDistToTurn
lp = self.sm['longitudinalPlan']
if self.CP.pcmCruise:
speed_from_pcm = self.params.get_int("SpeedFromPCM")
if speed_from_pcm == 1: #toyota
hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS)
elif speed_from_pcm == 2:
hudControl.setSpeed = float(max(30/3.6, desired_kph * CV.KPH_TO_MS))
elif speed_from_pcm == 3: # honda
hudControl.setSpeed = setSpeed if lp.xState == 3 else float(desired_kph * CV.KPH_TO_MS)
else:
hudControl.setSpeed = float(max(30/3.6, setSpeed))
else:
hudControl.setSpeed = setSpeed if lp.xState == 3 else float(desired_kph * CV.KPH_TO_MS)
hudControl.speedVisible = CC.enabled
hudControl.lanesVisible = CC.enabled
hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1
hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual
radarState = self.sm['radarState']
leadOne = radarState.leadOne
hudControl.leadDistance = leadOne.dRel if leadOne.status else 0
hudControl.leadRelSpeed = leadOne.vRel if leadOne.status else 0
hudControl.leadRadar = 1 if leadOne.radar else 0
hudControl.leadDPath = leadOne.dPath
meta = self.sm['modelV2'].meta
hudControl.modelDesire = 1 if meta.desire == log.Desire.turnLeft else 2 if meta.desire == log.Desire.turnRight else 0
self._update_side("left", radarState.leadsLeft2, meta.distanceToRoadEdgeLeft, CS.leftBlindspot, hudControl)
self._update_side("right", radarState.leadsRight2, meta.distanceToRoadEdgeRight, CS.rightBlindspot, hudControl)
hudControl.rightLaneVisible = True
hudControl.leftLaneVisible = True
if self.sm.valid['driverAssistance']:
hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture
hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture
if self.sm['selfdriveState'].active:
CO = self.sm['carOutput']
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.steer_limited_by_controls = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
STEER_ANGLE_SATURATION_THRESHOLD
else:
self.steer_limited_by_controls = abs(CC.actuators.torque - CO.actuatorsOutput.torque) > 1e-2
# TODO: both controlsState and carControl valids should be set by
# sm.all_checks(), but this creates a circular dependency
# controlsState
dat = messaging.new_message('controlsState')
dat.valid = CS.canValid
cs = dat.controlsState
cs.curvature = self.curvature
cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
cs.desiredCurvature = self.desired_curvature
cs.longControlState = self.LoC.long_control_state
cs.upAccelCmd = float(self.LoC.pid.p)
cs.uiAccelCmd = float(self.LoC.pid.i)
cs.ufAccelCmd = float(self.LoC.pid.f)
cs.forceDecel = bool(self.sm['selfdriveState'].state == State.softDisabling)
lat_tuning = self.CP.lateralTuning.which()
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
cs.lateralControlState.angleState = lac_log
elif lat_tuning == 'pid':
cs.lateralControlState.pidState = lac_log
elif lat_tuning == 'torque':
cs.lateralControlState.torqueState = lac_log
cs.activeLaneLine = self.lanefull_mode_enabled
self.pm.send('controlsState', dat)
# carControl
cc_send = messaging.new_message('carControl')
cc_send.valid = CS.canValid
cc_send.carControl = CC
self.pm.send('carControl', cc_send)
def run(self):
rk = Ratekeeper(100, print_delay_threshold=None)
while True:
self.update()
CC, lac_log = self.state_control()
self.publish(CC, lac_log)
rk.monitor_time()
def main():
config_realtime_process(4, Priority.CTRL_HIGH)
controls = Controls()
controls.run()
if __name__ == "__main__":
main()