로보테크AI

융합_로보테크 AI 자율주행 로봇 개발자 과정-26/04/23

steezer 2026. 4. 23. 18:30

직접 주행

 

Pure Pursuit

자동차나 로봇이 경로 상의 한 지점(Look-ahead Point)을 바라보며 부드러운 곡선을 그리며 따라가게 하는 기법

 

실습 과제 : 목적지 바꿔가며 주행하기

`self.path` 리스트 내에 정의된 목적지 좌표를 파라미터를 통해 전달받을 수 있도록 위 노드를 수정

파라미터 전달할 때마다 바뀐 목적지를 향해 주행할 수 있도록 하는 것이 핵심

 

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseWithCovarianceStamped
from math import pow, atan2, sqrt, sin, cos, pi
from rcl_interfaces.msg import SetParametersResult

class PurePursuit(Node):
    def __init__(self):
        super().__init__('pure_pursuit_param_node')

        # added part 1 : declare parameter
        self.declare_parameter('goal_x', 0.0)
        self.declare_parameter('goal_y', 0.0)
        self.add_on_set_parameters_callback(self.parameter_callback)

        # added part 2 : initialize variable via parameter
        self.goal_x = self.get_parameter('goal_x').value
        self.goal_y = self.get_parameter('goal_y').value

        self.lookahead_distance = 0.5
        self.linear_velocity = 0.2
        self.goal_tolerance = 0.2

        self.path = [
            [self.goal_x, self.goal_y]
        ]
        self.current_waypoint_index = 0

        self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
        self.subscription_ = self.create_subscription(
            PoseWithCovarianceStamped,
            '/amcl_pose',
            self.pose_callback,
            10
        )

        self.current_x = 0.0
        self.current_y = 0.0
        self.current_yaw = 0.0
        self.is_localized = False

        self.timer = self.create_timer(0.5, self.control_loop)
        self.get_logger().info("Pure Pursuit Node Started! Waiting for AMCL pose...")

    # added part 3 : callback when parameter set
    def parameter_callback(self, params):
        for param in params:
            if param.name == 'goal_x':
                self.goal_x = param.value
                self.get_logger().info(f'goal_x updated: {self.goal_x}')
            if param.name == 'goal_y':
                self.goal_y = param.value
                self.get_logger().info(f'goal_y updated: {self.goal_y}')

        self.path = [
            [self.goal_x, self.goal_y]
        ]
        self.current_waypoint_index = 0
        return SetParametersResult(successful=True)

    def pose_callback(self, msg):
        self.get_logger().info(f"pose callback : {type(msg)}")
        self.current_x = msg.pose.pose.position.x
        self.current_y = msg.pose.pose.position.y

        # formula to get current yaw (current yaw? angular.z degree!)
        q = msg.pose.pose.orientation
        self.current_yaw = atan2(
            2.0 * (q.w * q.z + q.x * q.y),
            1.0 - 2.0 * (q.y * q.y + q.z * q.z)
        )
        self.is_localized = True

    def control_loop(self):
        if not self.is_localized:
            return

        if self.current_waypoint_index >= len(self.path):
            self.stop_robot()
            return

        goal_x = self.path[self.current_waypoint_index][0]
        goal_y = self.path[self.current_waypoint_index][1]

        dx = goal_x - self.current_x
        dy = goal_y - self.current_y
        distance = sqrt(pow(dx, 2) + pow(dy, 2))

        if distance < self.goal_tolerance:
            self.get_logger().info(f"Waypoint {self.current_waypoint_index} Reached!")
            self.current_waypoint_index += 1
            return

        target_angle = atan2(dy, dx)
        alpha = target_angle - self.current_yaw

        if alpha > pi:
            alpha -= 2 * pi
        elif alpha < -pi:
            alpha += 2 * pi

        angular_velocity = self.linear_velocity * (2.0 * sin(alpha)) / distance

        cmd = Twist()
        cmd.linear.x = self.linear_velocity
        cmd.angular.z = angular_velocity

        if cmd.angular.z > 1.0:
            cmd.angular.z = 1.0
        if cmd.angular.z < -1.0:
            cmd.angular.z = -1.0

        self.publisher_.publish(cmd)

    def stop_robot(self):
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = 0.0
        self.publisher_.publish(cmd)
        self.get_logger().info("All waypoints completed. Robot Stopped.")

def main(args=None):
    rclpy.init(args=args)
    node = PurePursuit()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.stop_robot()
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

 

실습 과제 : 위 내용을 실제 로봇에 적용해보기

장애물 피하면서 가는 단순 제어 코드 

실습 순서 

 

(0. 맵 새로 생성 ->  위치 추정 런치 파일 실행, 주행 노드 실행, 초기 위치 알려주기)

1. 시뮬레이션 상에서 장애물 피해가는지 확인
2. 확인됐으면, 실제 로봇을 바닥에 간단한 미로를 만들어 주행시켜보기

 

파라미터 3개는 실험으로 조정

목적지 좌표는 맵 상의 좌표로(1.8, -0.6)

 

turtlebot3_ws/src/my_pure_pursuit/my_pure_pursuit/pure_pursuit_obstacle.py

# pure_pursuit_obstacle.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy
from geometry_msgs.msg import Twist, PoseWithCovarianceStamped
from sensor_msgs.msg import LaserScan
from math import pow, atan2, sqrt, sin, cos, pi


class PurePursuitLogic(Node):
    def __init__(self):
        super().__init__('pure_pursuit_obstacle')

        # 사용자가 정해야 하는 파라미터 삼형제
        self.lookahead_distance = 0.5   # 전방 주시 거리
        self.linear_velocity = 0.15     # 이동 속도
        self.obs_threshold = 0.45       # 장애물 감지 거리

        # 목표 위치 (맵 상의 좌표)
        self.path = [
            [1.8, -0.6]
        ]
        self.current_waypoint_index = 0

        # AMCL pose용 QoS (TRANSIENT_LOCAL 필수)
        amcl_qos = QoSProfile(
            reliability=ReliabilityPolicy.RELIABLE,
            durability=DurabilityPolicy.TRANSIENT_LOCAL,
            history=HistoryPolicy.KEEP_LAST,
            depth=1
        )

        # LiDAR scan용 QoS (BEST_EFFORT이 일반적)
        scan_qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            durability=DurabilityPolicy.VOLATILE,
            history=HistoryPolicy.KEEP_LAST,
            depth=5
        )

        self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
        self.pose_sub = self.create_subscription(
            PoseWithCovarianceStamped, '/amcl_pose', self.pose_callback, amcl_qos
        )
        self.scan_sub = self.create_subscription(
            LaserScan, '/scan', self.scan_callback, scan_qos
        )

        self.current_x = 0.0
        self.current_y = 0.0
        self.current_yaw = 0.0
        self.is_localized = False

        # initial value.... 99.9 means 'no obstacle'
        self.left_dist = 99.9
        self.front_dist = 99.9
        self.right_dist = 99.9

        self.timer = self.create_timer(0.1, self.control_loop)
        self.get_logger().info("Let's Go!")

    # 위치와 회전 상태를 받으면 그에 대응하여 각도를 계산
    def pose_callback(self, msg):
        self.current_x = msg.pose.pose.position.x
        self.current_y = msg.pose.pose.position.y
        q = msg.pose.pose.orientation
        self.current_yaw = atan2(
            2.0 * (q.w * q.z + q.x * q.y),
            1.0 - 2.0 * (q.y * q.y + q.z * q.z)
        )
        self.is_localized = True

    def scan_callback(self, msg):
        # 장애물 바라보는 각도
        front_ranges = msg.ranges[0:40] + msg.ranges[-40:]

        # 좌우 비교를 위한 각도
        left_ranges = msg.ranges[20:70]
        right_ranges = msg.ranges[-70:-20]

        self.front_dist = self.get_min_dist(front_ranges)
        self.left_dist = self.get_min_dist(left_ranges)
        self.right_dist = self.get_min_dist(right_ranges)

    # 바라보는 각도 중에서 가장 가깝게 감지되는 것을 구한다
    def get_min_dist(self, range_list):
        valid_values = [x for x in range_list if x > 0.05 and x < 10.0]
        if valid_values:
            return min(valid_values)
        return 99.9

    # 주기적으로 주행을 계산하는 함수
    def control_loop(self):
        if not self.is_localized:
            return

        if self.front_dist < self.obs_threshold:
            self.get_logger().warn(f"Obstacle Ahead! ({self.front_dist:.2f}m)")
            self.avoid_obstacle()
            return

        self.follow_path()

    # 장애물 회피 로직
    def avoid_obstacle(self):
        cmd = Twist()
        cmd.linear.x = 0.0

        # 좌우로 트는 각도에 따라 잘 피할수도 못 피할수도
        if self.left_dist > self.right_dist:
            cmd.angular.z = 0.7
            self.get_logger().info("Turn Left!")
        else:
            cmd.angular.z = -0.7
            self.get_logger().info("Turn Right!")

        self.publisher_.publish(cmd)

    def follow_path(self):
        if self.current_waypoint_index >= len(self.path):
            cmd = Twist()
            self.publisher_.publish(cmd)
            return

        goal_x = self.path[self.current_waypoint_index][0]
        goal_y = self.path[self.current_waypoint_index][1]

        dx = goal_x - self.current_x
        dy = goal_y - self.current_y
        distance = sqrt(pow(dx, 2) + pow(dy, 2))

        if distance < 0.2:  # Goal tolerance
            self.get_logger().info(f"Waypoint {self.current_waypoint_index} Reached!")
            self.current_waypoint_index += 1
            return

        target_angle = atan2(dy, dx)
        alpha = target_angle - self.current_yaw
        if alpha > pi:
            alpha -= 2 * pi
        elif alpha < -pi:
            alpha += 2 * pi

        cmd = Twist()
        cmd.linear.x = self.linear_velocity
        cmd.angular.z = self.linear_velocity * (2.0 * sin(alpha)) / distance

        if cmd.angular.z > 1.0:
            cmd.angular.z = 1.0
        if cmd.angular.z < -1.0:
            cmd.angular.z = -1.0

        self.publisher_.publish(cmd)


def main(args=None):
    rclpy.init(args=args)
    node = PurePursuitLogic()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

 

setup.py에 entry point 추가

'console_scripts': [
    'pure_pursuit_node = my_pure_pursuit.pure_pursuit_node:main',
    'pure_pursuit_obstacle = my_pure_pursuit.pure_pursuit_obstacle:main',
],
더보기

[INFO] [1776908512.294925402] [pure_pursuit_obstacle]: Let's Go!
[WARN] [1776908515.076139346] [pure_pursuit_obstacle]: Obstacle Ahead! (0.45m)
[INFO] [1776908515.076616246] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.175811968] [pure_pursuit_obstacle]: Obstacle Ahead! (0.45m)
[INFO] [1776908515.176176068] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.275554189] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.275938689] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.375710511] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.376072111] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.475674332] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.476111832] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.575912653] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.576458854] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.675671475] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.676152375] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908515.775851796] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908515.776414195] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.675909324] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908516.676430723] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.776036974] [pure_pursuit_obstacle]: Obstacle Ahead! (0.44m)
[INFO] [1776908516.776422574] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.876043025] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908516.876559325] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908516.975721876] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908516.976189675] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.075806524] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.076275524] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.175671372] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.176099171] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.276073419] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.276528719] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.375668767] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.376042167] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.475826815] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.476194014] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.576037762] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908517.576582762] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.675920810] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908517.676313010] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.775760658] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908517.776181157] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.875756205] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908517.876159405] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908517.975878553] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908517.976380153] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908518.075891598] [pure_pursuit_obstacle]: Obstacle Ahead! (0.43m)
[INFO] [1776908518.076472298] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908518.175782443] [pure_pursuit_obstacle]: Obstacle Ahead! (0.43m)
[INFO] [1776908518.176317643] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.275619220] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908528.276130820] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.375652756] [pure_pursuit_obstacle]: Obstacle Ahead! (0.41m)
[INFO] [1776908528.376089456] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.475729693] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.476088593] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.575735429] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.576127829] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.675712166] [pure_pursuit_obstacle]: Obstacle Ahead! (0.39m)
[INFO] [1776908528.676104666] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.775894902] [pure_pursuit_obstacle]: Obstacle Ahead! (0.39m)
[INFO] [1776908528.776397802] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.875795239] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.876366538] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908528.975773575] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908528.976278575] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.075738410] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908529.076099010] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.176102743] [pure_pursuit_obstacle]: Obstacle Ahead! (0.40m)
[INFO] [1776908529.176591443] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.275590577] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908529.275964577] [pure_pursuit_obstacle]: Turn Left!
[WARN] [1776908529.375758311] [pure_pursuit_obstacle]: Obstacle Ahead! (0.42m)
[INFO] [1776908529.376210311] [pure_pursuit_obstacle]: Turn Left!
[INFO] [1776908551.375630595] [pure_pursuit_obstacle]: Waypoint 0 Reached!


A* (A-star) 알고리즘을 이용해 주행 노드 작성하기

 

A*(이하 에이스타)는 시작점에서 목표점까지 가는 비용이 가장 적은 경로를 탐색하는 알고리즘

 

 

이를 구현하기 위해서는 (필수까진 아닐 수 있지만) heapq 모듈을 이해할 필요가 있음

 

 

파이썬 heapq와 완전 이진 트리 구조

파이썬의 `heapq` 모듈은 이진 힙(Binary Heap) 알고리즘을 구현한 모듈

힙(Heap)은 내부적으로 완전 이진 트리 형태를 유지하며 배열(리스트)로 관리됨

 

완전 이진 트리란 아래 두 가지 조건을 만족하는 트리
ㄴ마지막 레벨을 제외하고 모든 레벨이 노드로 꽉 차 있어야 함
ㄴ마지막 레벨은 왼쪽부터 오른쪽으로 순서대로 채워져야 함

 

완전 이진 트리는 빈 공간 없이 데이터가 채워지므로,

복잡한 포인터 없이 리스트(List)의 인덱스만으로 부모-자식 관계를 표현할 수 있음

예를 들어 어떤 힙이 `[1, 3, 5, 7, 9, 8]` 과 같이 채워져 있다면 완전 이진 트리 도식은 다음과 같이 표현

더보기

      1        
    /   \
   3     5     
  / \   /
 7   9 8       

 

특정 노드의 인덱스를 `k`라고 할 때, 인덱스 규칙은 다음과 같음

ㄴ왼쪽 자식 인덱스: `2*k + 1`
ㄴ오른쪽 자식 인덱스: `2*k + 2`
ㄴ부모 인덱스: `(k - 1) // 2`

`heapq` 모듈은 부모 노드의 값은 항상 자식 노드의 값보다 작거나 같다는 규칙을 준수함

 

파이썬의 `heapq` 모듈

별도의 객체를 생성하지 않고, 일반 리스트를 힙처럼 다룸

import heapq

# 이 상태에서는 아직 그냥 리스트임 
heap = []

 

원소 추가

트리의 가장 끝에 원소를 넣고,

부모와 비교하며 자리를 찾아 올라감

 

heapq.heappush(heap, 4)
heapq.heappush(heap, 1)
heapq.heappush(heap, 7)
heapq.heappush(heap, 3)

print(heap) # [1, 3, 7, 4]

 

최솟값 삭제 및 반환

가장 작은 값(루트, 인덱스 0)을 제거하여 반환

힙 구조 재정비

min_val = heapq.heappop(heap)
print(min_val) # 1 (가장 작은 값)
print(heap)    # [3, 4, 7]

 

기존 리스트를 힙으로 변환하는 경우 재배열이 진행

data = [4, 1, 7, 3, 8, 5]
heapq.heapify(data)
print(data) # [1, 3, 5, 4, 8, 7]

 

순수 파이썬으로 구현한 A* 알고리즘 기반 경로 계획

import heapq

# 0은 빈 공간, 1은 막힌 곳으로 표현
grid_map = [
    [0, 0, 0, 0, 1, 0],
    [1, 1, 1, 0, 1, 0],
    [0, 0, 0, 0, 0, 0],
    [0, 1, 1, 1, 1, 0],
    [0, 0, 0, 0, 0, 0]
]

# 맵 관리 힙큐에 들어갈 데이터 하나를 표현한 클래스 
class Node:
    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position
        self.g = 0  # 실제 비용 : 현재 위치까지 온 거리
        self.h = 0  # 휴리스틱 : 앞으로 이동해야 할 거리 
        self.f = 0  # 위 두 개를 합친 총 점수 : 낮을수록 좋아! 

    # 노드끼리는 위치가 같으면 같은거야! 
    def __eq__(self, other):
        return self.position == other.position
        
    # 노드끼리는 에이스타 점수가 낮은 애가 작은거야!
    def __lt__(self, other):
        return self.f < other.f

# 미로, 시작점, 끝점 주면 경로를 파악해볼게~
def astar(maze, start, end):
    start_node = Node(None, start)
    end_node = Node(None, end)
    
    open_list = []      # 방문할 후보들 (우선순위 큐) : 가지 않은 길
    closed_list = []    # 이미 방문한 곳 : 이미 지나온 길 
    
    heapq.heappush(open_list, start_node)
    
    # 앞으로 갈 수 있는 길을 쌓아나가는 반복문 
    while open_list:
        # f 값이 가장 작은 노드를 꺼냄
        current_node = heapq.heappop(open_list)
        closed_list.append(current_node)

        # 목표 도착 확인
        if current_node == end_node:
            path = []
            current = current_node
            while current is not None:
                path.append(current.position)
                current = current.parent
            return path[::-1] # 역순으로 리턴 (시작->끝)

        children = [] # 내가 갈 수 있는 후보들 넣어놓기 
        for new_position in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
            node_position = (current_node.position[0] + new_position[0], current_node.position[1] + new_position[1])

            # 미로 밖으로 벗어나는 경로라면 다음 반복으로 넘어가야해
            if node_position[0] > (len(maze) - 1) or node_position[0] < 0 or node_position[1] > (len(maze[0]) -1) or node_position[1] < 0:
                continue

            # 벽이거나, 미확인 위치라면 다음 반복으로 넘어가야해 
            if maze[node_position[0]][node_position[1]] != 0:
                continue

            new_node = Node(current_node, node_position)
            children.append(new_node)

        for child in children:
            # 이미 지나온 길은 패스하고 다음 반복으로 넘어가
            if child in closed_list: continue

            child.g = current_node.g + 1
            
            # 휴리스틱 (직선 거리) : 피타고라스 기반 직선거리 
            child.h = ((child.position[0] - end_node.position[0]) ** 2) + ((child.position[1] - end_node.position[1]) ** 2)
            child.f = child.g + child.h
            
            heapq.heappush(open_list, child)

    return None 

# 실행 : (0,0) 부터 (4,5) 까지 가는 최단 경로 구하기
path = astar(grid_map, (0, 0), (4, 5))
print(f"최단 경로: {path}")

 

steezer@DESKTOP-TF8J569:~$ mkdir -p ~/astar_study
steezer@DESKTOP-TF8J569:~$ cd ~/astar_study
steezer@DESKTOP-TF8J569:~/astar_study$ nano astar_basic.py
steezer@DESKTOP-TF8J569:~/astar_study$ python3 astar_basic.py

 

최단 경로: [(0, 0), (0, 1), (0, 2), (0, 3), (1, 3), (2, 3), (2, 4), (2, 5), (3, 5), (4, 5)]

 

 

단계별 완성

 

1단계 : Map 구독

로봇이 자신이 주행할 맵 데이터를 받아들이는 단계

1차원 배열로 들어오는 데이터를 다루기 쉬운 2차원 행렬로 변환하는 것이 핵심

import rclpy
from rclpy.node import Node
from nav_msgs.msg import OccupancyGrid # 맵(/map) 토픽의 타입
import numpy as np

class MapReader(Node):
    def __init__(self):
        super().__init__('step1_map_reader')
        
        # 실제 맵을 받아서 초기화해야 하는 속성들 
        self.map_data = None
        self.map_resolution = 0.05
        self.map_width = 0
        self.map_height = 0
        self.map_origin = [0.0, 0.0]

        self.subscription = self.create_subscription(
            OccupancyGrid,
            '/map',
            self.map_callback,
            10
        )

    def map_callback(self, msg):
        self.map_resolution = msg.info.resolution
        self.map_width = msg.info.width
        self.map_height = msg.info.height
        self.map_origin = [msg.info.origin.position.x, msg.info.origin.position.y]

        # 맵 데이터는 1차원 리스트 형태이므로, 다루기 쉬운 2차원 형태로 reshape 하기
        # 이렇게 하면 특정 좌표가 벽인지 길인지 판단 가능. 예를 들면... self.map_data[y][x]의 값을 확인하는 식으로
        self.map_data = np.array(msg.data).reshape((self.map_height, self.map_width))
        
        self.get_logger().info(f"Map Received: {self.map_width} x {self.map_height}")

def main(args=None):
    rclpy.init(args=args)
    node = MapReader()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

 

2단계 : 위치와 목표

로봇이 어디에 있는지, 어디로 가야할지 설정하는 작업이 필요

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
from math import atan2

class PoseGoalListener(Node):
    def __init__(self):
        super().__init__('step2_pose_goal')
        
        self.current_pose = [0.0, 0.0] # 현재 위치
        self.current_yaw = 0.0 # 현재 각도
        self.goal_pose = [0.0, 0.0] # 목적지 

        self.sub_pose = self.create_subscription(
            PoseWithCovarianceStamped,
            '/amcl_pose',
            self.pose_callback,
            10
        )
        
        self.sub_goal = self.create_subscription(
            PoseStamped,
            '/goal_pose',
            self.goal_callback,
            10
        )

    def pose_callback(self, msg):
        # x, y 위치
        self.current_pose = [msg.pose.pose.position.x, msg.pose.pose.position.y]
        
        q = msg.pose.pose.orientation # 쿼터니언
        
        # 쿼터니언은 회전을 나타내는 4개 값의 묶음으로 로봇의 방향 정보
        # 다음은 쿼터니언을 회전 각도를 나타내는 yaw(angular.z) 값으로 변환하는 공식
        self.current_yaw = atan2(2.0*(q.w*q.z + q.x*q.y), 1.0-2.0*(q.y*q.y + q.z*q.z))
        
        self.get_logger().info(f"Current: {self.current_pose}, Yaw: {self.current_yaw:.2f}")

    def goal_callback(self, msg):
        self.goal_pose = [msg.pose.position.x, msg.pose.position.y]
        self.get_logger().info(f"New Goal: {self.goal_pose}")

def main(args=None):
    rclpy.init(args=args)
    node = PoseGoalListener()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

 

3단계 : 알고리즘 이용해 경로 찾기

맵, 시작점, 목표점을 받아 최단 경로 리스트 만들기

import heapq
from math import sqrt, atan2, sin, pi
import math

# 경로상의 한 점을 의미하는 클래스
class NodeAStar:
    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position
        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        return self.position == other.position
    
    def __lt__(self, other):
        return self.f < other.f

def run_astar(map_data, start, end, width, height):
    # 시작위치와 끝위치가 지도 안에 있지 않을 경우에는 None 반환하고 끝내!
    if not (0 <= start[0] < height and 0 <= start[1] < width): return None
    if not (0 <= end[0] < height and 0 <= end[1] < width): return None

    start_node = NodeAStar(None, start)
    end_node = NodeAStar(None, end)
    
    open_list = []
    heapq.heappush(open_list, start_node) # 가지 않은 길에 시작 위치 추가하고 시~작!
    visited = set() # 중복을 허용하지 않는 컬렉터 

    moves = [(0,1), (0,-1), (1,0), (-1,0), (1,1), (1,-1), (-1,1), (-1,-1)]

    while open_list:
        current_node = heapq.heappop(open_list)
        visited.add(current_node.position)

        # 만약 도착한 상태라면 지나온 길을 쭉 조회하고파 
        if current_node.position == end_node.position:
            path = []
            current = current_node
            while current:
                path.append(current.position)
                current = current.parent
            return path[::-1] # 리스트 역순 조회 
	
        for move in moves:
            ny, nx = current_node.position[0] + move[0], current_node.position[1] + move[1]
            
            if not (0 <= ny < height and 0 <= nx < width):
                continue
            
            if map_data[ny][nx] != 0:
                continue
            
            if (ny, nx) in visited:
                continue

            new_node = NodeAStar(current_node, (ny, nx))
            new_node.g = current_node.g + 1
            new_node.h = math.sqrt((ny - end[0])**2 + (nx - end[1])**2)
            new_node.f = new_node.g + new_node.h
            
            heapq.heappush(open_list, new_node)
            
    return None
    
# 만들어진 경로를 따라 로봇을 제어하는 부분
def calculate_velocity(current_pose, current_yaw, target_point, linear_vel):
    dx = target_point[0] - current_pose[0]
    dy = target_point[1] - current_pose[1]
    
    distance = sqrt(dx**2 + dy**2)
    
    target_angle = atan2(dy, dx)
    alpha = target_angle - current_yaw
    
    if alpha > pi: alpha -= 2 * pi
    elif alpha < -pi: alpha += 2 * pi
    
    angular_vel = linear_vel * (2.0 * sin(alpha)) / distance
    
    return angular_vel