opentau.envs.robocasa

Environment wrapper for RoboCasa365 kitchen tasks.

Ported from upstream LeRobot’s lerobot/envs/robocasa.py and adapted to OpenTau’s LIBERO conventions. Cameras are remapped to camera0/camera1/… so that opentau.envs.utils.preprocess_observation() and the policy’s num_cams zero-fill path consume them exactly as they consume LIBERO cameras. The vec-env builder shards tasks across accelerator ranks, so distributed eval is supported and the _rank{N}-strip uniqueness assumption in opentau.scripts.eval.collect_grid_summary_videos() still holds.

The underlying simulator (robocasa / robosuite 1.5) is imported lazily inside RoboCasaEnv._ensure_env() and _resolve_tasks(), so importing this module (e.g. in the CPU test suite) never requires the sim to be installed.
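The lazy-import pattern described above can be sketched as follows. LazySimHandle and ensure() are hypothetical names standing in for RoboCasaEnv._ensure_env(). The sketch only assumes that deferring importlib.import_module() until first use is what keeps the simulator out of module import time.

```python
from __future__ import annotations

import importlib
from types import ModuleType


class LazySimHandle:
    """Hypothetical stand-in for the lazy import done in RoboCasaEnv._ensure_env()."""

    def __init__(self, module_name: str) -> None:
        self._module_name = module_name
        self._module: ModuleType | None = None  # nothing imported yet

    def ensure(self) -> ModuleType:
        # Import on the first call only; later calls reuse the cached module.
        if self._module is None:
            self._module = importlib.import_module(self._module_name)
        return self._module


# Constructing the handle never touches the simulator package.
handle = LazySimHandle("robocasa")
```

In this sketch, calling handle.ensure() raises ModuleNotFoundError only at that point if robocasa is missing. Merely defining or constructing the wrapper never does, so a CPU-only test suite can import the module.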

Functions

convert_action(flat_action)

Split a flat (12,) action vector into a RoboCasa action dict.

create_robocasa_envs(task, n_envs[, ...])

Create vectorized RoboCasa365 environments with a consistent return shape.

Classes

RoboCasaEnv(task[, camera_name, obs_type, ...])

Gym wrapper for RoboCasa365 kitchen environments.

class opentau.envs.robocasa.RoboCasaEnv(task: str, camera_name: str | Sequence[str] = 'robot0_agentview_left,robot0_eye_in_hand,robot0_agentview_right', obs_type: str = 'pixels_agent_pos', render_mode: str = 'rgb_array', observation_width: int = 256, observation_height: int = 256, visualization_width: int = 512, visualization_height: int = 512, split: str | None = None, episode_length: int | None = None, obj_registries: Sequence[str] = ('lightwheel',), episode_index: int = 0, camera_name_mapping: dict[str, list[str]] | None = None)

Bases: Env

Gym wrapper for RoboCasa365 kitchen environments.

Wraps RoboCasaGymEnv from the robocasa package and converts between its dict-based observations/actions and the flat arrays OpenTau expects. Raw camera frames are remapped to camera{i} keys (see camera_name_mapping) so that the policy input structure matches LIBERO.

__init__(task: str, camera_name: str | Sequence[str] = 'robot0_agentview_left,robot0_eye_in_hand,robot0_agentview_right', obs_type: str = 'pixels_agent_pos', render_mode: str = 'rgb_array', observation_width: int = 256, observation_height: int = 256, visualization_width: int = 512, visualization_height: int = 512, split: str | None = None, episode_length: int | None = None, obj_registries: Sequence[str] = ('lightwheel',), episode_index: int = 0, camera_name_mapping: dict[str, list[str]] | None = None)

Initialize the RoboCasaEnv.

Parameters:
  • task – RoboCasa task name (e.g. "CloseFridge").

  • camera_name – Raw RoboCasa camera name(s), as a comma-separated string or a sequence. This value determines both the number of cameras and their order.

  • obs_type – "pixels" or "pixels_agent_pos".

  • render_mode – Rendering mode for the environment; metadata["render_modes"] lists only "rgb_array".

  • observation_width – Width of observation images.

  • observation_height – Height of observation images.

  • visualization_width – Width of visualization frames.

  • visualization_height – Height of visualization frames.

  • split – RoboCasa dataset split (None/"all"/"pretrain"/"target").

  • episode_length – Max steps per episode (_max_episode_steps); defaults to 1000.

  • obj_registries – Object-mesh registries to sample assets from.

  • episode_index – Per-worker index (0..n_envs-1) used to spread the reset seed so each sub-env explores a distinct layout.

  • camera_name_mapping – Optional mapping from raw camera names to positional camera{i} keys; by default the first camera maps to camera0, the second to camera1, and so on.
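The camera_name and camera_name_mapping behaviour documented above can be sketched as below. This is a minimal illustration, not the wrapper’s code. It assumes raw frames arrive keyed by the raw camera name, and it simplifies the mapping to one positional key per camera, whereas the real camera_name_mapping parameter is typed dict[str, list[str]]. All three helper names are hypothetical.

```python
from __future__ import annotations

from collections.abc import Sequence


def normalize_camera_names(camera_name: str | Sequence[str]) -> list[str]:
    """Accept "a,b,c" or ["a", "b", "c"] and return an ordered list of names."""
    # Check str first: a str is itself a Sequence of characters.
    if isinstance(camera_name, str):
        names = camera_name.split(",")
    else:
        names = list(camera_name)
    # Strip whitespace and drop empty entries such as a trailing comma.
    return [n.strip() for n in names if n.strip()]


def default_camera_mapping(camera_names: list[str]) -> dict[str, str]:
    """Map the i-th raw camera name to the positional key camera{i}."""
    return {raw: f"camera{i}" for i, raw in enumerate(camera_names)}


def remap_frames(frames: dict, mapping: dict[str, str]) -> dict:
    """Rename raw per-camera frames to positional keys, in mapping order."""
    return {mapping[raw]: frames[raw] for raw in mapping}


names = normalize_camera_names(
    "robot0_agentview_left,robot0_eye_in_hand,robot0_agentview_right"
)
mapping = default_camera_mapping(names)
```

In this sketch the default camera_name yields robot0_agentview_left → camera0, robot0_eye_in_hand → camera1 and robot0_agentview_right → camera2, so reordering the comma-separated string reorders the positional keys.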

close()

Close the environment and release any resources.

metadata: dict[str, Any] = {'render_fps': 20, 'render_modes': ['rgb_array']}

render() → ndarray

Render the environment and return an RGB array for video recording.

reset(seed=None, **kwargs) → tuple[dict[str, Any], dict[str, Any]]

Reset the environment, deriving a per-worker seed from episode_index.
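reset() is documented as deriving a per-worker seed from episode_index, but the exact formula is not stated here. The sketch below assumes a simple additive offset, which is enough to show why sub-envs sharing a base seed still start from distinct layouts. derive_worker_seed is a hypothetical helper.

```python
from __future__ import annotations


def derive_worker_seed(seed: int | None, episode_index: int) -> int | None:
    """Offset the base seed by the worker index (assumed formula, not the real one)."""
    if seed is None:
        # Assumption: with no base seed, leave seeding to the simulator.
        return None
    return seed + episode_index


worker_seeds = [derive_worker_seed(42, i) for i in range(4)]
```

Here worker_seeds is [42, 43, 44, 45], one distinct seed per sub-env for a shared base seed.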

step(action: ndarray) → tuple[dict[str, Any], float, bool, bool, dict[str, Any]]

Take a step; remaps RoboCasa’s info["success"] to is_success.

opentau.envs.robocasa.convert_action(flat_action: ndarray) → dict[str, Any]

Split a flat (12,) action vector into a RoboCasa action dict.

Layout: base_motion(4) + control_mode(1) + ee_pos(3) + ee_rot(3) + gripper(1).
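The documented layout can be checked with a short slicing sketch. The segment names below reuse the labels from the layout line; they are placeholders, since the actual keys of RoboCasa’s action dict are not listed here, and split_flat_action is a hypothetical helper rather than convert_action itself.

```python
from __future__ import annotations

from typing import Any

import numpy as np

# Placeholder segment names taken from the documented layout labels;
# the real RoboCasa action-dict keys may differ.
_LAYOUT: list[tuple[str, int]] = [
    ("base_motion", 4),
    ("control_mode", 1),
    ("ee_pos", 3),
    ("ee_rot", 3),
    ("gripper", 1),
]
ACTION_DIM = sum(size for _, size in _LAYOUT)  # 4 + 1 + 3 + 3 + 1 = 12


def split_flat_action(flat_action: np.ndarray) -> dict[str, Any]:
    """Slice a flat (12,) action into named segments, in layout order."""
    flat_action = np.asarray(flat_action, dtype=np.float64).reshape(-1)
    if flat_action.shape != (ACTION_DIM,):
        raise ValueError(f"expected shape ({ACTION_DIM},), got {flat_action.shape}")
    out: dict[str, Any] = {}
    start = 0
    for name, size in _LAYOUT:
        out[name] = flat_action[start : start + size]
        start += size
    return out


parts = split_flat_action(np.arange(12))
```

With np.arange(12) as input, indices 0-3 land in base_motion, 4 in control_mode, 5-7 in ee_pos, 8-10 in ee_rot and 11 in gripper.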

opentau.envs.robocasa.create_robocasa_envs(task: str, n_envs: int, gym_kwargs: dict[str, Any] | None = None, camera_name: str | Sequence[str] = 'robot0_agentview_left,robot0_eye_in_hand,robot0_agentview_right', env_cls: type[SyncVectorEnv] | type[AsyncVectorEnv] | None = None, episode_length: int | None = None, obj_registries: Sequence[str] = ('lightwheel',)) → dict[str, dict[int, VectorEnv]]

Create vectorized RoboCasa365 environments with a consistent return shape.

Returns:

A nested dict in which dict[task_name][0] is the vec_env (env_cls([...]) built from n_envs env factories). Each distinct task forms its own group, so eval reports a per-task Success/{task} metric and a per-task Eval Videos/{task}_0 grid.
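To make the return shape concrete, the sketch below walks a dict[task_name][0] → vec_env structure and lists the per-task logging keys quoted above. eval_log_keys is hypothetical, the vec envs are replaced by placeholder objects, and treating the _0 suffix as the group index is an assumption.

```python
from __future__ import annotations

from typing import Any


def eval_log_keys(envs: dict[str, dict[int, Any]]) -> list[str]:
    """List per-task logging keys for a {task_name: {group_id: vec_env}} dict.

    The key formats mirror the strings quoted in the docs; reading the
    trailing _0 as the group id is an assumption of this sketch.
    """
    keys: list[str] = []
    for task_name, groups in envs.items():
        for group_id in groups:  # each task is its own group, with id 0
            keys.append(f"Success/{task_name}")
            keys.append(f"Eval Videos/{task_name}_{group_id}")
    return keys


# Placeholder objects stand in for real SyncVectorEnv/AsyncVectorEnv instances.
example_envs = {"CloseFridge": {0: object()}, "PickPlaceCoffee": {0: object()}}
keys = eval_log_keys(example_envs)
```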

task can be a single task name (CloseFridge), a comma-separated list (CloseFridge,PickPlaceCoffee), or a benchmark-group shortcut (atomic_seen/composite_seen/composite_unseen/pretrain50…), which auto-expands and auto-sets the dataset split.
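The three accepted forms of task can be sketched as a small resolver. The group table here is a made-up placeholder (example_group and its split are not real RoboCasa data); the real shortcut names, task lists and splits come from robocasa and are not reproduced. The sketch also assumes a group shortcut is only recognised as the whole task string, not inside a comma-separated list. resolve_tasks_sketch is a hypothetical name.

```python
from __future__ import annotations

# Hypothetical placeholder table: {shortcut: (task_names, split)}.
# Real benchmark groups such as atomic_seen are NOT listed here.
EXAMPLE_GROUPS: dict[str, tuple[list[str], str]] = {
    "example_group": (["CloseFridge", "PickPlaceCoffee"], "target"),
}


def resolve_tasks_sketch(
    task: str,
    groups: dict[str, tuple[list[str], str]] = EXAMPLE_GROUPS,
) -> tuple[list[str], str | None]:
    """Expand a task spec into (task_names, split).

    A group shortcut expands to its task list and sets the split; a single
    name or a comma-separated list is split on commas and leaves split=None.
    """
    spec = task.strip()
    if spec in groups:
        tasks, split = groups[spec]
        return list(tasks), split
    names = [t.strip() for t in spec.split(",") if t.strip()]
    return names, None
```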

When run under an accelerator with multiple processes, tasks are sharded round-robin (idx % num_processes == process_index) so each rank evaluates a disjoint subset — matching LIBERO and keeping the _rank{N}-strip uniqueness assumption in collect_grid_summary_videos valid.
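The round-robin rule quoted above is a one-line filter. The sketch below is a hypothetical helper. In practice num_processes and process_index would come from the accelerator (for example, Hugging Face Accelerate exposes Accelerator.num_processes and Accelerator.process_index).

```python
from __future__ import annotations


def shard_tasks(tasks: list[str], num_processes: int, process_index: int) -> list[str]:
    """Keep the tasks whose position satisfies idx % num_processes == process_index."""
    if not 0 <= process_index < num_processes:
        raise ValueError("process_index must be in [0, num_processes)")
    return [t for idx, t in enumerate(tasks) if idx % num_processes == process_index]


tasks = ["A", "B", "C", "D", "E"]
rank0 = shard_tasks(tasks, num_processes=2, process_index=0)
rank1 = shard_tasks(tasks, num_processes=2, process_index=1)
```

With five tasks and two ranks, rank 0 gets ["A", "C", "E"] and rank 1 gets ["B", "D"]. The shards are disjoint and together cover every task; with more ranks than tasks, some ranks receive an empty list under this rule.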