Drones


This environment is forked from gym-pybullet-drones, a Gym environment with PyBullet physics for reinforcement learning of multi-agent quadcopter control. It supports both single-drone and multi-drone control. According to the official repository, it provides the following five action types (see the sketch after the list):

rpm: revolutions per minute (RPM);
pid: PID control;
vel: velocity input (using PID control);
one_d_rpm: 1D control (identical input to all motors) with RPMs;
one_d_pid: 1D control (identical input to all motors) with PID control.
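
For orientation, here is a minimal sketch of how these identifiers map onto the ActionType enum that the wrapper passes to the environment (see the source code at the bottom of this page); the string values come from gym_pybullet_drones.utils.enums:

from gym_pybullet_drones.utils.enums import ActionType

# Each identifier above is a valid value of the ActionType enum:
act = ActionType("one_d_rpm")  # also: "rpm", "pid", "vel", "one_d_pid"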

You can also customize the scenarios and tasks in this environment to suit your needs.



Installation

Tip

Before preparing the software packages for this simulator, it is recommended to create a new conda environment with Python 3.10.

Open a terminal and type the following commands to build a new conda environment for XuanCe with drones:

conda create -n xuance_drones python=3.10
conda activate xuance_drones
pip install xuance  # refer to the installation of XuanCe.

git clone https://github.com/utiasDSL/gym-pybullet-drones.git
cd gym-pybullet-drones/
pip install --upgrade pip
pip install -e .  # if needed, `sudo apt install build-essential` to install `gcc` and build `pybullet`
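
To confirm that both packages were installed correctly, you can run a quick import check (a sanity check, not part of the official instructions):

python -c "import xuance, gym_pybullet_drones; print('Installation OK')"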

During the installation of gym-pybullet-drones, you might encounter errors like:

Error

gym-pybullet-drones 2.0.0 requires numpy<2.0,>1.24, but you have numpy 1.22.4 which is incompatible.
gym-pybullet-drones 2.0.0 requires scipy<2.0,>1.10, but you have scipy 1.7.3 which is incompatible.

Solution: upgrade the incompatible packages to versions that satisfy the requirements:

pip install "numpy>1.24,<2.0"
pip install "scipy>1.10,<2.0"
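
You can then verify the installed versions:

python -c "import numpy, scipy; print(numpy.__version__, scipy.__version__)"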


Try an Example

Create a Python file named, e.g., "demo_drones.py":

import argparse
from xuance import get_runner

def parse_args():
    parser = argparse.ArgumentParser("Run a demo.")
    parser.add_argument("--method", type=str, default="iddpg")
    parser.add_argument("--env", type=str, default="drones")
    parser.add_argument("--env-id", type=str, default="MultiHoverAviary")
    parser.add_argument("--test", type=int, default=0)
    parser.add_argument("--device", type=str, default="cuda:0")
    parser.add_argument("--parallels", type=int, default=10)
    parser.add_argument("--benchmark", type=int, default=1)
    parser.add_argument("--test-episode", type=int, default=5)

    return parser.parse_args()

if __name__ == '__main__':
    parser = parse_args()
    runner = get_runner(method=parser.method,
                        env=parser.env,
                        env_id=parser.env_id,
                        parser_args=parser,
                        is_test=parser.test)
    if parser.benchmark:
        runner.benchmark()
    else:
        runner.run()

Open the terminal and type the python command:

python demo_drones.py

Then, you can brew a cup of coffee and wait for the training process to finish. Finally, test the trained model and view its effectiveness:

python demo_drones.py --benchmark 0 --test 1


drones_env.py

class xuance.environment.drones.drones_env.Drones_Env(args)

A wrapper class around the gym-pybullet-drones environments.

Parameters:
  • args (SimpleNamespace) – An argument object that contains various settings and parameters for initializing the environment.

  • args.continuous (bool) – Determines whether the drone operates in a continuous control mode.

  • args.env_id (str) – Specifies the type of PyBullet Drones environment to instantiate.

  • args.render (bool) – Determines whether to render the environment with a graphical interface.

  • args.record (bool) – Determines whether to record the environment’s visual output.

  • args.max_episode_steps (int) – Maximum number of steps per episode for the environment.
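
As an illustration, the wrapper can be constructed directly by packing these settings into a SimpleNamespace. This is a minimal sketch: the values below are placeholders, only the attributes read by the source code at the bottom of this page are included, and the "kin"/"rpm" strings are the ObservationType/ActionType enum values from gym-pybullet-drones.

from types import SimpleNamespace
from xuance.environment.drones.drones_env import Drones_Env

args = SimpleNamespace(
    env_id="MultiHoverAviary",  # scenario registered in the wrapper
    render=False,               # GUI off; parallel envs cannot be rendered
    sleep=0.0,                  # per-step delay, only used when rendering
    obs_type="kin",             # kinematic observations (ObservationType)
    act_type="rpm",             # motor-RPM actions (ActionType)
    num_drones=2,
    max_episode_steps=100,
)
env = Drones_Env(args)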

xuance.environment.drones.drones_env.Drones_Env.space_reshape(gym_space)

Reshape the given Gym space into a new Box space with flattened boundaries.

Parameters:

gym_space (gym.spaces.Space) – The Gym space that needs to be reshaped.

Returns:

A reshaped Box space with flattened boundaries.

Return type:

gym.spaces.Box
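
For example (hypothetical shapes), a nested single-drone space of shape (1, 12) flattens to (12,). Continuing with the env constructed above:

import numpy as np
from gym.spaces import Box

nested = Box(low=-np.inf, high=np.inf, shape=(1, 12), dtype=np.float32)
flat = env.space_reshape(nested)
print(flat.shape)  # (12,)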

xuance.environment.drones.drones_env.Drones_Env.close()

Close the environment.

xuance.environment.drones.drones_env.Drones_Env.render()

Return the rendering result.

Returns:

the rendering result.

Return type:

np.ndarray

xuance.environment.drones.drones_env.Drones_Env.reset()

Reset the environment.

Returns:

The initial observation of the environment as a flattened 1-dimensional array and additional information regarding the environment’s state.

Return type:

tuple

xuance.environment.drones.drones_env.Drones_Env.step(actions)

Execute the actions and get next observations, rewards, and other information.

Parameters:

actions (np.ndarray) – Actions to be executed in the environment. The actions are reshaped to be compatible with the environment’s expectations.

Returns:

A tuple containing the flattened next observation of the environment, the received reward, a termination indicator, a truncation indicator, and additional environment-related information.

Return type:

tuple
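
Putting reset() and step() together, a short random rollout with the env constructed above might look like this (action_space.sample() stands in for a trained policy):

import numpy as np

obs, info = env.reset()
for _ in range(env.max_episode_steps):
    actions = env.action_space.sample()  # random stand-in policy
    obs, reward, terminated, truncated, info = env.step(actions)
    if np.all(terminated) or np.all(truncated):  # lists in the multi-agent case
        break
env.close()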



drones_vec_env.py

class xuance.environment.drones.drones_vec_env.SubprocVecEnv_Drones(env_fns, context='spawn', in_series=1)

Extend the functionality of a subprocess-based vectorized environment.

Parameters:
  • env_fns – a list of functions that each create an environment.

  • context – the method used for creating and managing processes in a multiprocessing environment.

  • in_series (int) – specifies the number of environments to run in series.

class xuance.environment.drones.drones_vec_env.DummyVecEnv_Drones(env_fns)

Extend the functionality of a dummy vectorized environment.

Parameters:

env_fns – a list of functions that each create an environment.
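
Both vectorized wrappers expect env_fns to be a list of zero-argument callables, each building one environment. A minimal sketch, reusing args and Drones_Env from the example above:

from xuance.environment.drones.drones_vec_env import DummyVecEnv_Drones

env_fns = [lambda: Drones_Env(args) for _ in range(4)]  # four identical envs
envs = DummyVecEnv_Drones(env_fns)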



Source Code

import numpy as np
from gym.spaces import Box
import time


class Drones_Env:
    def __init__(self, args):
        # import scenarios of gym-pybullet-drones
        self.env_id = args.env_id
        from gym_pybullet_drones.envs.CtrlAviary import CtrlAviary
        from xuance.environment.drones.customized.HoverAviary import HoverAviary
        from gym_pybullet_drones.envs.VelocityAviary import VelocityAviary
        from xuance.environment.drones.customized.MultiHoverAviary import MultiHoverAviary
        from gym_pybullet_drones.utils.enums import ObservationType, ActionType
        REGISTRY = {
            "CtrlAviary": CtrlAviary,
            "HoverAviary": HoverAviary,
            "VelocityAviary": VelocityAviary,
            "MultiHoverAviary": MultiHoverAviary,
            # you can add your customized scenarios here.
        }
        self.gui = args.render  # Note: You cannot render multiple environments in parallel.
        self.sleep = args.sleep

        kwargs_env = {'gui': self.gui}
        if self.env_id in ["HoverAviary", "MultiHoverAviary"]:
            kwargs_env.update({'obs': ObservationType(args.obs_type),
                               'act': ActionType(args.act_type)})
        if self.env_id != "HoverAviary":
            kwargs_env.update({'num_drones': args.num_drones})
        self.env = REGISTRY[args.env_id](**kwargs_env)

        self._episode_step = 0
        self._episode_score = 0.0
        if self.env_id == "MultiHoverAviary":
            self.observation_space = self.env.observation_space
            self.observation_shape = self.env.observation_space.shape
            self.action_space = self.env.action_space
            self.action_shape = self.env.action_space.shape
        else:
            self.observation_space = self.space_reshape(self.env.observation_space)
            self.action_space = self.space_reshape(self.env.action_space)
        self.max_episode_steps = self.max_cycles = args.max_episode_steps

        self.n_agents = args.num_drones
        self.env_info = {
            "n_agents": self.n_agents,
            "obs_shape": self.env.observation_space.shape,
            "act_space": self.action_space,
            "state_shape": 20,
            "n_actions": self.env.action_space.shape[-1],
            "episode_limit": self.max_episode_steps,
        }

    def space_reshape(self, gym_space):
        low = gym_space.low.reshape(-1)
        high = gym_space.high.reshape(-1)
        shape_obs = (gym_space.shape[-1], )
        return Box(low=low, high=high, shape=shape_obs, dtype=gym_space.dtype)

    def close(self):
        self.env.close()

    def render(self, *args, **kwargs):
        # Rendering is handled by the PyBullet GUI when gui=True; this method
        # only returns a placeholder array.
        return np.zeros([2, 2, 2])

    def reset(self):
        obs, info = self.env.reset()
        info["episode_step"] = self._episode_step

        self._episode_step = 0
        if self.n_agents > 1:
            self._episode_score = np.zeros([self.n_agents, 1])
            obs_return = obs
        else:
            self._episode_score = 0.0
            obs_return = obs.reshape(-1)
        return obs_return, info

    def step(self, actions):
        if self.n_agents > 1:
            obs, reward, terminated, truncated, info = self.env.step(actions)
            obs_return = obs
            terminated = [terminated for _ in range(self.n_agents)]
        else:
            obs, reward, terminated, truncated, info = self.env.step(actions.reshape([1, -1]))
            obs_return = obs.reshape(-1)

        self._episode_step += 1
        self._episode_score += reward
        if self.n_agents > 1:
            truncated = [True for _ in range(self.n_agents)] if (self._episode_step >= self.max_episode_steps) else [False for _ in range(self.n_agents)]
        else:
            truncated = True if (self._episode_step >= self.max_episode_steps) else False
        info["episode_step"] = self._episode_step  # current episode step
        info["episode_score"] = self._episode_score  # the accumulated rewards

        if self.gui:
            time.sleep(self.sleep)

        return obs_return, reward, terminated, truncated, info

    def get_agent_mask(self):
        return np.ones(self.n_agents, dtype=np.bool_)  # 1 means available

    def state(self):
        # Return a fixed placeholder matching the declared state_shape (20).
        return np.zeros([20])