Files
openpilot/selfdrive/controls/beep.py
Comma Device 3721ecbf8a Release 260111
2026-01-11 18:23:29 +08:00

140 lines
4.3 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import subprocess
import time
from cereal import car, messaging
from openpilot.common.realtime import Ratekeeper
import threading
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
class Beepd:
def __init__(self):
self.current_alert = AudibleAlert.none
self.enable_gpio()
self.startup_beep()
def enable_gpio(self):
# 尝试 export忽略已 export 的错误
try:
subprocess.run("echo 42 | sudo tee /sys/class/gpio/export",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
except Exception:
pass
subprocess.run("echo \"out\" | sudo tee /sys/class/gpio/gpio42/direction",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
def _beep(self, on):
val = "1" if on else "0"
subprocess.run(f"echo \"{val}\" | sudo tee /sys/class/gpio/gpio42/value",
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
encoding='utf8')
def engage(self):
self._beep(True)
time.sleep(0.05)
self._beep(False)
def disengage(self):
for _ in range(2):
self._beep(True)
time.sleep(0.01)
self._beep(False)
time.sleep(0.01)
def warning(self):
for _ in range(3):
self._beep(True)
time.sleep(0.01)
self._beep(False)
time.sleep(0.01)
def startup_beep(self):
self._beep(True)
time.sleep(0.1)
self._beep(False)
def ding(self):
self._beep(True)
time.sleep(0.02)
self._beep(False)
def dong(self):
self._beep(True)
time.sleep(0.03)
self._beep(False)
def beep(self):
self._beep(True)
time.sleep(0.04)
self._beep(False)
def dispatch_beep(self, func):
threading.Thread(target=func, daemon=True).start()
def update_alert(self, new_alert):
if new_alert != self.current_alert:
self.current_alert = new_alert
print(f"[BEEP] New alert: {new_alert}")
if new_alert == AudibleAlert.engage:
self.dispatch_beep(self.engage)
elif new_alert == AudibleAlert.disengage:
self.dispatch_beep(self.disengage)
elif new_alert in [AudibleAlert.refuse, AudibleAlert.prompt, AudibleAlert.warningImmediate,AudibleAlert.warningSoft]:
self.dispatch_beep(self.warning)
elif new_alert in [AudibleAlert.longEngaged, AudibleAlert.longDisengaged, AudibleAlert.trafficSignGreen, AudibleAlert.trafficSignChanged, AudibleAlert.trafficError, AudibleAlert.bsdWarning, AudibleAlert.laneChange]:
self.dispatch_beep(self.ding)
elif new_alert in [AudibleAlert.stopStop, AudibleAlert.stopping, AudibleAlert.autoHold, AudibleAlert.engage2, AudibleAlert.disengage2, AudibleAlert.speedDown, AudibleAlert.audioTurn, AudibleAlert.reverseGear]:
self.dispatch_beep(self.dong)
elif new_alert in [AudibleAlert.audio1, AudibleAlert.audio2, AudibleAlert.audio3, AudibleAlert.audio4, AudibleAlert.audio5,
AudibleAlert.audio6, AudibleAlert.audio7, AudibleAlert.audio8, AudibleAlert.audio9, AudibleAlert.audio10]:
self.dispatch_beep(self.beep)
def get_audible_alert(self, sm):
if sm.updated['selfdriveState']:
new_alert = sm['selfdriveState'].alertSound.raw
self.update_alert(new_alert)
def test_beepd_thread(self):
frame = 0
rk = Ratekeeper(20)
pm = messaging.PubMaster(['selfdriveState'])
while True:
cs = messaging.new_message('selfdriveState')
if frame == 40:
cs.selfdriveState.alertSound = AudibleAlert.engage
if frame == 60:
cs.selfdriveState.alertSound = AudibleAlert.disengage
if frame == 80:
cs.selfdriveState.alertSound = AudibleAlert.prompt
pm.send("selfdriveState", cs)
frame += 1
rk.keep_time()
def beepd_thread(self, test=False):
if test:
threading.Thread(target=self.test_beepd_thread, daemon=True).start()
sm = messaging.SubMaster(['selfdriveState'])
rk = Ratekeeper(20)
while True:
sm.update(0)
self.get_audible_alert(sm)
rk.keep_time()
def main():
s = Beepd()
s.beepd_thread(test=False) # 改成 True 可启用模拟测试数据
if __name__ == "__main__":
main()