직접 주행
Pure Pursuit
자동차나 로봇이 경로 상의 한 지점(Look-ahead Point)을 바라보며 부드러운 곡선을 그리며 따라가게 하는 기법
실습 과제 : 목적지 바꿔가며 주행하기
`self.path` 리스트 내에 정의된 목적지 좌표를 파라미터를 통해 전달받을 수 있도록 위 노드를 수정
파라미터 전달할 때마다 바뀐 목적지를 향해 주행할 수 있도록 하는 것이 핵심
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseWithCovarianceStamped
from math import pow, atan2, sqrt, sin, cos, pi
from rcl_interfaces.msg import SetParametersResult
class PurePursuit(Node):
def __init__(self):
super().__init__('pure_pursuit_param_node')
# added part 1 : declare parameter
self.declare_parameter('goal_x', 0.0)
self.declare_parameter('goal_y', 0.0)
self.add_on_set_parameters_callback(self.parameter_callback)
# added part 2 : initialize variable via parameter
self.goal_x = self.get_parameter('goal_x').value
self.goal_y = self.get_parameter('goal_y').value
self.lookahead_distance = 0.5
self.linear_velocity = 0.2
self.goal_tolerance = 0.2
self.path = [
[self.goal_x, self.goal_y]
]
self.current_waypoint_index = 0
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
self.subscription_ = self.create_subscription(
PoseWithCovarianceStamped,
'/amcl_pose',
self.pose_callback,
10
)
self.current_x = 0.0
self.current_y = 0.0
self.current_yaw = 0.0
self.is_localized = False
self.timer = self.create_timer(0.5, self.control_loop)
self.get_logger().info("Pure Pursuit Node Started! Waiting for AMCL pose...")
# added part 3 : callback when parameter set
def parameter_callback(self, params):
for param in params:
if param.name == 'goal_x':
self.goal_x = param.value
self.get_logger().info(f'goal_x updated: {self.goal_x}')
if param.name == 'goal_y':
self.goal_y = param.value
self.get_logger().info(f'goal_y updated: {self.goal_y}')
self.path = [
[self.goal_x, self.goal_y]
]
self.current_waypoint_index = 0
return SetParametersResult(successful=True)
def pose_callback(self, msg):
self.get_logger().info(f"pose callback : {type(msg)}")
self.current_x = msg.pose.pose.position.x
self.current_y = msg.pose.pose.position.y
# formula to get current yaw (current yaw? angular.z degree!)
q = msg.pose.pose.orientation
self.current_yaw = atan2(
2.0 * (q.w * q.z + q.x * q.y),
1.0 - 2.0 * (q.y * q.y + q.z * q.z)
)
self.is_localized = True
def control_loop(self):
if not self.is_localized:
return
if self.current_waypoint_index >= len(self.path):
self.stop_robot()
return
goal_x = self.path[self.current_waypoint_index][0]
goal_y = self.path[self.current_waypoint_index][1]
dx = goal_x - self.current_x
dy = goal_y - self.current_y
distance = sqrt(pow(dx, 2) + pow(dy, 2))
if distance < self.goal_tolerance:
self.get_logger().info(f"Waypoint {self.current_waypoint_index} Reached!")
self.current_waypoint_index += 1
return
target_angle = atan2(dy, dx)
alpha = target_angle - self.current_yaw
if alpha > pi:
alpha -= 2 * pi
elif alpha < -pi:
alpha += 2 * pi
angular_velocity = self.linear_velocity * (2.0 * sin(alpha)) / distance
cmd = Twist()
cmd.linear.x = self.linear_velocity
cmd.angular.z = angular_velocity
if cmd.angular.z > 1.0:
cmd.angular.z = 1.0
if cmd.angular.z < -1.0:
cmd.angular.z = -1.0
self.publisher_.publish(cmd)
def stop_robot(self):
cmd = Twist()
cmd.linear.x = 0.0
cmd.angular.z = 0.0
self.publisher_.publish(cmd)
self.get_logger().info("All waypoints completed. Robot Stopped.")
def main(args=None):
rclpy.init(args=args)
node = PurePursuit()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.stop_robot()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
실습 과제 : 위 내용을 실제 로봇에 적용해보기
장애물 피하면서 가는 단순 제어 코드
실습 순서
(0. 맵 새로 생성 -> 위치 추정 런치 파일 실행, 주행 노드 실행, 초기 위치 알려주기)
1. 시뮬레이션 상에서 장애물 피해가는지 확인
2. 확인됐으면, 실제 로봇을 바닥에 간단한 미로를 만들어 주행시켜보기
파라미터 3개는 실험으로 조정
목적지 좌표는 맵 상의 좌표로(1.8, -0.6)
turtlebot3_ws/src/my_pure_pursuit/my_pure_pursuit/pure_pursuit_obstacle.py
# pure_pursuit_obstacle.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy
from geometry_msgs.msg import Twist, PoseWithCovarianceStamped
from sensor_msgs.msg import LaserScan
from math import pow, atan2, sqrt, sin, cos, pi
class PurePursuitLogic(Node):
def __init__(self):
super().__init__('pure_pursuit_obstacle')
# 사용자가 정해야 하는 파라미터 삼형제
self.lookahead_distance = 0.5 # 전방 주시 거리
self.linear_velocity = 0.15 # 이동 속도
self.obs_threshold = 0.45 # 장애물 감지 거리
# 목표 위치 (맵 상의 좌표)
self.path = [
[1.8, -0.6]
]
self.current_waypoint_index = 0
# AMCL pose용 QoS (TRANSIENT_LOCAL 필수)
amcl_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_LAST,
depth=1
)
# LiDAR scan용 QoS (BEST_EFFORT이 일반적)
scan_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
history=HistoryPolicy.KEEP_LAST,
depth=5
)
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
self.pose_sub = self.create_subscription(
PoseWithCovarianceStamped, '/amcl_pose', self.pose_callback, amcl_qos
)
self.scan_sub = self.create_subscription(
LaserScan, '/scan', self.scan_callback, scan_qos
)
self.current_x = 0.0
self.current_y = 0.0
self.current_yaw = 0.0
self.is_localized = False
# initial value.... 99.9 means 'no obstacle'
self.left_dist = 99.9
self.front_dist = 99.9
self.right_dist = 99.9
self.timer = self.create_timer(0.1, self.control_loop)
self.get_logger().info("Let's Go!")
# 위치와 회전 상태를 받으면 그에 대응하여 각도를 계산
def pose_callback(self, msg):
self.current_x = msg.pose.pose.position.x
self.current_y = msg.pose.pose.position.y
q = msg.pose.pose.orientation
self.current_yaw = atan2(
2.0 * (q.w * q.z + q.x * q.y),
1.0 - 2.0 * (q.y * q.y + q.z * q.z)
)
self.is_localized = True
def scan_callback(self, msg):
# 장애물 바라보는 각도
front_ranges = msg.ranges[0:40] + msg.ranges[-40:]
# 좌우 비교를 위한 각도
left_ranges = msg.ranges[20:70]
right_ranges = msg.ranges[-70:-20]
self.front_dist = self.get_min_dist(front_ranges)
self.left_dist = self.get_min_dist(left_ranges)
self.right_dist = self.get_min_dist(right_ranges)
# 바라보는 각도 중에서 가장 가깝게 감지되는 것을 구한다
def get_min_dist(self, range_list):
valid_values = [x for x in range_list if x > 0.05 and x < 10.0]
if valid_values:
return min(valid_values)
return 99.9
# 주기적으로 주행을 계산하는 함수
def control_loop(self):
if not self.is_localized:
return
if self.front_dist < self.obs_threshold:
self.get_logger().warn(f"Obstacle Ahead! ({self.front_dist:.2f}m)")
self.avoid_obstacle()
return
self.follow_path()
# 장애물 회피 로직
def avoid_obstacle(self):
cmd = Twist()
cmd.linear.x = 0.0
# 좌우로 트는 각도에 따라 잘 피할수도 못 피할수도
if self.left_dist > self.right_dist:
cmd.angular.z = 0.7
self.get_logger().info("Turn Left!")
else:
cmd.angular.z = -0.7
self.get_logger().info("Turn Right!")
self.publisher_.publish(cmd)
def follow_path(self):
if self.current_waypoint_index >= len(self.path):
cmd = Twist()
self.publisher_.publish(cmd)
return
goal_x = self.path[self.current_waypoint_index][0]
goal_y = self.path[self.current_waypoint_index][1]
dx = goal_x - self.current_x
dy = goal_y - self.current_y
distance = sqrt(pow(dx, 2) + pow(dy, 2))
if distance < 0.2: # Goal tolerance
self.get_logger().info(f"Waypoint {self.current_waypoint_index} Reached!")
self.current_waypoint_index += 1
return
target_angle = atan2(dy, dx)
alpha = target_angle - self.current_yaw
if alpha > pi:
alpha -= 2 * pi
elif alpha < -pi:
alpha += 2 * pi
cmd = Twist()
cmd.linear.x = self.linear_velocity
cmd.angular.z = self.linear_velocity * (2.0 * sin(alpha)) / distance
if cmd.angular.z > 1.0:
cmd.angular.z = 1.0
if cmd.angular.z < -1.0:
cmd.angular.z = -1.0
self.publisher_.publish(cmd)
def main(args=None):
rclpy.init(args=args)
node = PurePursuitLogic()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
setup.py에 entry point 추가
'console_scripts': [
'pure_pursuit_node = my_pure_pursuit.pure_pursuit_node:main',
'pure_pursuit_obstacle = my_pure_pursuit.pure_pursuit_obstacle:main',
],
[INFO] [1776908512.294925402] [pure_pursuit_obstacle]: Let's Go!
[WARN] [1776908515.076139346] [pure_pursuit_obstacle]: Obstacle Ahead! (0.45m)
[INFO] [1776908515.076616246] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.175811968] [pure_pursuit_obstacle]: Obstacle Ahead! (0.45m)
[INFO] [1776908515.176176068] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.275554189] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.275938689] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.375710511] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.376072111] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.475674332] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.476111832] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.575912653] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.576458854] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.675671475] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.676152375] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.775851796] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.776414195] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.675909324] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908516.676430723] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.776036974] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908516.776422574] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.876043025] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908516.876559325] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.975721876] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908516.976189675] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.075806524] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.076275524] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.175671372] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.176099171] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.276073419] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.276528719] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.375668767] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.376042167] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.475826815] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.476194014] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.576037762] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.576582762] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.675920810] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908517.676313010] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.775760658] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908517.776181157] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.875756205] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908517.876159405] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.975878553] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908517.976380153] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908518.075891598] [pure_pursuit_obstacle]: Obstacle Ahead! (0.43m)
[INFO] [1776908518.076472298] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908518.175782443] [pure_pursuit_obstacle]: Obstacle Ahead! (0.43m)
[INFO] [1776908518.176317643] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.275619220] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908528.276130820] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.375652756] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908528.376089456] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.475729693] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.476088593] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.575735429] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.576127829] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.675712166] [pure_pursuit_obstacle]: Obstacle Ahead! (0.39m)
[INFO] [1776908528.676104666] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.775894902] [pure_pursuit_obstacle]: Obstacle Ahead! (0.39m)
[INFO] [1776908528.776397802] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.875795239] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.876366538] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.975773575] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.976278575] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.075738410] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908529.076099010] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.176102743] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908529.176591443] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.275590577] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908529.275964577] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.375758311] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908529.376210311] [pure_pursuit_obstacle]: Turn Left!
[INFO] [1776908551.375630595] [pure_pursuit_obstacle]: Waypoint 0 Reached!

A* (A-star) 알고리즘을 이용해 주행 노드 작성하기
A*(이하 에이스타)는 시작점에서 목표점까지 가는 비용이 가장 적은 경로를 탐색하는 알고리즘

이를 구현하기 위해서는 (필수까진 아닐 수 있지만) heapq 모듈을 이해할 필요가 있음
파이썬 heapq와 완전 이진 트리 구조
파이썬의 `heapq` 모듈은 이진 힙(Binary Heap) 알고리즘을 구현한 모듈
힙(Heap)은 내부적으로 완전 이진 트리 형태를 유지하며 배열(리스트)로 관리됨
완전 이진 트리란 아래 두 가지 조건을 만족하는 트리
ㄴ마지막 레벨을 제외하고 모든 레벨이 노드로 꽉 차 있어야 함
ㄴ마지막 레벨은 왼쪽부터 오른쪽으로 순서대로 채워져야 함
완전 이진 트리는 빈 공간 없이 데이터가 채워지므로,
복잡한 포인터 없이 리스트(List)의 인덱스만으로 부모-자식 관계를 표현할 수 있음
예를 들어 어떤 힙이 `[1, 3, 5, 7, 9, 8]` 과 같이 채워져 있다면 완전 이진 트리 도식은 다음과 같이 표현
1
/ \
3 5
/ \ /
7 9 8
특정 노드의 인덱스를 `k`라고 할 때, 인덱스 규칙은 다음과 같음
ㄴ왼쪽 자식 인덱스: `2*k + 1`
ㄴ오른쪽 자식 인덱스: `2*k + 2`
ㄴ부모 인덱스: `(k - 1) // 2`
`heapq` 모듈은 부모 노드의 값은 항상 자식 노드의 값보다 작거나 같다는 규칙을 준수함
파이썬의 `heapq` 모듈
별도의 객체를 생성하지 않고, 일반 리스트를 힙처럼 다룸
import heapq
# 이 상태에서는 아직 그냥 리스트임
heap = []
원소 추가
트리의 가장 끝에 원소를 넣고,
부모와 비교하며 자리를 찾아 올라감
heapq.heappush(heap, 4)
heapq.heappush(heap, 1)
heapq.heappush(heap, 7)
heapq.heappush(heap, 3)
print(heap) # [1, 3, 7, 4]
최솟값 삭제 및 반환
가장 작은 값(루트, 인덱스 0)을 제거하여 반환
힙 구조 재정비
min_val = heapq.heappop(heap)
print(min_val) # 1 (가장 작은 값)
print(heap) # [3, 4, 7]
기존 리스트를 힙으로 변환하는 경우 재배열이 진행
data = [4, 1, 7, 3, 8, 5]
heapq.heapify(data)
print(data) # [1, 3, 5, 4, 8, 7]
순수 파이썬으로 구현한 A* 알고리즘 기반 경로 계획
import heapq
# 0은 빈 공간, 1은 막힌 곳으로 표현
grid_map = [
[0, 0, 0, 0, 1, 0],
[1, 1, 1, 0, 1, 0],
[0, 0, 0, 0, 0, 0],
[0, 1, 1, 1, 1, 0],
[0, 0, 0, 0, 0, 0]
]
# 맵 관리 힙큐에 들어갈 데이터 하나를 표현한 클래스
class Node:
def __init__(self, parent=None, position=None):
self.parent = parent
self.position = position
self.g = 0 # 실제 비용 : 현재 위치까지 온 거리
self.h = 0 # 휴리스틱 : 앞으로 이동해야 할 거리
self.f = 0 # 위 두 개를 합친 총 점수 : 낮을수록 좋아!
# 노드끼리는 위치가 같으면 같은거야!
def __eq__(self, other):
return self.position == other.position
# 노드끼리는 에이스타 점수가 낮은 애가 작은거야!
def __lt__(self, other):
return self.f < other.f
# 미로, 시작점, 끝점 주면 경로를 파악해볼게~
def astar(maze, start, end):
start_node = Node(None, start)
end_node = Node(None, end)
open_list = [] # 방문할 후보들 (우선순위 큐) : 가지 않은 길
closed_list = [] # 이미 방문한 곳 : 이미 지나온 길
heapq.heappush(open_list, start_node)
# 앞으로 갈 수 있는 길을 쌓아나가는 반복문
while open_list:
# f 값이 가장 작은 노드를 꺼냄
current_node = heapq.heappop(open_list)
closed_list.append(current_node)
# 목표 도착 확인
if current_node == end_node:
path = []
current = current_node
while current is not None:
path.append(current.position)
current = current.parent
return path[::-1] # 역순으로 리턴 (시작->끝)
children = [] # 내가 갈 수 있는 후보들 넣어놓기
for new_position in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
node_position = (current_node.position[0] + new_position[0], current_node.position[1] + new_position[1])
# 미로 밖으로 벗어나는 경로라면 다음 반복으로 넘어가야해
if node_position[0] > (len(maze) - 1) or node_position[0] < 0 or node_position[1] > (len(maze[0]) -1) or node_position[1] < 0:
continue
# 벽이거나, 미확인 위치라면 다음 반복으로 넘어가야해
if maze[node_position[0]][node_position[1]] != 0:
continue
new_node = Node(current_node, node_position)
children.append(new_node)
for child in children:
# 이미 지나온 길은 패스하고 다음 반복으로 넘어가
if child in closed_list: continue
child.g = current_node.g + 1
# 휴리스틱 (직선 거리) : 피타고라스 기반 직선거리
child.h = ((child.position[0] - end_node.position[0]) ** 2) + ((child.position[1] - end_node.position[1]) ** 2)
child.f = child.g + child.h
heapq.heappush(open_list, child)
return None
# 실행 : (0,0) 부터 (4,5) 까지 가는 최단 경로 구하기
path = astar(grid_map, (0, 0), (4, 5))
print(f"최단 경로: {path}")
steezer@DESKTOP-TF8J569:~$ mkdir -p ~/astar_study
steezer@DESKTOP-TF8J569:~$ cd ~/astar_study
steezer@DESKTOP-TF8J569:~/astar_study$ nano astar_basic.py
steezer@DESKTOP-TF8J569:~/astar_study$ python3 astar_basic.py
최단 경로: [(0, 0), (0, 1), (0, 2), (0, 3), (1, 3), (2, 3), (2, 4), (2, 5), (3, 5), (4, 5)]
단계별 완성
1단계 : Map 구독
로봇이 자신이 주행할 맵 데이터를 받아들이는 단계
1차원 배열로 들어오는 데이터를 다루기 쉬운 2차원 행렬로 변환하는 것이 핵심
import rclpy
from rclpy.node import Node
from nav_msgs.msg import OccupancyGrid # 맵(/map) 토픽의 타입
import numpy as np
class MapReader(Node):
def __init__(self):
super().__init__('step1_map_reader')
# 실제 맵을 받아서 초기화해야 하는 속성들
self.map_data = None
self.map_resolution = 0.05
self.map_width = 0
self.map_height = 0
self.map_origin = [0.0, 0.0]
self.subscription = self.create_subscription(
OccupancyGrid,
'/map',
self.map_callback,
10
)
def map_callback(self, msg):
self.map_resolution = msg.info.resolution
self.map_width = msg.info.width
self.map_height = msg.info.height
self.map_origin = [msg.info.origin.position.x, msg.info.origin.position.y]
# 맵 데이터는 1차원 리스트 형태이므로, 다루기 쉬운 2차원 형태로 reshape 하기
# 이렇게 하면 특정 좌표가 벽인지 길인지 판단 가능. 예를 들면... self.map_data[y][x]의 값을 확인하는 식으로
self.map_data = np.array(msg.data).reshape((self.map_height, self.map_width))
self.get_logger().info(f"Map Received: {self.map_width} x {self.map_height}")
def main(args=None):
rclpy.init(args=args)
node = MapReader()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
2단계 : 위치와 목표
로봇이 어디에 있는지, 어디로 가야할지 설정하는 작업이 필요
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
from math import atan2
class PoseGoalListener(Node):
def __init__(self):
super().__init__('step2_pose_goal')
self.current_pose = [0.0, 0.0] # 현재 위치
self.current_yaw = 0.0 # 현재 각도
self.goal_pose = [0.0, 0.0] # 목적지
self.sub_pose = self.create_subscription(
PoseWithCovarianceStamped,
'/amcl_pose',
self.pose_callback,
10
)
self.sub_goal = self.create_subscription(
PoseStamped,
'/goal_pose',
self.goal_callback,
10
)
def pose_callback(self, msg):
# x, y 위치
self.current_pose = [msg.pose.pose.position.x, msg.pose.pose.position.y]
q = msg.pose.pose.orientation # 쿼터니언
# 쿼터니언은 회전을 나타내는 4개 값의 묶음으로 로봇의 방향 정보
# 다음은 쿼터니언을 회전 각도를 나타내는 yaw(angular.z) 값으로 변환하는 공식
self.current_yaw = atan2(2.0*(q.w*q.z + q.x*q.y), 1.0-2.0*(q.y*q.y + q.z*q.z))
self.get_logger().info(f"Current: {self.current_pose}, Yaw: {self.current_yaw:.2f}")
def goal_callback(self, msg):
self.goal_pose = [msg.pose.position.x, msg.pose.position.y]
self.get_logger().info(f"New Goal: {self.goal_pose}")
def main(args=None):
rclpy.init(args=args)
node = PoseGoalListener()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
3단계 : 알고리즘 이용해 경로 찾기
맵, 시작점, 목표점을 받아 최단 경로 리스트 만들기
import heapq
from math import sqrt, atan2, sin, pi
import math
# 경로상의 한 점을 의미하는 클래스
class NodeAStar:
def __init__(self, parent=None, position=None):
self.parent = parent
self.position = position
self.g = 0
self.h = 0
self.f = 0
def __eq__(self, other):
return self.position == other.position
def __lt__(self, other):
return self.f < other.f
def run_astar(map_data, start, end, width, height):
# 시작위치와 끝위치가 지도 안에 있지 않을 경우에는 None 반환하고 끝내!
if not (0 <= start[0] < height and 0 <= start[1] < width): return None
if not (0 <= end[0] < height and 0 <= end[1] < width): return None
start_node = NodeAStar(None, start)
end_node = NodeAStar(None, end)
open_list = []
heapq.heappush(open_list, start_node) # 가지 않은 길에 시작 위치 추가하고 시~작!
visited = set() # 중복을 허용하지 않는 컬렉터
moves = [(0,1), (0,-1), (1,0), (-1,0), (1,1), (1,-1), (-1,1), (-1,-1)]
while open_list:
current_node = heapq.heappop(open_list)
visited.add(current_node.position)
# 만약 도착한 상태라면 지나온 길을 쭉 조회하고파
if current_node.position == end_node.position:
path = []
current = current_node
while current:
path.append(current.position)
current = current.parent
return path[::-1] # 리스트 역순 조회
for move in moves:
ny, nx = current_node.position[0] + move[0], current_node.position[1] + move[1]
if not (0 <= ny < height and 0 <= nx < width):
continue
if map_data[ny][nx] != 0:
continue
if (ny, nx) in visited:
continue
new_node = NodeAStar(current_node, (ny, nx))
new_node.g = current_node.g + 1
new_node.h = math.sqrt((ny - end[0])**2 + (nx - end[1])**2)
new_node.f = new_node.g + new_node.h
heapq.heappush(open_list, new_node)
return None
# 만들어진 경로를 따라 로봇을 제어하는 부분
def calculate_velocity(current_pose, current_yaw, target_point, linear_vel):
dx = target_point[0] - current_pose[0]
dy = target_point[1] - current_pose[1]
distance = sqrt(dx**2 + dy**2)
target_angle = atan2(dy, dx)
alpha = target_angle - current_yaw
if alpha > pi: alpha -= 2 * pi
elif alpha < -pi: alpha += 2 * pi
angular_vel = linear_vel * (2.0 * sin(alpha)) / distance
return angular_vel
'로보테크AI' 카테고리의 다른 글
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/28 (0) | 2026.04.28 |
|---|---|
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/24 (0) | 2026.04.24 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/22 (0) | 2026.04.22 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/21[멘토링] (0) | 2026.04.21 |
| 융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/20 (0) | 2026.04.20 |