概要
つくばチャレンジ2024を完走した際の実装内容は以下記事にまとめましたが、gpt-4oのVLMを用いて障害物回避判定を行う部分については、少し新規性があるかなと思うので、本記事で少し詳しくまとめました。
今回の実装で自律走行というリアルタイム性の高いシステムに、うまくクラウドベースのLLM,VLMを組み込めたか?というと、もう少しうまい切り口があったのでは…とも思うので、2025年はローカルLLMを使ってもう少し綺麗に組み込む方法を考えていきたいです。
つくばチャレンジの概要、全体システム、ソースコードは↓を参照してください。
www.robotech-note.com
自律走行ロボットにLLM、VLMをどう組み込んでいくか
2024年のつくばチャレンジに参加するうえで、新しい挑戦として、自律走行ロボットにクラウドベースのLLM,VLMを活用した何かを導入したい!と考えて開発を行いました。
まずはクラウドベースのLLM,VLMを自律走行ロボットに適用するための強みと課題を整理してみます。
強味
クラウドベースのLLM,VLMが使いたい!と思っても、そもそも使うメリットがなければ何の意味がありません、それではメリットが何かと考えると、以下の3つかなと感じました。
強味1 コンテキストを考慮した状況認識が行える
LLM,VLMは与えた言語・画像情報をもとに認識を行い、そこの言語ベースの常識的知識をもとに推論することができます。そのため、ただ画像認識等を行うよりももう一歩深い推論、状況認識ができると期待できます。とはいえ言語的知識が主なので限界はありますが…。
強味2 モデルの学習が不要
これはシンプル、そもそもクラウド上にサービス化されているので、モデルの学習が不要です。出力フォーマットに関しても言語で指示すればある程度その通りの出力を出してくれます。便利なやつめ。
強味3 その場の状況・やりたいことに応じたソースコードを出力できる
googleのSayCanをはじめとする、強化学習×LLMでLLMを使ってロボットを制御しようという試みですね。本当はこの流れを汲んだシステムを構築したかった・・・(はい、今回はこの強みはいかせていません)
課題
強味があれば課題もある。ということで課題も列挙していきます。この課題がなかなか厄介なんですよねー。
課題1 認識処理に一定の時間がかかる
gpt-4oのVLMを使おうと思うと、画質を落としても認識結果が返ってくるまで、およそ3~4秒程度を要します。クラウドサービスを使い、かつリッチな認識を行う以上やむを得ないことではありますが、リアルタイム性を求められるところで使うには致命的です。
例えば障害物検知に使おうとした場合、見つけてロボットに停止指令を送るころにはロボットはぶつかってしまっていますし、信号認識に使おうとしても遅延が大きすぎて認識してから動き出したのでは信号が変わってしまいます。(ということで、信号認識タスクは軽量なyoloを使用しました。パット見た目、gpt-4oでも信号と色は正確に認識できていそうでした)
課題2 結果が保証されない、定量的に評価できない
これが一番大きな課題で、現状LLM/VLMを用いたシステムは(かなり安定してきたとはいえ)確実な回答を返す保証がありませんし、その精度も定量的に評価することが困難です。よって、つくばチャレンジのような"失敗できないタスク"のミッションに使うにはリスクが高すぎるのが現状です。
課題3 認識のたびにお金がかかる
これは仕方ない!社会人なら払うしかない!
と、冗談はさておき、gpt-4oなら(2024年10月当時)1リクエスト0.5円程度だったので無視できるコストではあります。とはいえ、毎秒認識を行うような設計にしてしまうと、1時間で1800円。ちょっとロボットを動かす際に気が重くなりそうです。
今回の使い道:障害物を見つけて停止した後の回避動作の可否判定
考えた結果、やはりその場でコード生成させてロボットを動かす・・・といった動作はリスクが高すぎるので、「間違えても自律走行には大きな影響を及ぼさないけれど、うまく認識できるといいことが起きるタスク」に対してLLM/VLMを適用することとしました。
具体的には、Lidarを使って障害物認識を行いロボットが停止した後、停止障害物(カラーコーン等)を見つけて停止したのか、移動障害物(人やロボット等)をLLM/VLMを用いて判定し、静止障害物であればすぐに回避動作を行い、移動障害物であれば、相手が移動してくれることを期待してしばらく停止して待機するシステムにしました。回避をするときめられた走行ルートから外れるのでどうしても失敗しやすくなるため、可能な限り回避しないようにする戦略です。
このシステムをYOLO等を用いて実装しようとすると、静止障害物としてあり得そうな物体・移動障害物として考えられる物体をそれぞれアノテーション・学習し、モデルの認識結果をフィルタリングして目の前の物体を決定し、避けるかどうか判断する必要がありますが、LLM/VLMを使えば文字ベースで指示をしてAPIをたたくだけですみます。
この部分であれば、たとえ認識ミスをして静止障害物を移動障害物と認識して回避行動を起こしたとしても(多少走行失敗リスクは高くなりますが)問題ない点が好都合。認識モジュールがなければすべて回避動作を行うところを、移動障害物だから待機する、という動作ができるようになるだけでも、成功確率が上がっていると考えられます。
実装
実装方法は極めてシンプル、以下の流れで動作させるスクリプトを実装しました。ソースコードも一番最後に乗せておきます。
- ロボットが5秒以上停止したら、ロボット前方カメラの画像を取得する
- 以下のプロンプトと画像をもとにgpt-4oのAPIをたたく
- 返答の最初の1文字をパースし、認識結果をrosに流す
- 認識結果をもとにロボットの計画部が動作を決める
あなたはロボットオペレータです。ロボットが前方1m以内に障害物を検出して止まっており、その処理を考えてください\ ロボットのセンサは必ずしも100%の信頼性があるわけではないため、誤検知の可能性もあります。\ ロボットの前方に取り付けられたカメラの映像をもとにロボットの次の動作を判断し、\ 次のどのような行動を起こすべきかを1行目に数字だけで回答してください。\ 誤検知の可能性が高く、特に障害物がないので前に進んでも良さそうな場合は0,\ 障害物が前方にありそう、かつ、それが人やロボット等の移動障害物の場合にはしばらく待機して相手の移動を待つ戦略を取り、数値としては1、\ カラーコーン等の静止障害物の場合は、回避する必要があるため2を出力し、改行して理由を出力してください\ その際、わかるようであれば右か左かどちら方向に回避すればよりスムーズに回避できるかも教えてください
認識結果
上記アルゴリズムを走行中に活用した結果、基本的にはほとんどの環境で良好に動作し、本走行での安定した自律走行に活用できました。一方で、誤検知例のように、認識はできていてもラベルが間違っている等の誤検知も稀に存在しており、やはり、これをクリティカルなミッションに使うのは怖いという印象を抱きました。


まとめ
自律走行ロボットにLLM/VLMを使ってみた実例をまとめました。高度な認識が簡単な実装で実現できた一方で、認識結果の精度、信頼性を正確に評価するすべがないといった課題は当初の想像通り存在しており、現段階では使い道がある程度限定される印象です。
ここで紹介したケースのように、+αの要素としてLLM/VLMのうまい使いどころを見つけていく必要があるのか、それとも精度や信頼性がどんどん上がっていってクリティカルなミッションにも気兼ねなく使っていけるようになるのか、今後どう発展していくか楽しみです。
ソースコード
参考までにAPIをたたいて認識結果をパース、ROS1に流すコードをここに貼っておきます。gitには全ソースコードをあげているので、回避動作全体はgitを追ってみてください。
#!/usr/bin/env python3 import rospy from std_msgs.msg import Int32 from nav_msgs.msg import Odometry from sensor_msgs.msg import Image import base64 import time from openai import OpenAI import os from os.path import join, dirname from dotenv import load_dotenv from cv_bridge import CvBridge import cv2 # .envファイルからAPIキーを読み込む dotenv_path = join(dirname(__file__), '.env') load_dotenv(dotenv_path) API_KEY = os.environ.get("API_KEY") if not API_KEY: rospy.logerr("APIキーが見つかりません。.envファイルに設定してください。") exit(1) # OpenAIクライアントの初期化 client = OpenAI(api_key=API_KEY) class ObstacleDetector: def __init__(self): rospy.init_node('obstacle_detector', anonymous=True) # Publisher self.pub = rospy.Publisher('/gpt_output', Int32, queue_size=10) # Subscribers rospy.Subscriber('/ypspur_ros/odom', Odometry, self.odom_callback) rospy.Subscriber('/usb_cam/image_raw', Image, self.image_callback) # 状態変数 self.current_speed = 0.0 self.stationary_time = 0.0 self.last_gpt_output_time = 0.0 self.gpt_output = 99 # 設定 self.stationary_threshold = 0.3 self.stationary_duration = 1.0 self.gpt_request_interval = 10.0 self.gpt_output_expiry = 1.0 # 1秒ごとに出力更新 # 画像バッファ self.last_image = None def odom_callback(self, msg): # ロボットのX方向速度を取得 self.current_speed = abs(msg.twist.twist.linear.x) #print(self.current_speed) if self.current_speed < self.stationary_threshold: self.stationary_time += 1.0 / 10.0 # 約10Hzのコールバックを仮定 else: self.stationary_time = 0.0 def image_callback(self, msg): self.last_image = msg # 最新の画像を保存 def encode_image(self, image_msg): """ ROSの画像メッセージをJPEG形式に変換し、Base64でエンコードする。 """ try: # CvBridgeを使ってROS画像メッセージをOpenCV形式に変換 bridge = CvBridge() cv_image = bridge.imgmsg_to_cv2(image_msg, desired_encoding='bgr8') # JPEG形式にエンコード _, buffer = cv2.imencode('.jpg', cv_image) # Base64エンコード encoded_image = base64.b64encode(buffer).decode('utf-8') rospy.loginfo("画像エンコード成功") return encoded_image except Exception as e: rospy.logerr(f"画像エンコードエラー: {e}") return None # GPT-4Oに画像とテキストプロンプトを送信する関数 def send_image_and_prompt(self): """ 画像(Base64エンコード)とテキストプロンプトをGPT-4Oに送信する関数。 """ self.prompt_text = "あなたはロボットオペレータです。ロボットが前方1m以内に障害物を検出して止まっており、その処理を考えてください\ ロボットのセンサは必ずしも100%の信頼性があるわけではないため、誤検知の可能性もあります。\ ロボットの前方に取り付けられたカメラの映像をもとにロボットの次の動作を判断し、\ 次のどのような行動を起こすべきかを1行目に数字だけで回答してください。\ 誤検知の可能性が高く、特に障害物がないので前に進んでも良さそうな場合は0,\ 障害物が前方にありそう、かつ、それが人やロボット等の移動障害物の場合にはしばらく待機して相手の移動を待つ戦略を取り、数値としては1、\ カラーコーン等の静止障害物の場合は、回避する必要があるため2を出力し、改行して理由を出力してください\ その際、わかるようであれば右か左かどちら方向に回避すればよりスムーズに回避できるかも教えてください" if not self.last_image: rospy.logwarn("画像がありません。") return # 画像をエンコード encoded_image = self.encode_image(self.last_image) if not encoded_image: rospy.logerr("画像のエンコードに失敗しました。") return # リクエストの作成 try: # APIリクエストを送信 response = client.chat.completions.create( model="gpt-4o-mini", # モデルの指定 messages=[ { "role": "user", "content": [ {"type": "text", "text": self.prompt_text}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{encoded_image}" }, }, ], } ], ) # GPTの出力を解析 response_message = response.choices[0].message.content response_text = response_message.encode("utf-8").decode("utf-8") if response_message else "No content returned" # 最初の数字をパース gpt_output = int(response_text.split()[0]) self.gpt_output = gpt_output # 結果を更新 self.last_gpt_output_time = rospy.get_time() # 成功時にタイムスタンプを更新 rospy.loginfo(f"GPTの応答: {response_text}") except Exception as e: rospy.logerr(f"GPTリクエストエラー: {e}") self.gpt_output = 99 except Exception as e: print(f"リクエスト中にエラーが発生しました: {e}") def publish_gpt_output(self): """ 1秒ごとにGPTの出力をROSトピックにパブリッシュする。 """ current_time = rospy.get_time() # ロボットが動いている場合、デフォルト値を送信 if self.current_speed > self.stationary_threshold: self.gpt_output = 99 # 1秒ごとに出力を更新 if current_time - self.last_gpt_output_time > self.gpt_output_expiry: self.pub.publish(self.gpt_output) #rospy.loginfo(f"パブリッシュ: {self.gpt_output}") def run(self): """ メインのループ処理。 """ rate = rospy.Rate(10) # 10Hz while not rospy.is_shutdown(): # 停止時間が一定以上の場合、GPTにリクエストを送信 if self.stationary_time >= self.stationary_duration: # 前回のリクエストから十分な時間が経過している場合 if rospy.get_time() - self.last_gpt_output_time > self.gpt_request_interval: self.send_image_and_prompt() # 1秒ごとにパブリッシュを行う self.publish_gpt_output() # ループの速度を保つ rate.sleep() if __name__ == "__main__": detector = ObstacleDetector() detector.run()