import fcntl import json import math import os import socket import struct import subprocess import threading import time import numpy as np from datetime import datetime from ftplib import FTP from cereal import log import cereal.messaging as messaging from openpilot.common.realtime import Ratekeeper from openpilot.common.params import Params from openpilot.common.filter_simple import MyMovingAverage from openpilot.system.hardware import PC, TICI from openpilot.selfdrive.navd.helpers import Coordinate from opendbc.car.common.conversions import Conversions as CV from openpilot.common.gps import get_gps_location_service nav_type_mapping = { 12: ("turn", "left", 1), 16: ("turn", "sharp left", 1), 1000: ("turn", "slight left", 1), 1001: ("turn", "slight right", 2), 1002: ("fork", "slight left", 3), 1003: ("fork", "slight right", 4), 1006: ("off ramp", "left", 3), 1007: ("off ramp", "right", 4), 13: ("turn", "right", 2), 19: ("turn", "sharp right", 2), 102: ("off ramp", "slight left", 3), 105: ("off ramp", "slight left", 3), 112: ("off ramp", "slight left", 3), 115: ("off ramp", "slight left", 3), 101: ("off ramp", "slight right", 4), 104: ("off ramp", "slight right", 4), 111: ("off ramp", "slight right", 4), 114: ("off ramp", "slight right", 4), 7: ("fork", "left", 3), 44: ("fork", "left", 3), 17: ("fork", "left", 3), 75: ("fork", "left", 3), 76: ("fork", "left", 3), 118: ("fork", "left", 3), 6: ("fork", "right", 4), 43: ("fork", "right", 4), 73: ("fork", "right", 4), 74: ("fork", "right", 4), 123: ("fork", "right", 4), 124: ("fork", "right", 4), 117: ("fork", "right", 4), 131: ("rotary", "slight right", 5), 132: ("rotary", "slight right", 5), 140: ("rotary", "slight left", 5), 141: ("rotary", "slight left", 5), 133: ("rotary", "right", 5), 134: ("rotary", "sharp right", 5), 135: ("rotary", "sharp right", 5), 136: ("rotary", "sharp left", 5), 137: ("rotary", "sharp left", 5), 138: ("rotary", "sharp left", 5), 139: ("rotary", "left", 5), 142: ("rotary", "straight", 5), 14: ("turn", "uturn", 5), 201: ("arrive", "straight", 5), 51: ("notification", "straight", None), 52: ("notification", "straight", None), 53: ("notification", "straight", None), 54: ("notification", "straight", None), 55: ("notification", "straight", None), 153: ("", "", 6), #TG 154: ("", "", 6), #TG 249: ("", "", 6) #TG } import collections class CarrotServ: def __init__(self): self.params = Params() self.params_memory = Params("/dev/shm/params") self.nRoadLimitSpeed = 30 self.nRoadLimitSpeed_last = 30 self.nRoadLimitSpeed_counter = 0 self.active_carrot = 0 ## 1: CarrotMan Active, 2: sdi active , 3: speed decel active, 4: section active, 5: bump active, 6: speed limit active self.active_count = 0 self.active_sdi_count = 0 self.active_sdi_count_max = 200 # 20 sec self.active_kisa_count = 0 self.nSdiType = -1 self.nSdiSpeedLimit = 0 self.nSdiSection = 0 self.nSdiDist = 0 self.nSdiBlockType = -1 self.nSdiBlockSpeed = 0 self.nSdiBlockDist = 0 self.nTBTDist = 0 self.nTBTTurnType = -1 self.szTBTMainText = "" self.szNearDirName = "" self.szFarDirName = "" self.nTBTNextRoadWidth = 0 self.nTBTDistNext = 0 self.nTBTTurnTypeNext = -1 self.szTBTMainTextNext = "" self.nGoPosDist = 0 self.nGoPosTime = 0 self.szPosRoadName = "" self.nSdiPlusType = -1 self.nSdiPlusSpeedLimit = 0 self.nSdiPlusDist = 0 self.nSdiPlusBlockType = -1 self.nSdiPlusBlockSpeed = 0 self.nSdiPlusBlockDist = 0 self.goalPosX = 0.0 self.goalPosY = 0.0 self.szGoalName = "" self.vpPosPointLatNavi = 0.0 self.vpPosPointLonNavi = 0.0 self.vpPosPointLat = 0.0 self.vpPosPointLon = 0.0 self.roadcate = 8 self.nPosSpeed = 0.0 self.nPosAngle = 0.0 self.nPosAnglePhone = 0.0 self.diff_angle_count = 0 self.last_calculate_gps_time = 0 self.last_update_gps_time = 0 self.last_update_gps_time_phone = 0 self.last_update_gps_time_navi = 0 self.bearing_offset = 0.0 self.bearing_measured = 0.0 self.bearing = 0.0 self.gps_valid = False self.phone_gps_accuracy = 0.0 self.gps_accuracy_device = 0.0 self.phone_latitude = 0.0 self.phone_longitude = 0.0 self.phone_gps_frame = 0 self.totalDistance = 0 self.xSpdLimit = 0 self.xSpdDist = 0 self.xSpdType = -1 self.xTurnInfo = -1 self.xDistToTurn = 0 self.xTurnInfoNext = -1 self.xDistToTurnNext = 0 self.navType, self.navModifier = "invalid", "" self.navTypeNext, self.navModifierNext = "invalid", "" self.carrotIndex = 0 self.carrotCmdIndex = 0 self.carrotCmd = "" self.carrotArg = "" self.carrotCmdIndex_last = 0 self.traffic_light_q = collections.deque(maxlen=int(2.0/0.1)) # 2 secnods self.traffic_light_count = -1 self.traffic_state = 0 self.left_spd_sec = 0 self.left_tbt_sec = 0 self.left_sec = 100 self.max_left_sec = 100 self.carrot_left_sec = 100 self.sdi_inform = False self.atc_paused = False self.atc_activate_count = 0 self.gas_override_speed = 0 self.gas_pressed_state = False self.source_last = "none" self.debugText = "" # 默认语言,稍后在 update_params 中从 Params 读取覆盖, # 规则:main_ko -> 韩语;main_zh-CHS -> 中文;其他 -> 英文 self.lang = "en" self.update_params() def update_params(self): self.autoNaviSpeedBumpSpeed = float(self.params.get_int("AutoNaviSpeedBumpSpeed")) self.autoNaviSpeedBumpTime = float(self.params.get_int("AutoNaviSpeedBumpTime")) self.autoNaviSpeedCtrlEnd = float(self.params.get_int("AutoNaviSpeedCtrlEnd")) self.autoNaviSpeedCtrlMode = self.params.get_int("AutoNaviSpeedCtrlMode") self.autoNaviSpeedSafetyFactor = float(self.params.get_int("AutoNaviSpeedSafetyFactor")) * 0.01 self.autoNaviSpeedDecelRate = float(self.params.get_int("AutoNaviSpeedDecelRate")) * 0.01 self.autoNaviCountDownMode = self.params.get_int("AutoNaviCountDownMode") self.turnSpeedControlMode= self.params.get_int("TurnSpeedControlMode") self.mapTurnSpeedFactor= self.params.get_float("MapTurnSpeedFactor") * 0.01 self.autoTurnControlSpeedTurn = self.params.get_int("AutoTurnControlSpeedTurn") self.autoTurnMapChange = self.params.get_int("AutoTurnMapChange") self.autoTurnControl = self.params.get_int("AutoTurnControl") self.autoTurnControlTurnEnd = self.params.get_int("AutoTurnControlTurnEnd") #self.autoNaviSpeedDecelRate = float(self.params.get_int("AutoNaviSpeedDecelRate")) * 0.01 self.autoCurveSpeedLowerLimit = int(self.params.get("AutoCurveSpeedLowerLimit")) self.is_metric = self.params.get_bool("IsMetric") self.autoRoadSpeedLimitOffset = self.params.get_int("AutoRoadSpeedLimitOffset") # 读取语言设置:优先使用 LanguageSetting,与 UI 保持一致;回退读取可能存在的 "lang" try: lang_val = self.params.get('LanguageSetting', encoding='utf8') or self.params.get('lang', encoding='utf8') except Exception: lang_val = None if isinstance(lang_val, bytes): try: lang_val = lang_val.decode('utf8') except Exception: lang_val = None if lang_val == "main_ko": self.lang = "ko" elif lang_val == "main_zh-CHS": self.lang = "zh" else: self.lang = "en" def _update_cmd(self): if self.carrotCmdIndex != self.carrotCmdIndex_last: self.carrotCmdIndex_last = self.carrotCmdIndex command_handlers = { "DETECT": self._handle_detect_command, } handler = command_handlers.get(self.carrotCmd) if handler: handler(self.carrotArg) self.traffic_light_q.append((-1, -1, "none", 0.0)) self.traffic_light_count -= 1 if self.traffic_light_count < 0: self.traffic_light_count = -1 self.traffic_state = 0 def _handle_detect_command(self, xArg): elements = [e.strip() for e in xArg.split(',')] if len(elements) >= 4: try: state = elements[0] value1 = float(elements[1]) value2 = float(elements[2]) value3 = float(elements[3]) self.traffic_light(value1, value2, state, value3) self.traffic_light_count = int(0.5 / 0.1) except ValueError: pass def traffic_light(self, x, y, color, cnf): traffic_red = 0 traffic_green = 0 traffic_left = 0 traffic_red_trig = 0 traffic_green_trig = 0 traffic_left_trig = 0 for pdata in self.traffic_light_q: px, py, pcolor,pcnf = pdata if abs(x - px) < 0.2 and abs(y - py) < 0.2: if pcolor in ["Green Light", "Left turn"]: if color in ["Red Light", "Yellow Light"]: traffic_red_trig += cnf traffic_red += cnf elif color in ["Green Light", "Left turn"]: traffic_green += cnf elif pcolor in ["Red Light", "Yellow Light"]: if color in ["Green Light"]: #, "Left turn"]: traffic_green_trig += cnf traffic_green += cnf elif color in ["Left turn"]: traffic_left_trig += cnf traffic_left += cnf elif color in ["Red Light", "Yellow Light"]: traffic_red += cnf #print(self.traffic_light_q) if traffic_red_trig > 0: self.traffic_state = 1 #self._add_log("Red light triggered") #print("Red light triggered") elif traffic_green_trig > 0 and traffic_green > traffic_red: #주변에 red light의 cnf보다 더 크면 출발... 감지오류로 출발하는경우가 생김. self.traffic_state = 2 #self._add_log("Green light triggered") #print("Green light triggered") elif traffic_left_trig > 0: self.traffic_state = 3 elif traffic_red > 0: self.traffic_state = 1 #self._add_log("Red light continued") #print("Red light continued") elif traffic_green > 0: self.traffic_state = 2 #self._add_log("Green light continued") #print("Green light continued") else: self.traffic_state = 0 #print("TrafficLight none") self.traffic_light_q.append((x,y,color,cnf)) def calculate_current_speed(self, left_dist, safe_speed_kph, safe_time, safe_decel_rate): safe_speed = safe_speed_kph / 3.6 safe_dist = safe_speed * safe_time decel_dist = left_dist - safe_dist if decel_dist <= 0: return safe_speed_kph # v_i^2 = v_f^2 + 2ad temp = safe_speed**2 + 2 * safe_decel_rate * decel_dist # 공식에서 감속 적용 if temp < 0: speed_mps = safe_speed else: speed_mps = math.sqrt(temp) return max(safe_speed_kph, min(250, speed_mps * 3.6)) def _update_tbt(self): #xTurnInfo : 1: left turn, 2: right turn, 3: left lane change, 4: right lane change, 5: rotary, 6: tg, 7: arrive or uturn turn_type_mapping = { 12: ("turn", "left", 1), 16: ("turn", "sharp left", 1), 13: ("turn", "right", 2), 19: ("turn", "sharp right", 2), 102: ("off ramp", "slight left", 3), 105: ("off ramp", "slight left", 3), 112: ("off ramp", "slight left", 3), 115: ("off ramp", "slight left", 3), 101: ("off ramp", "slight right", 4), 104: ("off ramp", "slight right", 4), 111: ("off ramp", "slight right", 4), 114: ("off ramp", "slight right", 4), 7: ("fork", "left", 3), 44: ("fork", "left", 3), 17: ("fork", "left", 3), 75: ("fork", "left", 3), 76: ("fork", "left", 3), 118: ("fork", "left", 3), 6: ("fork", "right", 4), 43: ("fork", "right", 4), 73: ("fork", "right", 4), 74: ("fork", "right", 4), 123: ("fork", "right", 4), 124: ("fork", "right", 4), 117: ("fork", "right", 4), 131: ("rotary", "slight right", 5), 132: ("rotary", "slight right", 5), 140: ("rotary", "slight left", 5), 141: ("rotary", "slight left", 5), 133: ("rotary", "right", 5), 134: ("rotary", "sharp right", 5), 135: ("rotary", "sharp right", 5), 136: ("rotary", "sharp left", 5), 137: ("rotary", "sharp left", 5), 138: ("rotary", "sharp left", 5), 139: ("rotary", "left", 5), 142: ("rotary", "straight", 5), 14: ("turn", "uturn", 7), 201: ("arrive", "straight", 8), 51: ("notification", "straight", 0), 52: ("notification", "straight", 0), 53: ("notification", "straight", 0), 54: ("notification", "straight", 0), 55: ("notification", "straight", 0), 153: ("", "", 6), #TG 154: ("", "", 6), #TG 249: ("", "", 6) #TG } if self.nTBTTurnType in turn_type_mapping: self.navType, self.navModifier, self.xTurnInfo = turn_type_mapping[self.nTBTTurnType] else: self.navType, self.navModifier, self.xTurnInfo = "invalid", "", -1 if self.nTBTTurnTypeNext in turn_type_mapping: self.navTypeNext, self.navModifierNext, self.xTurnInfoNext = turn_type_mapping[self.nTBTTurnTypeNext] else: self.navTypeNext, self.navModifierNext, self.xTurnInfoNext = "invalid", "", -1 if self.nTBTDist > 0 and self.xTurnInfo > 0: self.xDistToTurn = self.nTBTDist if self.nTBTDistNext > 0 and self.xTurnInfoNext > 0: self.xDistToTurnNext = self.nTBTDistNext + self.nTBTDist def _get_sdi_descr(self, nSdiType): # 多语言映射:ko(韩语,原始),zh(简体中文),en(英文)。 sdi_ko = { 0: "신호과속", 1: "과속 (고정식)", 2: "구간단속 시작", 3: "구간단속 끝", 4: "구간단속중", 5: "꼬리물기단속카메라", 6: "신호 단속", 7: "과속 (이동식)", 8: "고정식 과속위험 구간(박스형)", 9: "버스전용차로구간", 10: "가변 차로 단속", 11: "갓길 감시 지점", 12: "끼어들기 금지", 13: "교통정보 수집지점", 14: "방범용cctv", 15: "과적차량 위험구간", 16: "적재 불량 단속", 17: "주차단속 지점", 18: "일방통행도로", 19: "철길 건널목", 20: "어린이 보호구역(스쿨존 시작 구간)", 21: "어린이 보호구역(스쿨존 끝 구간)", 22: "과속방지턱", 23: "lpg충전소", 24: "터널 구간", 25: "휴게소", 26: "톨게이트", 27: "안개주의 지역", 28: "유해물질 지역", 29: "사고다발", 30: "급커브지역", 31: "급커브구간1", 32: "급경사구간", 33: "야생동물 교통사고 잦은 구간", 34: "우측시야불량지점", 35: "시야불량지점", 36: "좌측시야불량지점", 37: "신호위반다발구간", 38: "과속운행다발구간", 39: "교통혼잡지역", 40: "방향별차로선택지점", 41: "무단횡단사고다발지점", 42: "갓길 사고 다발 지점", 43: "과속 사발 다발 지점", 44: "졸음 사고 다발 지점", 45: "사고다발지점", 46: "보행자 사고다발지점", 47: "차량도난사고 상습발생지점", 48: "낙석주의지역", 49: "결빙주의지역", 50: "병목지점", 51: "합류 도로", 52: "추락주의지역", 53: "지하차도 구간", 54: "주택밀집지역(교통진정지역)", 55: "인터체인지", 56: "분기점", 57: "휴게소(lpg충전가능)", 58: "교량", 59: "제동장치사고다발지점", 60: "중앙선침범사고다발지점", 61: "통행위반사고다발지점", 62: "목적지 건너편 안내", 63: "졸음 쉼터 안내", 64: "노후경유차단속", 65: "터널내 차로변경단속", 66: "", } sdi_en = { 0: "Signal speed enforcement", 1: "Speed camera (fixed)", 2: "Section control start", 3: "Section control end", 4: "Under section control", 5: "Block-the-box camera", 6: "Signal violation enforcement", 7: "Speed camera (mobile)", 8: "Fixed speed camera zone (box)", 9: "Bus-only lane zone", 10: "Reversible/variable lane enforcement", 11: "Shoulder surveillance point", 12: "No cut-in", 13: "Traffic data collection point", 14: "Security CCTV", 15: "Overloaded vehicle risk zone", 16: "Improper loading enforcement", 17: "Parking enforcement point", 18: "One-way road", 19: "Railroad crossing", 20: "School zone start", 21: "School zone end", 22: "Speed bump", 23: "LPG station", 24: "Tunnel section", 25: "Rest area", 26: "Toll gate", 27: "Fog caution area", 28: "Hazardous materials area", 29: "Accident-prone section", 30: "Sharp curve area", 31: "Sharp curve section 1", 32: "Steep slope section", 33: "Wild animal crossing area", 34: "Poor visibility (right)", 35: "Poor visibility", 36: "Poor visibility (left)", 37: "Frequent signal violations", 38: "Frequent speeding", 39: "Traffic congestion area", 40: "Lane selection by direction", 41: "Frequent jaywalking accidents", 42: "Frequent shoulder accidents", 43: "Frequent speeding accidents", 44: "Frequent drowsy driving accidents", 45: "Accident-prone spot", 46: "Frequent pedestrian accidents", 47: "Frequent vehicle theft", 48: "Falling rock caution area", 49: "Icy road caution area", 50: "Bottleneck point", 51: "Merging road", 52: "Cliff/Drop caution area", 53: "Underpass section", 54: "Residential area (traffic calming)", 55: "Interchange", 56: "Junction", 57: "Rest area (LPG available)", 58: "Bridge", 59: "Frequent brake failure accidents", 60: "Center line invasion accidents", 61: "Violation-of-passage accidents", 62: "Destination on opposite side", 63: "Drowsy rest area", 64: "Old diesel control", 65: "Lane change enforcement in tunnel", 66: "", } sdi_zh = { 0: "信号测速/闯灯拍照", 1: "固定测速摄像头", 2: "区间测速开始", 3: "区间测速结束", 4: "区间测速中", 5: "路口压线摄像头", 6: "闯红灯拍照", 7: "流动测速摄像头", 8: "测速拍照", 9: "公交专用车道区间", 10: "可变/潮汐车道拍照", 11: "应急车道拍照", 12: "禁止加塞", 13: "交通信息采集点", 14: "治安监控", 15: "超载车辆风险区", 16: "装载不当拍照", 17: "违停拍照点", 18: "单行道", 19: "铁路道口", 20: "学校区域开始", 21: "学校区域结束", 22: "减速带", 23: "LPG加气站", 24: "隧道区间", 25: "服务区", 26: "ETC计费拍照", 27: "多雾路段", 28: "危险品区域", 29: "事故多发路段", 30: "急弯路段", 31: "急弯区段1", 32: "陡坡路段", 33: "野生动物出没路段", 34: "右侧视野不良点", 35: "视野不良点", 36: "左侧视野不良点", 37: "闯红灯多发", 38: "超速多发", 39: "交通拥堵区域", 40: "按方向选择车道点", 41: "行人乱穿马路多发处", 42: "应急车道事故多发", 43: "超速事故多发", 44: "疲劳驾驶事故多发", 45: "事故多发点", 46: "行人事故多发点", 47: "车辆盗窃多发点", 48: "落石危险路段", 49: "路面结冰危险", 50: "瓶颈路段", 51: "汇入道路", 52: "坠落危险路段", 53: "地下车道区间", 54: "居民区(交通缓和)", 55: "立交", 56: "分岔点", 57: "服务区(可加气)", 58: "桥梁", 59: "制动故障事故多发点", 60: "越线事故多发点", 61: "违法通行事故多发点", 62: "目的地在对面", 63: "瞌睡停车区", 64: "老旧柴油车管制", 65: "隧道内变道拍照", 66: "", } sdi_map = sdi_en if self.lang == "ko": sdi_map = sdi_ko elif self.lang == "zh": sdi_map = sdi_zh return sdi_map.get(nSdiType, "") def _update_sdi(self): #sdiBlockType # 1: startOSEPS: 구간단속시작 # 2: inOSEPS: 구간단속중 # 3: endOSEPS: 구간단속종료 # 0:감속안함,1:과속카메라,2:+사고방지턱,3:+이동식카메라 if self.nSdiType in [0,1,2,3,4,7,8, 75, 76] and self.nSdiSpeedLimit > 0 and self.autoNaviSpeedCtrlMode > 0: self.xSpdLimit = self.nSdiSpeedLimit * self.autoNaviSpeedSafetyFactor self.xSpdDist = self.nSdiDist self.xSpdType = self.nSdiType if self.nSdiBlockType in [2,3]: self.xSpdDist = self.nSdiBlockDist self.xSpdType = 4 elif self.nSdiType == 7 and self.autoNaviSpeedCtrlMode < 3: #이동식카메라 self.xSpdLimit = self.xSpdDist = 0 elif (self.nSdiPlusType == 22 or self.nSdiType == 22) and self.roadcate > 1 and self.autoNaviSpeedCtrlMode >= 2: # speed bump, roadcate:0,1: highway self.xSpdLimit = self.autoNaviSpeedBumpSpeed self.xSpdDist = self.nSdiPlusDist if self.nSdiPlusType == 22 else self.nSdiDist self.xSpdType = 22 else: self.xSpdLimit = 0 self.xSpdType = -1 self.xSpdDist = 0 def _update_gps(self, v_ego, sm, gps_service): gps = sm[gps_service] #print(f"location = {sm.valid[llk]}, {sm.updated[llk]}, {sm.recv_frame[llk]}, {sm.recv_time[llk]}") if not sm.updated['carState'] or not sm.updated['carControl']: # or not sm.updated[llk]: return self.nPosAngle CS = sm['carState'] CC = sm['carControl'] self.gps_valid = sm.updated[gps_service] and gps.hasFix now = time.monotonic() gps_updated_phone = (now - self.last_update_gps_time_phone) < 3 gps_updated_navi = (now - self.last_update_gps_time_navi) < 3 bearing = self.nPosAngle if gps_updated_phone: self.bearing_offset = 0.0 elif self.gps_valid: bearing = self.nPosAngle = gps.bearingDeg if self.gps_valid: self.bearing_offset = 0.0 elif self.active_carrot > 0: bearing = self.nPosAnglePhone self.bearing_offset = 0.0 #print(f"bearing = {bearing:.1f}, posA=={self.nPosAngle:.1f}, posP=={self.nPosAnglePhone:.1f}, offset={self.bearing_offset:.1f}, {gps_updated_phone}, {gps_updated_navi}") gpsDelayTimeAdjust = 0.0 if gps_updated_navi: gpsDelayTimeAdjust = 0 #1.0 external_gps_update_timedout = not (gps_updated_phone or gps_updated_navi) #print(f"gps_valid = {self.gps_valid}, bearing = {bearing:.1f}, pos = {location.positionGeodetic.value[0]:.6f}, {location.positionGeodetic.value[1]:.6f}") if self.gps_valid and external_gps_update_timedout: # 내부GPS가 자동하고 carrotman으로부터 gps신호가 없는경우 self.vpPosPointLatNavi = gps.latitude self.vpPosPointLonNavi = gps.longitude self.last_calculate_gps_time = now #sm.recv_time[llk] elif gps_updated_navi: # carrot navi로부터 gps신호가 수신되는 경우.. if abs(self.bearing_measured - bearing) < 0.1: self.diff_angle_count += 1 else: self.diff_angle_count = 0 self.bearing_measured = bearing if self.diff_angle_count > 5: # 조향각도변화가 거의 없을때만 업데이트 diff_angle = (self.nPosAngle - bearing) % 360 if diff_angle > 180: diff_angle -= 360 self.bearing_offset = self.bearing_offset * 0.9 + diff_angle * 0.1 bearing_calculated = (bearing + self.bearing_offset) % 360 dt = now - self.last_calculate_gps_time #print(f"dt = {dt:.1f}, {self.vpPosPointLatNavi}, {self.vpPosPointLonNavi}") if dt > 5.0: self.vpPosPointLat, self.vpPosPointLon = 0.0, 0.0 elif dt == 0: self.vpPosPointLat, self.vpPosPointLon = self.vpPosPointLatNavi, self.vpPosPointLonNavi else: self.vpPosPointLat, self.vpPosPointLon = self.estimate_position(float(self.vpPosPointLatNavi), float(self.vpPosPointLonNavi), v_ego, bearing_calculated, dt + gpsDelayTimeAdjust) #self.debugText = " {} {:.1f},{:.1f}={:.1f}+{:.1f}".format(self.active_sdi_count, self.nPosAngle, bearing_calculated, bearing, self.bearing_offset) #print("nPosAngle = {:.1f},{:.1f} = {:.1f}+{:.1f}".format(self.nPosAngle, bearing_calculated, bearing, self.bearing_offset)) return float(bearing_calculated) def estimate_position(self, lat, lon, speed, angle, dt): R = 6371000 angle_rad = math.radians(angle) delta_d = speed * dt delta_lat = delta_d * math.cos(angle_rad) / R new_lat = lat + math.degrees(delta_lat) delta_lon = delta_d * math.sin(angle_rad) / (R * math.cos(math.radians(lat))) new_lon = lon + math.degrees(delta_lon) return new_lat, new_lon def update_auto_turn(self, v_ego_kph, sm, x_turn_info, x_dist_to_turn, check_steer=False): turn_speed = self.autoTurnControlSpeedTurn fork_speed = self.nRoadLimitSpeed stop_speed = 1 turn_dist_for_speed = self.autoTurnControlTurnEnd * turn_speed / 3.6 # 5 fork_dist_for_speed = self.autoTurnControlTurnEnd * fork_speed / 3.6 # 5 stop_dist_for_speed = 5 start_fork_dist = np.interp(self.nRoadLimitSpeed, [30, 50, 100], [160, 200, 350]) start_turn_dist = np.interp(self.nTBTNextRoadWidth, [5, 10], [43, 60]) turn_info_mapping = { 1: {"type": "turn left", "speed": turn_speed, "dist": turn_dist_for_speed, "start": start_fork_dist}, 2: {"type": "turn right", "speed": turn_speed, "dist": turn_dist_for_speed, "start": start_fork_dist}, 5: {"type": "straight", "speed": turn_speed, "dist": turn_dist_for_speed, "start": start_turn_dist}, 3: {"type": "fork left", "speed": fork_speed, "dist": fork_dist_for_speed, "start": start_fork_dist}, 4: {"type": "fork right", "speed": fork_speed, "dist": fork_dist_for_speed, "start": start_fork_dist}, 6: {"type": "straight", "speed": fork_speed, "dist": fork_dist_for_speed, "start": start_fork_dist}, 7: {"type": "straight", "speed": stop_speed, "dist": stop_dist_for_speed, "start": 1000}, 8: {"type": "straight", "speed": stop_speed, "dist": stop_dist_for_speed, "start": 1000}, } default_mapping = {"type": "none", "speed": 0, "dist": 0, "start": 1000} mapping = turn_info_mapping.get(x_turn_info, default_mapping) atc_type = mapping["type"] atc_speed = mapping["speed"] atc_dist = mapping["dist"] atc_start_dist = mapping["start"] if x_dist_to_turn > atc_start_dist: atc_type += " prepare" if check_steer: self.atc_activate_count = min(0, self.atc_activate_count - 1) else: if check_steer: self.atc_activate_count = max(0, self.atc_activate_count + 1) if atc_type in ["turn left", "turn right"] and x_dist_to_turn > start_turn_dist: atc_type = "atc left" if atc_type == "turn left" else "atc right" if self.autoTurnMapChange > 0 and check_steer: #print(f"x_dist_to_turn: {x_dist_to_turn}, atc_start_dist: {atc_start_dist}") #print(f"atc_activate_count: {self.atc_activate_count}") if self.atc_activate_count == 2: self.carrotCmdIndex += 100 self.carrotCmd = "DISPLAY"; self.carrotArg = "MAP"; elif self.atc_activate_count == -50: self.carrotCmdIndex += 100 self.carrotCmd = "DISPLAY"; self.carrotArg = "ROAD"; if check_steer: if 0 <= x_dist_to_turn < atc_start_dist and atc_type in ["fork left", "fork right"]: if not self.atc_paused: steering_pressed = sm["carState"].steeringPressed steering_torque = sm["carState"].steeringTorque if steering_pressed and steering_torque < 0 and atc_type in ["fork left", "atc left"]: self.atc_paused = True elif steering_pressed and steering_torque > 0 and atc_type in ["fork right", "atc right"]: self.atc_paused = True else: self.atc_paused = False if self.atc_paused: atc_type += " canceled" atc_desired = 250 if atc_speed > 0 and x_dist_to_turn > 0: decel = self.autoNaviSpeedDecelRate safe_sec = 2.0 atc_desired = min(atc_desired, self.calculate_current_speed(x_dist_to_turn - atc_dist, atc_speed, safe_sec, decel)) return atc_desired, atc_type, atc_speed, atc_dist def update_nav_instruction(self, sm): if sm.alive['navInstruction'] and sm.valid['navInstruction']: msg_nav = sm['navInstruction'] self.nGoPosDist = int(msg_nav.distanceRemaining) self.nGoPosTime = int(msg_nav.timeRemaining) if self.active_kisa_count <= 0 and msg_nav.speedLimit > 0: self.nRoadLimitSpeed = max(30, round(msg_nav.speedLimit * CV.MS_TO_KPH)) self.xDistToTurn = int(msg_nav.maneuverDistance) self.szTBTMainText = msg_nav.maneuverPrimaryText self.xTurnInfo = -1 for key, value in nav_type_mapping.items(): if value[0] == msg_nav.maneuverType and value[1] == msg_nav.maneuverModifier: self.xTurnInfo = value[2] break self.debugText = f"{self.nRoadLimitSpeed if self.is_metric else self.nRoadLimitSpeed * CV.KPH_TO_MPH:.0f},{msg_nav.maneuverType},{msg_nav.maneuverModifier} " #print(msg_nav) #print(f"navInstruction: {self.xTurnInfo}, {self.xDistToTurn}, {self.szTBTMainText}") def update_kisa(self, data): self.active_kisa_count = 100 if "kisawazecurrentspd" in data: pass if "kisawazeroadspdlimit" in data: road_limit_speed = data["kisawazeroadspdlimit"] if road_limit_speed > 0: print(f"kisawazeroadspdlimit: {road_limit_speed} km/h") if not self.is_metric: road_limit_speed *= CV.MPH_TO_KPH self.nRoadLimitSpeed = road_limit_speed if "kisawazealert" in data: pass if "kisawazeendalert" in data: pass if "kisawazeroadname" in data: print(f"kisawazeroadname: {data['kisawazeroadname']}") self.szPosRoadName = data["kisawazeroadname"] if "kisawazereportid" in data and "kisawazealertdist" in data: id_str = data["kisawazereportid"] dist_str = data["kisawazealertdist"].lower() import re match = re.search(r'(\d+)', dist_str) distance = int(match.group(1)) if match else 0 if not self.is_metric: distance = int(distance * 0.3048) print(f"{id_str}: {distance} m") xSpdType = -1 if 'camera' in id_str: xSpdType = 101 # 101: waze speed cam, 100: police elif 'police' in id_str: xSpdType = 100 if xSpdType >= 0: offset = 5 if self.is_metric else 5 * CV.MPH_TO_KPH self.xSpdLimit = self.nRoadLimitSpeed + offset self.xSpdDist = distance self.xSpdType =xSpdType def update_navi(self, remote_ip, sm, pm, vturn_speed, coords, distances, route_speed, gps_service): self.debugText = "" self.update_params() if sm.alive['carState'] and sm.alive['selfdriveState']: CS = sm['carState'] v_ego = CS.vEgo v_ego_kph = v_ego * 3.6 distanceTraveled = sm['selfdriveState'].distanceTraveled delta_dist = distanceTraveled - self.totalDistance self.totalDistance = distanceTraveled if CS.speedLimit > 0 and self.active_carrot <= 1: self.nRoadLimitSpeed = CS.speedLimit else: v_ego = v_ego_kph = 0 delta_dist = 0 CS = None road_speed_limit_changed = True if self.nRoadLimitSpeed != self.nRoadLimitSpeed_last else False self.nRoadLimitSpeed_last = self.nRoadLimitSpeed #self.bearing = self.nPosAngle #self._update_gps(v_ego, sm) self.bearing = self._update_gps(v_ego, sm, gps_service) self.xSpdDist = max(self.xSpdDist - delta_dist, -1000) self.xDistToTurn = self.xDistToTurn - delta_dist self.xDistToTurnNext = self.xDistToTurnNext - delta_dist self.active_count = max(self.active_count - 1, 0) self.active_sdi_count = max(self.active_sdi_count - 1, 0) self.active_kisa_count = max(self.active_kisa_count - 1, 0) if self.active_kisa_count > 0: self.active_carrot = 2 elif self.active_count > 0: self.active_carrot = 2 if self.active_sdi_count > 0 else 1 else: self.active_carrot = 0 if self.autoRoadSpeedLimitOffset >= 0 and self.active_carrot>=2: if self.nRoadLimitSpeed >= 30: road_speed_limit_offset = self.autoRoadSpeedLimitOffset if not self.is_metric: road_speed_limit_offset *= CV.KPH_TO_MPH limit_speed = self.nRoadLimitSpeed + road_speed_limit_offset else: limit_speed = 200 if self.active_carrot <= 1: self.xSpdType = self.navType = self.xTurnInfo = self.xTurnInfoNext = -1 self.nSdiType = self.nSdiBlockType = self.nSdiPlusBlockType = -1 self.nTBTTurnType = self.nTBTTurnTypeNext = -1 self.roadcate = 8 self.nGoPosDist = 0 if self.active_carrot <= 1 or self.active_kisa_count > 0: self.update_nav_instruction(sm) if self.xSpdType < 0 or (self.xSpdType not in [100,101] and self.xSpdDist <= 0) or (self.xSpdType in [100,101] and self.xSpdDist < -250): self.xSpdType = -1 self.xSpdDist = self.xSpdLimit = 0 if self.xTurnInfo < 0 or self.xDistToTurn < -50: if self.xDistToTurn > 0: self.xDistToTurn = 0 self.xTurnInfo = -1 self.xDistToTurnNext = 0 self.xTurnInfoNext = -1 sdi_speed = 250 hda_active = False ### 과속카메라, 사고방지턱 if (self.xSpdDist > 0 or self.xSpdType in [100, 101]) and self.active_carrot > 0: safe_sec = self.autoNaviSpeedBumpTime if self.xSpdType == 22 else self.autoNaviSpeedCtrlEnd decel = self.autoNaviSpeedDecelRate sdi_speed = min(sdi_speed, self.calculate_current_speed(self.xSpdDist, self.xSpdLimit, safe_sec, decel)) self.active_carrot = 5 if self.xSpdType == 22 else 3 if self.xSpdType == 4 or (self.xSpdType in [100, 101] and self.xSpdDist <= 0): sdi_speed = self.xSpdLimit self.active_carrot = 4 elif CS is not None and CS.speedLimit > 0 and CS.speedLimitDistance > 0: sdi_speed = min(sdi_speed, self.calculate_current_speed(CS.speedLimitDistance, CS.speedLimit * self.autoNaviSpeedSafetyFactor, self.autoNaviSpeedCtrlEnd, self.autoNaviSpeedDecelRate)) #self.active_carrot = 6 hda_active = True #print(f"sdi_speed: {sdi_speed}, hda_active: {hda_active}, xSpdType: {self.xSpdType}, xSpdDist: {self.xSpdDist}, active_carrot: {self.active_carrot}, v_ego_kph: {v_ego_kph}, nRoadLimitSpeed: {self.nRoadLimitSpeed}") ### TBT 속도제어 atc_desired, self.atcType, self.atcSpeed, self.atcDist = self.update_auto_turn(v_ego*3.6, sm, self.xTurnInfo, self.xDistToTurn, True) atc_desired_next, _, _, _ = self.update_auto_turn(v_ego*3.6, sm, self.xTurnInfoNext, self.xDistToTurnNext, False) if self.nSdiType >= 0: # or self.active_carrot > 0: pass # self.debugText = ( # f"Atc:{atc_desired:.1f}," + # f"{self.xTurnInfo}:{self.xDistToTurn:.1f}, " + # f"I({self.nTBTNextRoadWidth},{self.roadcate}) " + # f"Atc2:{atc_desired_next:.1f}," + # f"{self.xTurnInfoNext},{self.xDistToTurnNext:.1f}" # ) #self.debugText = "" #f" {self.nSdiType}/{self.nSdiSpeedLimit}/{self.nSdiDist},BLOCK:{self.nSdiBlockType}/{self.nSdiBlockSpeed}/{self.nSdiBlockDist}, PLUS:{self.nSdiPlusType}/{self.nSdiPlusSpeedLimit}/{self.nSdiPlusDist}" #elif self.nGoPosDist > 0 and self.active_carrot > 1: # self.debugText = " 목적지:{:.1f}km/{:.1f}분 남음".format(self.nGoPosDist/1000., self.nGoPosTime / 60) else: #self.debugText = "" pass if self.autoTurnControl not in [2, 3]: # auto turn speed control atc_desired = atc_desired_next = 250 if self.autoTurnControl not in [1,2]: # auto turn control self.atcType = "none" speed_n_sources = [ (atc_desired, "atc"), (atc_desired_next, "atc2"), (sdi_speed, "hda" if hda_active else "bump" if self.xSpdType == 22 else "section" if self.xSpdType == 4 else "police" if self.xSpdType == 100 else "waze" if self.xSpdType == 101 else "cam"), (limit_speed, "road"), ] if self.turnSpeedControlMode in [1,2]: speed_n_sources.append((max(abs(vturn_speed), self.autoCurveSpeedLowerLimit), "vturn")) route_speed = max(route_speed * self.mapTurnSpeedFactor, self.autoCurveSpeedLowerLimit) if self.turnSpeedControlMode == 2: if -500 < self.xDistToTurn < 500: speed_n_sources.append((route_speed, "route")) elif self.turnSpeedControlMode in [3, 4]: speed_n_sources.append((route_speed, "route")) #speed_n_sources.append((self.calculate_current_speed(dist, speed * self.mapTurnSpeedFactor, 0, 1.2), "route")) model_turn_speed = max(sm['modelV2'].meta.modelTurnSpeed, self.autoCurveSpeedLowerLimit) if model_turn_speed < 200 and abs(vturn_speed) < 120: speed_n_sources.append((model_turn_speed, "model")) desired_speed, source = min(speed_n_sources, key=lambda x: x[0]) if CS is not None: if source != self.source_last: self.gas_override_speed = 0 self.gas_pressed_state = CS.gasPressed if CS.vEgo < 0.1 or desired_speed > 150 or source in ["cam", "section", "police"] or CS.brakePressed or road_speed_limit_changed: self.gas_override_speed = 0 elif CS.gasPressed and not self.gas_pressed_state: self.gas_override_speed = max(v_ego_kph, self.gas_override_speed) else: self.gas_pressed_state = False self.source_last = source if desired_speed < self.gas_override_speed: source = "gas" desired_speed = self.gas_override_speed self.debugText += f"route={route_speed:.1f}"#f"desired={desired_speed:.1f},{source},g={self.gas_override_speed:.0f}" left_spd_sec = 100 left_tbt_sec = 100 if self.autoNaviCountDownMode > 0: if self.xSpdType == 22 and self.autoNaviCountDownMode == 1: # speed bump pass else: if self.xSpdDist > 0: left_spd_sec = min(self.left_spd_sec, int(max(self.xSpdDist - v_ego, 1) / max(1, v_ego) + 0.5)) if self.xDistToTurn > 0: left_tbt_sec = min(self.left_tbt_sec, int(max(self.xDistToTurn - v_ego, 1) / max(1, v_ego) + 0.5)) self.left_spd_sec = left_spd_sec self.left_tbt_sec = left_tbt_sec left_sec = min(left_spd_sec, left_tbt_sec) if left_sec > 11: self.left_sec = 100 self.max_left_sec = 100 else: self.sdi_inform = True if source in ["cam", "hda"] else False self.max_left_sec = min(11, max(6, int(v_ego_kph/10) + 1)) if left_sec != self.left_sec: if left_sec == self.max_left_sec and self.sdi_inform: self.carrot_left_sec = 11 elif 1 <= left_sec < self.max_left_sec: self.carrot_left_sec = left_sec elif left_sec == 0 and self.left_sec == 1: self.carrot_left_sec = left_sec self.left_sec = left_sec self._update_cmd() msg = messaging.new_message('carrotMan') msg.valid = True msg.carrotMan.activeCarrot = self.active_carrot msg.carrotMan.nRoadLimitSpeed = int(self.nRoadLimitSpeed) msg.carrotMan.remote = remote_ip msg.carrotMan.xSpdType = int(self.xSpdType) msg.carrotMan.xSpdLimit = int(self.xSpdLimit) msg.carrotMan.xSpdDist = int(self.xSpdDist) msg.carrotMan.xSpdCountDown = int(left_spd_sec) msg.carrotMan.xTurnInfo = int(self.xTurnInfo) msg.carrotMan.xDistToTurn = int(self.xDistToTurn) msg.carrotMan.xTurnCountDown = int(left_tbt_sec) msg.carrotMan.atcType = self.atcType msg.carrotMan.vTurnSpeed = int(vturn_speed) msg.carrotMan.szPosRoadName = self.szPosRoadName + self.debugText msg.carrotMan.szTBTMainText = self.szTBTMainText msg.carrotMan.desiredSpeed = int(desired_speed) msg.carrotMan.desiredSource = source msg.carrotMan.carrotCmdIndex = int(self.carrotCmdIndex) msg.carrotMan.carrotCmd = self.carrotCmd msg.carrotMan.carrotArg = self.carrotArg msg.carrotMan.trafficState = self.traffic_state msg.carrotMan.xPosSpeed = float(v_ego_kph) #float(self.nPosSpeed) msg.carrotMan.xPosAngle = float(self.bearing) msg.carrotMan.xPosLat = float(self.vpPosPointLat) msg.carrotMan.xPosLon = float(self.vpPosPointLon) msg.carrotMan.nGoPosDist = self.nGoPosDist msg.carrotMan.nGoPosTime = self.nGoPosTime msg.carrotMan.szSdiDescr = self._get_sdi_descr(-1 if self.nSdiType == 0 and self.nSdiDist == 0 else self.nSdiType) #coords_str = ";".join([f"{x},{y}" for x, y in coords]) coords_str = ";".join([f"{x:.2f},{y:.2f},{d:.2f}" for (x, y), d in zip(coords, distances, strict=False)]) msg.carrotMan.naviPaths = coords_str msg.carrotMan.leftSec = int(self.carrot_left_sec) pm.send('carrotMan', msg) inst = messaging.new_message('navInstructionCarrot') if self.active_carrot > 1 and self.active_kisa_count <= 0: inst.valid = True instruction = inst.navInstructionCarrot instruction.distanceRemaining = self.nGoPosDist instruction.timeRemaining = self.nGoPosTime instruction.speedLimit = self.nRoadLimitSpeed / 3.6 if self.nRoadLimitSpeed > 0 else 0 instruction.maneuverDistance = float(self.nTBTDist) instruction.maneuverSecondaryText = self.szNearDirName if self.szFarDirName and len(self.szFarDirName): instruction.maneuverSecondaryText += "[{}]".format(self.szFarDirName) instruction.maneuverPrimaryText = self.szTBTMainText instruction.timeRemainingTypical = self.nGoPosTime navType, navModifier, xTurnInfo1 = "invalid", "", -1 if self.nTBTTurnType in nav_type_mapping: navType, navModifier, xTurnInfo1 = nav_type_mapping[self.nTBTTurnType] navTypeNext, navModifierNext, xTurnInfoNext = "invalid", "", -1 if self.nTBTTurnTypeNext in nav_type_mapping: navTypeNext, navModifierNext, xTurnInfoNext = nav_type_mapping[self.nTBTTurnTypeNext] instruction.maneuverType = navType instruction.maneuverModifier = navModifier maneuvers = [] if self.nTBTTurnType >= 0: maneuver = {} maneuver['distance'] = float(self.xDistToTurn) maneuver['type'] = navType maneuver['modifier'] = navModifier maneuvers.append(maneuver) if self.nTBTDistNext >= self.nTBTDist: maneuver = {} maneuver['distance'] = float(self.nTBTDistNext) maneuver['type'] = navTypeNext maneuver['modifier'] = navModifierNext maneuvers.append(maneuver) instruction.allManeuvers = maneuvers elif sm.alive['navInstruction'] and sm.valid['navInstruction']: inst.navInstructionCarrot = sm['navInstruction'] pm.send('navInstructionCarrot', inst) def _update_system_time(self, epoch_time_remote, timezone_remote): epoch_time = int(time.time()) if epoch_time_remote > 0: epoch_time_offset = epoch_time_remote - epoch_time print(f"epoch_time_offset = {epoch_time_offset}") if abs(epoch_time_offset) > 60: os.system(f"sudo timedatectl set-timezone {timezone_remote}") formatted_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch_time_remote)) print(f"Setting system time to: {formatted_time}") os.system(f'sudo date -s "{formatted_time}"') def set_time(self, epoch_time, timezone): import datetime new_time = datetime.datetime.utcfromtimestamp(epoch_time) localtime_path = "/data/etc/localtime" no_timezone = False try: if os.path.getsize(localtime_path) == 0: no_timezone = True except (FileNotFoundError, OSError): no_timezone = True diff = datetime.datetime.utcnow() - new_time if abs(diff) < datetime.timedelta(seconds=10) and not no_timezone: #print(f"Time diff too small: {diff}") return print(f"Setting time to {new_time}, diff={diff}") zoneinfo_path = f"/usr/share/zoneinfo/{timezone}" if os.path.exists(localtime_path) or os.path.islink(localtime_path): try: subprocess.run(["sudo", "rm", "-f", localtime_path], check=True) print(f"Removed existing file or link: {localtime_path}") except subprocess.CalledProcessError as e: print(f"Error removing {localtime_path}: {e}") return try: subprocess.run(["sudo", "ln", "-s", zoneinfo_path, localtime_path], check=True) print(f"Timezone successfully set to: {timezone}") except subprocess.CalledProcessError as e: print(f"Failed to set timezone to {timezone}: {e}") try: subprocess.run(f"TZ=UTC date -s '{new_time}'", shell=True, check=True) #subprocess.run() except subprocess.CalledProcessError: print("timed.failed_setting_time") def update(self, json): if json is None: return if "carrotIndex" in json: self.carrotIndex = int(json.get("carrotIndex")) #print(json) if self.carrotIndex % 60 == 0 and "epochTime" in json: # op는 ntp를 사용하기때문에... 필요없는 루틴으로 보임. timezone_remote = json.get("timezone", "Asia/Seoul") if not PC: self.set_time(int(json.get("epochTime")), timezone_remote) #self._update_system_time(int(json.get("epochTime")), timezone_remote) if "carrotCmd" in json: #print(json.get("carrotCmd"), json.get("carrotArg")) self.carrotCmdIndex = self.carrotIndex self.carrotCmd = json.get("carrotCmd") self.carrotArg = json.get("carrotArg") print(f"carrotCmd = {self.carrotCmd}, {self.carrotArg}") self.active_count = 80 now = time.monotonic() if "goalPosX" in json: self.goalPosX = float(json.get("goalPosX", self.goalPosX)) self.goalPosY = float(json.get("goalPosY", self.goalPosY)) self.szGoalName = json.get("szGoalName", self.szGoalName) if "nRoadLimitSpeed" in json: #print(json) self.active_sdi_count = self.active_sdi_count_max ### roadLimitSpeed nRoadLimitSpeed = int(json.get("nRoadLimitSpeed", 20)) if nRoadLimitSpeed > 0: if nRoadLimitSpeed > 200: nRoadLimitSpeed = (nRoadLimitSpeed - 20) / 10 elif nRoadLimitSpeed == 120: nRoadLimitSpeed = 115 # 120 -> 115 fix bug else: nRoadLimitSpeed = 30 #self.nRoadLimitSpeed = nRoadLimitSpeed if self.nRoadLimitSpeed != nRoadLimitSpeed: self.nRoadLimitSpeed_counter += 1 if self.nRoadLimitSpeed_counter > 5: self.nRoadLimitSpeed = nRoadLimitSpeed else: self.nRoadLimitSpeed_counter = 0 ### SDI self.nSdiType = int(json.get("nSdiType", -1)) self.nSdiSpeedLimit = int(json.get("nSdiSpeedLimit", 0)) self.nSdiSection = int(json.get("nSdiSection", -1)) self.nSdiDist = int(json.get("nSdiDist", -1)) self.nSdiBlockType = int(json.get("nSdiBlockType", -1)) self.nSdiBlockSpeed = int(json.get("nSdiBlockSpeed", 0)) self.nSdiBlockDist = int(json.get("nSdiBlockDist", 0)) self.nSdiPlusType = int(json.get("nSdiPlusType", -1)) self.nSdiPlusSpeedLimit = int(json.get("nSdiPlusSpeedLimit", 0)) self.nSdiPlusDist = int(json.get("nSdiPlusDist", 0)) self.nSdiPlusBlockType = int(json.get("nSdiPlusBlockType", -1)) self.nSdiPlusBlockSpeed = int(json.get("nSdiPlusBlockSpeed", 0)) self.nSdiPlusBlockDist = int(json.get("nSdiPlusBlockDist", 0)) self.roadcate = int(json.get("roadcate", 0)) ## GuidePoint self.nTBTDist = int(json.get("nTBTDist", 0)) self.nTBTTurnType = int(json.get("nTBTTurnType", -1)) self.szTBTMainText = json.get("szTBTMainText", "") self.szNearDirName = json.get("szNearDirName", "") self.szFarDirName = json.get("szFarDirName", "") self.nTBTNextRoadWidth = int(json.get("nTBTNextRoadWidth", 0)) self.nTBTDistNext = int(json.get("nTBTDistNext", 0)) self.nTBTTurnTypeNext = int(json.get("nTBTTurnTypeNext", -1)) self.szTBTMainTextNext = json.get("szTBTMainText", "") self.nGoPosDist = int(json.get("nGoPosDist", 0)) self.nGoPosTime = int(json.get("nGoPosTime", 0)) self.szPosRoadName = json.get("szPosRoadName", "") if self.szPosRoadName == "null": self.szPosRoadName = "" self.vpPosPointLatNavi = float(json.get("vpPosPointLat", 0.0)) self.vpPosPointLonNavi = float(json.get("vpPosPointLon", 0.0)) if self.vpPosPointLatNavi != 0.0: self.last_update_gps_time_navi = self.last_calculate_gps_time = now self.nPosAngle = float(json.get("nPosAngle", self.nPosAngle)) self.nPosSpeed = float(json.get("nPosSpeed", self.nPosSpeed)) self._update_tbt() self._update_sdi() print( f"sdi = {self.nSdiType}, {self.nSdiSpeedLimit}, {self.nSdiPlusType}, " + f"tbt = {self.nTBTTurnType}, {self.nTBTDist}, " + f"next = {self.nTBTTurnTypeNext}, {self.nTBTDistNext}" ) #print(json) else: #print(json) pass # 3초간 navi 데이터가 없으면, phone gps로 업데이트 if "latitude" in json: self.nPosAnglePhone = float(json.get("heading", self.nPosAngle)) self.phone_latitude = float(json.get("latitude", self.vpPosPointLatNavi)) self.phone_longitude = float(json.get("longitude", self.vpPosPointLonNavi)) self.phone_gps_accuracy = float(json.get("accuracy", 0)) if self.phone_gps_accuracy < 15.0: self.phone_gps_frame += 1 if (now - self.last_update_gps_time_navi) > 3.0: self.vpPosPointLatNavi = self.phone_latitude self.vpPosPointLonNavi = self.phone_longitude self.nPosAngle = self.nPosAnglePhone # self.nPosSpeed = self.ve # TODO speed from v_ego self.last_update_gps_time_phone = self.last_calculate_gps_time = now self.nPosSpeed = float(json.get("gps_speed", 0)) print(f"phone gps: {self.vpPosPointLatNavi}, {self.vpPosPointLonNavi}, {self.phone_gps_accuracy}, {self.nPosSpeed}") import traceback def main(): print("CarrotManager Started") #print("Carrot GitBranch = {}, {}".format(Params().get("GitBranch"), Params().get("GitCommitDate"))) # 延迟导入,避免与 carrot_man 中导入 CarrotServ 的循环依赖 from openpilot.selfdrive.carrot.carrot_man import CarrotMan carrot_man = CarrotMan() print(f"CarrotMan {carrot_man}") threading.Thread(target=carrot_man.kisa_app_thread).start() while True: try: carrot_man.carrot_man_thread() except Exception as e: print(f"carrot_man error...: {e}") traceback.print_exc() time.sleep(10) if __name__ == "__main__": main()