# file: pi05_libero_run_and_record.py
import os
import random
import cv2
import torch
import numpy as np
from lerobot.envs.factory import make_env
from lerobot.policies.pi05.policy import PI05Policy
def make_libero_env(task_id: int):
env = make_env(
env_type="libero",
suite_names=["libero_spatial"],
task_ids=[task_id],
obs_type="image",
render_mode="rgb_array",
fps=30,
)
return env
def main():
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
task_id = random.randint(0, 9) # libero_spatial tasks 0..9
print(f"Using libero_spatial task_id={task_id}")
env = make_libero_env(task_id)
policy = PI05Policy.from_pretrained("lerobot/pi05_libero_finetuned")
policy.to(device)
policy.eval()
out_path = f"pi05_libero_task_{task_id}.mp4"
fps = 30
frames = []
obs, info = env.reset()
done = False
step_idx = 0
instruction = info.get("lang_goal", None)
print(f"Instruction: {instruction}")
while not done:
with torch.no_grad():
action = policy.act(
obs=obs,
instruction=instruction,
device=device,
)
obs, reward, terminated, truncated, info = env.step(action)
done = bool(terminated or truncated)
frame = env.render()
if frame is not None:
frames.append(frame)
step_idx += 1
print(f"Episode finished, steps={step_idx}, success={info.get('is_success', False)}")
if len(frames) == 0:
print("No frames collected, check env.render() and render_mode.")
return
h, w, _ = frames[0].shape
writer = cv2.VideoWriter(
out_path,
cv2.VideoWriter_fourcc(*"mp4v"),
fps,
(w, h),
)
for f in frames:
bgr = cv2.cvtColor(f, cv2.COLOR_RGB2BGR)
writer.write(bgr)
writer.release()
print(f"Saved video to {out_path}")
if __name__ == "__main__":
main()