Source code for mujoco_warp._src.forward

# Copyright 2025 The Newton Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from typing import Optional

import warp as wp

from mujoco_warp._src import collision_driver
from mujoco_warp._src import constraint
from mujoco_warp._src import derivative
from mujoco_warp._src import math
from mujoco_warp._src import passive
from mujoco_warp._src import sensor
from mujoco_warp._src import smooth
from mujoco_warp._src import solver
from mujoco_warp._src import util_misc
from mujoco_warp._src.support import xfrc_accumulate
from mujoco_warp._src.types import MJ_MINVAL
from mujoco_warp._src.types import BiasType
from mujoco_warp._src.types import Data
from mujoco_warp._src.types import DisableBit
from mujoco_warp._src.types import DynType
from mujoco_warp._src.types import EnableBit
from mujoco_warp._src.types import GainType
from mujoco_warp._src.types import IntegratorType
from mujoco_warp._src.types import JointType
from mujoco_warp._src.types import Model
from mujoco_warp._src.types import TileSet
from mujoco_warp._src.types import TrnType
from mujoco_warp._src.types import vec10f
from mujoco_warp._src.warp_util import cache_kernel
from mujoco_warp._src.warp_util import event_scope

wp.set_module_options({"enable_backward": False})


@wp.kernel
def _next_position(
  # Model:
  opt_timestep: wp.array(dtype=float),
  jnt_type: wp.array(dtype=int),
  jnt_qposadr: wp.array(dtype=int),
  jnt_dofadr: wp.array(dtype=int),
  # Data in:
  qpos_in: wp.array2d(dtype=float),
  qvel_in: wp.array2d(dtype=float),
  # In:
  qvel_scale_in: float,
  # Data out:
  qpos_out: wp.array2d(dtype=float),
):
  worldid, jntid = wp.tid()
  timestep = opt_timestep[worldid % opt_timestep.shape[0]]

  jnttype = jnt_type[jntid]
  qpos_adr = jnt_qposadr[jntid]
  dof_adr = jnt_dofadr[jntid]
  qpos = qpos_in[worldid]
  qpos_next = qpos_out[worldid]
  qvel = qvel_in[worldid]

  if jnttype == JointType.FREE:
    qpos_pos = wp.vec3(qpos[qpos_adr], qpos[qpos_adr + 1], qpos[qpos_adr + 2])
    qvel_lin = wp.vec3(qvel[dof_adr], qvel[dof_adr + 1], qvel[dof_adr + 2]) * qvel_scale_in

    qpos_new = qpos_pos + timestep * qvel_lin

    qpos_quat = wp.quat(
      qpos[qpos_adr + 3],
      qpos[qpos_adr + 4],
      qpos[qpos_adr + 5],
      qpos[qpos_adr + 6],
    )
    qvel_ang = wp.vec3(qvel[dof_adr + 3], qvel[dof_adr + 4], qvel[dof_adr + 5]) * qvel_scale_in

    qpos_quat_new = math.quat_integrate(qpos_quat, qvel_ang, timestep)

    qpos_next[qpos_adr + 0] = qpos_new[0]
    qpos_next[qpos_adr + 1] = qpos_new[1]
    qpos_next[qpos_adr + 2] = qpos_new[2]
    qpos_next[qpos_adr + 3] = qpos_quat_new[0]
    qpos_next[qpos_adr + 4] = qpos_quat_new[1]
    qpos_next[qpos_adr + 5] = qpos_quat_new[2]
    qpos_next[qpos_adr + 6] = qpos_quat_new[3]

  elif jnttype == JointType.BALL:
    qpos_quat = wp.quat(qpos[qpos_adr + 0], qpos[qpos_adr + 1], qpos[qpos_adr + 2], qpos[qpos_adr + 3])
    qvel_ang = wp.vec3(qvel[dof_adr], qvel[dof_adr + 1], qvel[dof_adr + 2]) * qvel_scale_in

    qpos_quat_new = math.quat_integrate(qpos_quat, qvel_ang, timestep)

    qpos_next[qpos_adr + 0] = qpos_quat_new[0]
    qpos_next[qpos_adr + 1] = qpos_quat_new[1]
    qpos_next[qpos_adr + 2] = qpos_quat_new[2]
    qpos_next[qpos_adr + 3] = qpos_quat_new[3]

  else:  # if jnt_type in (JointType.HINGE, JointType.SLIDE):
    qpos_next[qpos_adr] = qpos[qpos_adr] + timestep * qvel[dof_adr] * qvel_scale_in


@wp.kernel
def _next_velocity(
  # Model:
  opt_timestep: wp.array(dtype=float),
  # Data in:
  qvel_in: wp.array2d(dtype=float),
  qacc_in: wp.array2d(dtype=float),
  # In:
  qacc_scale_in: float,
  # Data out:
  qvel_out: wp.array2d(dtype=float),
):
  worldid, dofid = wp.tid()
  timestep = opt_timestep[worldid % opt_timestep.shape[0]]
  qvel_out[worldid, dofid] = qvel_in[worldid, dofid] + qacc_scale_in * qacc_in[worldid, dofid] * timestep


# TODO(team): kernel analyzer array slice?
@wp.func
def _next_act(
  # Model:
  opt_timestep: float,  # kernel_analyzer: ignore
  actuator_dyntype: int,  # kernel_analyzer: ignore
  actuator_dynprm: vec10f,  # kernel_analyzer: ignore
  actuator_actrange: wp.vec2,  # kernel_analyzer: ignore
  # Data In:
  act_in: float,  # kernel_analyzer: ignore
  act_dot_in: float,  # kernel_analyzer: ignore
  # In:
  act_dot_scale: float,
  clamp: bool,
) -> float:
  # advance actuation
  if actuator_dyntype == DynType.FILTEREXACT:
    tau = wp.max(MJ_MINVAL, actuator_dynprm[0])
    act = act_in + act_dot_scale * act_dot_in * tau * (1.0 - wp.exp(-opt_timestep / tau))
  else:
    act = act_in + act_dot_scale * act_dot_in * opt_timestep

  # clamp to actrange
  if clamp:
    act = wp.clamp(act, actuator_actrange[0], actuator_actrange[1])

  return act


@wp.kernel
def _next_activation(
  # Model:
  opt_timestep: wp.array(dtype=float),
  actuator_dyntype: wp.array(dtype=int),
  actuator_actlimited: wp.array(dtype=bool),
  actuator_dynprm: wp.array2d(dtype=vec10f),
  actuator_actrange: wp.array2d(dtype=wp.vec2),
  # Data in:
  act_in: wp.array2d(dtype=float),
  act_dot_in: wp.array2d(dtype=float),
  # In:
  act_dot_scale: float,
  limit: bool,
  # Data out:
  act_out: wp.array2d(dtype=float),
):
  worldid, actid = wp.tid()
  opt_timestep_id = worldid % opt_timestep.shape[0]
  actuator_dynprm_id = worldid % actuator_dynprm.shape[0]
  actuator_actrange_id = worldid % actuator_actrange.shape[0]
  act = _next_act(
    opt_timestep[opt_timestep_id],
    actuator_dyntype[actid],
    actuator_dynprm[actuator_dynprm_id, actid],
    actuator_actrange[actuator_actrange_id, actid],
    act_in[worldid, actid],
    act_dot_in[worldid, actid],
    act_dot_scale,
    limit and actuator_actlimited[actid],
  )
  act_out[worldid, actid] = act


@wp.kernel
def _next_time(
  # Model:
  opt_timestep: wp.array(dtype=float),
  # Data in:
  nefc_in: wp.array(dtype=int),
  time_in: wp.array(dtype=float),
  nworld_in: int,
  naconmax_in: int,
  njmax_in: int,
  nacon_in: wp.array(dtype=int),
  ncollision_in: wp.array(dtype=int),
  # Data out:
  time_out: wp.array(dtype=float),
):
  worldid = wp.tid()
  time_out[worldid] = time_in[worldid] + opt_timestep[worldid % opt_timestep.shape[0]]
  nefc = nefc_in[worldid]

  if nefc > njmax_in:
    wp.printf("nefc overflow - please increase njmax to %u\n", nefc)

  if worldid == 0:
    ncollision = ncollision_in[0]
    if ncollision > naconmax_in:
      nconmax = int(wp.ceil(float(ncollision) / float(nworld_in)))
      wp.printf("broadphase overflow - please increase nconmax to %u or naconmax to %u\n", nconmax, ncollision)

    if nacon_in[0] > naconmax_in:
      nconmax = int(wp.ceil(float(nacon_in[0]) / float(nworld_in)))
      wp.printf("narrowphase overflow - please increase nconmax to %u or naconmax to %u\n", nconmax, nacon_in[0])


def _advance(m: Model, d: Data, qacc: wp.array, qvel: Optional[wp.array] = None):
  """Advance state and time given activation derivatives and acceleration."""
  # TODO(team): can we assume static timesteps?

  # advance activations
  wp.launch(
    _next_activation,
    dim=(d.nworld, m.na),
    inputs=[
      m.opt.timestep,
      m.actuator_dyntype,
      m.actuator_actlimited,
      m.actuator_dynprm,
      m.actuator_actrange,
      d.act,
      d.act_dot,
      1.0,
      True,
    ],
    outputs=[d.act],
  )

  wp.launch(
    _next_velocity,
    dim=(d.nworld, m.nv),
    inputs=[m.opt.timestep, d.qvel, qacc, 1.0],
    outputs=[d.qvel],
  )

  # advance positions with qvel if given, d.qvel otherwise (semi-implicit)
  qvel_in = qvel or d.qvel

  wp.launch(
    _next_position,
    dim=(d.nworld, m.njnt),
    inputs=[m.opt.timestep, m.jnt_type, m.jnt_qposadr, m.jnt_dofadr, d.qpos, qvel_in, 1.0],
    outputs=[d.qpos],
  )

  wp.launch(
    _next_time,
    dim=d.nworld,
    inputs=[m.opt.timestep, d.nefc, d.time, d.nworld, d.naconmax, d.njmax, d.nacon, d.ncollision],
    outputs=[d.time],
  )

  wp.copy(d.qacc_warmstart, d.qacc)


@wp.kernel
def _euler_damp_qfrc_sparse(
  # Model:
  opt_timestep: wp.array(dtype=float),
  dof_Madr: wp.array(dtype=int),
  dof_damping: wp.array2d(dtype=float),
  # Out:
  qM_integration_out: wp.array3d(dtype=float),
):
  worldid, tid = wp.tid()
  timestep = opt_timestep[worldid % opt_timestep.shape[0]]

  adr = dof_Madr[tid]
  qM_integration_out[worldid, 0, adr] += timestep * dof_damping[worldid, tid]


@cache_kernel
def _tile_euler_dense(tile: TileSet):
  @wp.kernel(module="unique", enable_backward=False)
  def euler_dense(
    # Model:
    opt_timestep: wp.array(dtype=float),
    dof_damping: wp.array2d(dtype=float),
    # Data in:
    qM_in: wp.array3d(dtype=float),
    efc_Ma_in: wp.array2d(dtype=float),
    # In:
    adr_in: wp.array(dtype=int),
    # Data out:
    qacc_out: wp.array2d(dtype=float),
  ):
    worldid, nodeid = wp.tid()
    timestep = opt_timestep[worldid % opt_timestep.shape[0]]
    TILE_SIZE = wp.static(tile.size)

    dofid = adr_in[nodeid]
    M_tile = wp.tile_load(qM_in[worldid], shape=(TILE_SIZE, TILE_SIZE), offset=(dofid, dofid))
    damping_tile = wp.tile_load(dof_damping[worldid % dof_damping.shape[0]], shape=(TILE_SIZE,), offset=(dofid,))
    damping_scaled = damping_tile * timestep
    qm_integration_tile = wp.tile_diag_add(M_tile, damping_scaled)

    Ma_tile = wp.tile_load(efc_Ma_in[worldid], shape=(TILE_SIZE,), offset=(dofid,))
    L_tile = wp.tile_cholesky(qm_integration_tile)
    qacc_tile = wp.tile_cholesky_solve(L_tile, Ma_tile)
    wp.tile_store(qacc_out[worldid], qacc_tile, offset=(dofid))

  return euler_dense


[docs] @event_scope def euler(m: Model, d: Data): """Euler integrator, semi-implicit in velocity.""" # integrate damping implicitly if not m.opt.disableflags & (DisableBit.EULERDAMP | DisableBit.DAMPER): qacc = wp.empty((d.nworld, m.nv), dtype=float) if m.is_sparse: qM = wp.clone(d.qM) qLD = wp.empty((d.nworld, 1, m.nC), dtype=float) qLDiagInv = wp.empty((d.nworld, m.nv), dtype=float) wp.launch( _euler_damp_qfrc_sparse, dim=(d.nworld, m.nv), inputs=[m.opt.timestep, m.dof_Madr, m.dof_damping], outputs=[qM], ) smooth.factor_solve_i(m, d, qM, qLD, qLDiagInv, qacc, d.efc.Ma) else: for tile in m.qM_tiles: wp.launch_tiled( _tile_euler_dense(tile), dim=(d.nworld, tile.adr.size), inputs=[m.opt.timestep, m.dof_damping, d.qM, d.efc.Ma, tile.adr], outputs=[qacc], block_dim=m.block_dim.euler_dense, ) _advance(m, d, qacc) else: _advance(m, d, d.qacc)
def _rk_perturb_state( m: Model, d: Data, scale: float, qpos_t0: wp.array2d(dtype=float), qvel_t0: wp.array2d(dtype=float), act_t0: Optional[wp.array] = None, ): # position wp.launch( _next_position, dim=(d.nworld, m.njnt), inputs=[m.opt.timestep, m.jnt_type, m.jnt_qposadr, m.jnt_dofadr, qpos_t0, d.qvel, scale], outputs=[d.qpos], ) # velocity wp.launch( _next_velocity, dim=(d.nworld, m.nv), inputs=[m.opt.timestep, qvel_t0, d.qacc, scale], outputs=[d.qvel], ) # activation if m.na and act_t0 is not None: wp.launch( _next_activation, dim=(d.nworld, m.na), inputs=[m.opt.timestep, act_t0, d.act_dot, scale, False], outputs=[d.act], ) @wp.kernel def _rk_accumulate_velocity_acceleration( # Data in: qvel_in: wp.array2d(dtype=float), qacc_in: wp.array2d(dtype=float), # In: scale: float, # Data out: qvel_out: wp.array2d(dtype=float), qacc_out: wp.array2d(dtype=float), ): worldid, dofid = wp.tid() qvel_out[worldid, dofid] += scale * qvel_in[worldid, dofid] qacc_out[worldid, dofid] += scale * qacc_in[worldid, dofid] @wp.kernel def _rk_accumulate_activation_velocity( # Data in: act_dot_in: wp.array2d(dtype=float), # In: scale: float, # Data out: act_dot_out: wp.array2d(dtype=float), ): worldid, actid = wp.tid() act_dot_out[worldid, actid] += scale * act_dot_in[worldid, actid] def _rk_accumulate( m: Model, d: Data, scale: float, qvel_rk: wp.array2d(dtype=float), qacc_rk: wp.array2d(dtype=float), act_dot_rk: Optional[wp.array] = None, ): """Computes one term of 1/6 k_1 + 1/3 k_2 + 1/3 k_3 + 1/6 k_4.""" wp.launch( _rk_accumulate_velocity_acceleration, dim=(d.nworld, m.nv), inputs=[d.qvel, d.qacc, scale], outputs=[qvel_rk, qacc_rk], ) if m.na and act_dot_rk is not None: wp.launch( _rk_accumulate_activation_velocity, dim=(d.nworld, m.na), inputs=[d.act_dot, scale], outputs=[act_dot_rk], )
[docs] @event_scope def rungekutta4(m: Model, d: Data): """Runge-Kutta explicit order 4 integrator.""" # RK4 tableau A = [0.5, 0.5, 1.0] # diagonal only B = [1.0 / 6.0, 1.0 / 3.0, 1.0 / 3.0, 1.0 / 6.0] qpos_t0 = wp.clone(d.qpos) qvel_t0 = wp.clone(d.qvel) qvel_rk = wp.zeros((d.nworld, m.nv), dtype=float) qacc_rk = wp.zeros((d.nworld, m.nv), dtype=float) if m.na: act_t0 = wp.clone(d.act) act_dot_rk = wp.zeros((d.nworld, m.na), dtype=float) else: act_t0 = None act_dot_rk = None _rk_accumulate(m, d, B[0], qvel_rk, qacc_rk, act_dot_rk) for i in range(3): a, b = float(A[i]), B[i + 1] _rk_perturb_state(m, d, a, qpos_t0, qvel_t0, act_t0) forward(m, d) _rk_accumulate(m, d, b, qvel_rk, qacc_rk, act_dot_rk) wp.copy(d.qpos, qpos_t0) wp.copy(d.qvel, qvel_t0) if m.na: wp.copy(d.act, act_t0) wp.copy(d.act_dot, act_dot_rk) _advance(m, d, qacc_rk, qvel_rk)
[docs] @event_scope def implicit(m: Model, d: Data): """Integrates fully implicit in velocity.""" if ~(m.opt.disableflags | ~(DisableBit.ACTUATION | DisableBit.SPRING | DisableBit.DAMPER)): if m.is_sparse: qDeriv = wp.empty((d.nworld, 1, m.nM), dtype=float) qLD = wp.empty((d.nworld, 1, m.nC), dtype=float) else: qDeriv = wp.empty(d.qM.shape, dtype=float) qLD = wp.empty(d.qM.shape, dtype=float) qLDiagInv = wp.empty((d.nworld, m.nv), dtype=float) derivative.deriv_smooth_vel(m, d, qDeriv) qacc = wp.empty((d.nworld, m.nv), dtype=float) smooth.factor_solve_i(m, d, qDeriv, qLD, qLDiagInv, qacc, d.efc.Ma) _advance(m, d, qacc) else: _advance(m, d, d.qacc)
[docs] @event_scope def fwd_position(m: Model, d: Data, factorize: bool = True): """Position-dependent computations. Args: m: The model containing kinematic and dynamic information. d: The data object containing the current state and output arrays. factorize: Flag to factorize interia matrix. """ smooth.kinematics(m, d) smooth.com_pos(m, d) smooth.camlight(m, d) smooth.flex(m, d) smooth.tendon(m, d) smooth.crb(m, d) smooth.tendon_armature(m, d) if factorize: smooth.factor_m(m, d) if m.opt.run_collision_detection: collision_driver.collision(m, d) constraint.make_constraint(m, d) smooth.transmission(m, d)
# TODO(team): sparse actuator_moment version @cache_kernel def _actuator_velocity(nv: int): @wp.kernel(module="unique", enable_backward=False) def actuator_velocity( # Data in: qvel_in: wp.array2d(dtype=float), actuator_moment_in: wp.array3d(dtype=float), # Data out: actuator_velocity_out: wp.array2d(dtype=float), ): worldid, actid = wp.tid() moment_tile = wp.tile_load(actuator_moment_in[worldid, actid], shape=wp.static(nv)) qvel_tile = wp.tile_load(qvel_in[worldid], shape=wp.static(nv)) moment_qvel_tile = wp.tile_map(wp.mul, moment_tile, qvel_tile) actuator_velocity_tile = wp.tile_reduce(wp.add, moment_qvel_tile) actuator_velocity_out[worldid, actid] = actuator_velocity_tile[0] return actuator_velocity @cache_kernel def _tendon_velocity(nv: int): @wp.kernel(module="unique", enable_backward=False) def tendon_velocity( # Data in: qvel_in: wp.array2d(dtype=float), ten_J_in: wp.array3d(dtype=float), # Data out: ten_velocity_out: wp.array2d(dtype=float), ): worldid, tenid = wp.tid() ten_J_tile = wp.tile_load(ten_J_in[worldid, tenid], shape=wp.static(nv)) qvel_tile = wp.tile_load(qvel_in[worldid], shape=wp.static(nv)) ten_J_qvel_tile = wp.tile_map(wp.mul, ten_J_tile, qvel_tile) ten_velocity_tile = wp.tile_reduce(wp.add, ten_J_qvel_tile) ten_velocity_out[worldid, tenid] = ten_velocity_tile[0] return tendon_velocity
[docs] @event_scope def fwd_velocity(m: Model, d: Data): """Velocity-dependent computations.""" wp.launch_tiled( _actuator_velocity(m.nv), dim=(d.nworld, m.nu), inputs=[d.qvel, d.actuator_moment], outputs=[d.actuator_velocity], block_dim=m.block_dim.actuator_velocity, ) # TODO(team): sparse version wp.launch_tiled( _tendon_velocity(m.nv), dim=(d.nworld, m.ntendon), inputs=[d.qvel, d.ten_J], outputs=[d.ten_velocity], block_dim=m.block_dim.tendon_velocity, ) smooth.com_vel(m, d) passive.passive(m, d) smooth.rne(m, d) smooth.tendon_bias(m, d, d.qfrc_bias)
@wp.kernel def _actuator_force( # Model: na: int, opt_timestep: wp.array(dtype=float), actuator_dyntype: wp.array(dtype=int), actuator_gaintype: wp.array(dtype=int), actuator_biastype: wp.array(dtype=int), actuator_actadr: wp.array(dtype=int), actuator_actnum: wp.array(dtype=int), actuator_ctrllimited: wp.array(dtype=bool), actuator_forcelimited: wp.array(dtype=bool), actuator_actlimited: wp.array(dtype=bool), actuator_dynprm: wp.array2d(dtype=vec10f), actuator_gainprm: wp.array2d(dtype=vec10f), actuator_biasprm: wp.array2d(dtype=vec10f), actuator_actearly: wp.array(dtype=bool), actuator_ctrlrange: wp.array2d(dtype=wp.vec2), actuator_forcerange: wp.array2d(dtype=wp.vec2), actuator_actrange: wp.array2d(dtype=wp.vec2), actuator_acc0: wp.array(dtype=float), actuator_lengthrange: wp.array(dtype=wp.vec2), # Data in: act_in: wp.array2d(dtype=float), ctrl_in: wp.array2d(dtype=float), actuator_length_in: wp.array2d(dtype=float), actuator_velocity_in: wp.array2d(dtype=float), # In: dsbl_clampctrl: int, # Data out: act_dot_out: wp.array2d(dtype=float), actuator_force_out: wp.array2d(dtype=float), ): worldid, uid = wp.tid() actuator_ctrlrange_id = worldid % actuator_ctrlrange.shape[0] ctrl = ctrl_in[worldid, uid] if actuator_ctrllimited[uid] and not dsbl_clampctrl: ctrlrange = actuator_ctrlrange[actuator_ctrlrange_id, uid] ctrl = wp.clamp(ctrl, ctrlrange[0], ctrlrange[1]) ctrl_act = ctrl act_first = actuator_actadr[uid] if na and act_first >= 0: act_last = act_first + actuator_actnum[uid] - 1 dyntype = actuator_dyntype[uid] dynprm = actuator_dynprm[worldid % actuator_dynprm.shape[0], uid] if dyntype == DynType.INTEGRATOR: act_dot = ctrl elif dyntype == DynType.FILTER or dyntype == DynType.FILTEREXACT: act = act_in[worldid, act_last] act_dot = (ctrl - act) / wp.max(dynprm[0], MJ_MINVAL) elif dyntype == DynType.MUSCLE: dynprm = actuator_dynprm[worldid, uid] act = act_in[worldid, act_last] act_dot = util_misc.muscle_dynamics(ctrl, act, dynprm) else: # DynType.NONE act_dot = 0.0 act_dot_out[worldid, act_last] = act_dot if actuator_actearly[uid]: if dyntype == DynType.INTEGRATOR or dyntype == DynType.NONE: act = act_in[worldid, act_last] ctrl_act = _next_act( opt_timestep[worldid % opt_timestep.shape[0]], dyntype, dynprm, actuator_actrange[worldid % actuator_actrange.shape[0], uid], act, act_dot, 1.0, actuator_actlimited[uid], ) else: ctrl_act = act_in[worldid, act_last] length = actuator_length_in[worldid, uid] velocity = actuator_velocity_in[worldid, uid] # gain gaintype = actuator_gaintype[uid] gainprm = actuator_gainprm[worldid % actuator_gainprm.shape[0], uid] gain = 0.0 if gaintype == GainType.FIXED: gain = gainprm[0] elif gaintype == GainType.AFFINE: gain = gainprm[0] + gainprm[1] * length + gainprm[2] * velocity elif gaintype == GainType.MUSCLE: acc0 = actuator_acc0[uid] lengthrange = actuator_lengthrange[uid] gain = util_misc.muscle_gain(length, velocity, lengthrange, acc0, gainprm) # bias biastype = actuator_biastype[uid] biasprm = actuator_biasprm[worldid % actuator_biasprm.shape[0], uid] bias = 0.0 # BiasType.NONE if biastype == BiasType.AFFINE: bias = biasprm[0] + biasprm[1] * length + biasprm[2] * velocity elif biastype == BiasType.MUSCLE: acc0 = actuator_acc0[uid] lengthrange = actuator_lengthrange[uid] bias = util_misc.muscle_bias(length, lengthrange, acc0, biasprm) force = gain * ctrl_act + bias if actuator_forcelimited[uid]: forcerange = actuator_forcerange[worldid % actuator_forcerange.shape[0], uid] force = wp.clamp(force, forcerange[0], forcerange[1]) actuator_force_out[worldid, uid] = force @wp.kernel def _tendon_actuator_force( # Model: actuator_trntype: wp.array(dtype=int), actuator_trnid: wp.array(dtype=wp.vec2i), # Data in: actuator_force_in: wp.array2d(dtype=float), # Out: ten_actfrc_out: wp.array2d(dtype=float), ): worldid, actid = wp.tid() if actuator_trntype[actid] == TrnType.TENDON: tenid = actuator_trnid[actid][0] # TODO(team): only compute for tendons with force limits? wp.atomic_add(ten_actfrc_out[worldid], tenid, actuator_force_in[worldid, actid]) @wp.kernel def _tendon_actuator_force_clamp( # Model: tendon_actfrclimited: wp.array(dtype=bool), tendon_actfrcrange: wp.array2d(dtype=wp.vec2), actuator_trntype: wp.array(dtype=int), actuator_trnid: wp.array(dtype=wp.vec2i), # In: ten_actfrc_in: wp.array2d(dtype=float), # Data out: actuator_force_out: wp.array2d(dtype=float), ): worldid, actid = wp.tid() if actuator_trntype[actid] == TrnType.TENDON: tenid = actuator_trnid[actid][0] if tendon_actfrclimited[tenid]: ten_actfrc = ten_actfrc_in[worldid, tenid] actfrcrange = tendon_actfrcrange[worldid % tendon_actfrcrange.shape[0], tenid] if ten_actfrc < actfrcrange[0]: actuator_force_out[worldid, actid] *= actfrcrange[0] / ten_actfrc elif ten_actfrc > actfrcrange[1]: actuator_force_out[worldid, actid] *= actfrcrange[1] / ten_actfrc @wp.kernel def _qfrc_actuator( # Model: nu: int, ngravcomp: int, jnt_actfrclimited: wp.array(dtype=bool), jnt_actgravcomp: wp.array(dtype=int), jnt_actfrcrange: wp.array2d(dtype=wp.vec2), dof_jntid: wp.array(dtype=int), # Data in: actuator_moment_in: wp.array3d(dtype=float), qfrc_gravcomp_in: wp.array2d(dtype=float), actuator_force_in: wp.array2d(dtype=float), # Data out: qfrc_actuator_out: wp.array2d(dtype=float), ): worldid, dofid = wp.tid() qfrc = float(0.0) for uid in range(nu): qfrc += actuator_moment_in[worldid, uid, dofid] * actuator_force_in[worldid, uid] jntid = dof_jntid[dofid] # actuator-level gravity compensation, skip if added as passive force if ngravcomp and jnt_actgravcomp[jntid]: qfrc += qfrc_gravcomp_in[worldid, dofid] if jnt_actfrclimited[jntid]: frcrange = jnt_actfrcrange[worldid % jnt_actfrcrange.shape[0], jntid] qfrc = wp.clamp(qfrc, frcrange[0], frcrange[1]) qfrc_actuator_out[worldid, dofid] = qfrc
[docs] @event_scope def fwd_actuation(m: Model, d: Data): """Actuation-dependent computations.""" if not m.nu or (m.opt.disableflags & DisableBit.ACTUATION): d.act_dot.zero_() d.qfrc_actuator.zero_() return wp.launch( _actuator_force, dim=(d.nworld, m.nu), inputs=[ m.na, m.opt.timestep, m.actuator_dyntype, m.actuator_gaintype, m.actuator_biastype, m.actuator_actadr, m.actuator_actnum, m.actuator_ctrllimited, m.actuator_forcelimited, m.actuator_actlimited, m.actuator_dynprm, m.actuator_gainprm, m.actuator_biasprm, m.actuator_actearly, m.actuator_ctrlrange, m.actuator_forcerange, m.actuator_actrange, m.actuator_acc0, m.actuator_lengthrange, d.act, d.ctrl, d.actuator_length, d.actuator_velocity, m.opt.disableflags & DisableBit.CLAMPCTRL, ], outputs=[d.act_dot, d.actuator_force], ) if m.ntendon: # total actuator force at tendon ten_actfrc = wp.zeros((d.nworld, m.ntendon), dtype=float) wp.launch( _tendon_actuator_force, dim=(d.nworld, m.nu), inputs=[m.actuator_trntype, m.actuator_trnid, d.actuator_force], outputs=[ten_actfrc], ) wp.launch( _tendon_actuator_force_clamp, dim=(d.nworld, m.nu), inputs=[m.tendon_actfrclimited, m.tendon_actfrcrange, m.actuator_trntype, m.actuator_trnid, ten_actfrc], outputs=[d.actuator_force], ) wp.launch( _qfrc_actuator, dim=(d.nworld, m.nv), inputs=[ m.nu, m.ngravcomp, m.jnt_actfrclimited, m.jnt_actgravcomp, m.jnt_actfrcrange, m.dof_jntid, d.actuator_moment, d.qfrc_gravcomp, d.actuator_force, ], outputs=[d.qfrc_actuator], )
@wp.kernel def _qfrc_smooth( # Data in: qfrc_applied_in: wp.array2d(dtype=float), qfrc_bias_in: wp.array2d(dtype=float), qfrc_passive_in: wp.array2d(dtype=float), qfrc_actuator_in: wp.array2d(dtype=float), # Data out: qfrc_smooth_out: wp.array2d(dtype=float), ): worldid, dofid = wp.tid() qfrc_smooth_out[worldid, dofid] = ( qfrc_passive_in[worldid, dofid] - qfrc_bias_in[worldid, dofid] + qfrc_actuator_in[worldid, dofid] + qfrc_applied_in[worldid, dofid] )
[docs] @event_scope def fwd_acceleration(m: Model, d: Data, factorize: bool = False): """Add up all non-constraint forces, compute qacc_smooth. Args: m: The model containing kinematic and dynamic information. d: The data object containing the current state and output arrays. factorize: Flag to factorize inertia matrix. """ wp.launch( _qfrc_smooth, dim=(d.nworld, m.nv), inputs=[d.qfrc_applied, d.qfrc_bias, d.qfrc_passive, d.qfrc_actuator], outputs=[d.qfrc_smooth], ) xfrc_accumulate(m, d, d.qfrc_smooth) if factorize: smooth.factor_solve_i(m, d, d.qM, d.qLD, d.qLDiagInv, d.qacc_smooth, d.qfrc_smooth) else: smooth.solve_m(m, d, d.qacc_smooth, d.qfrc_smooth)
[docs] @event_scope def forward(m: Model, d: Data): """Forward dynamics.""" energy = m.opt.enableflags & EnableBit.ENERGY fwd_position(m, d, factorize=False) d.sensordata.zero_() sensor.sensor_pos(m, d) if energy: if m.sensor_e_potential == 0: # not computed by sensor sensor.energy_pos(m, d) else: d.energy.zero_() fwd_velocity(m, d) sensor.sensor_vel(m, d) if energy: if m.sensor_e_kinetic == 0: # not computed by sensor sensor.energy_vel(m, d) fwd_actuation(m, d) fwd_acceleration(m, d, factorize=True) solver.solve(m, d) sensor.sensor_acc(m, d)
[docs] @event_scope def step(m: Model, d: Data): """Advance simulation.""" # TODO(team): mj_checkPos # TODO(team): mj_checkVel forward(m, d) # TODO(team): mj_checkAcc if m.opt.integrator == IntegratorType.EULER: euler(m, d) elif m.opt.integrator == IntegratorType.RK4: rungekutta4(m, d) elif m.opt.integrator == IntegratorType.IMPLICITFAST: implicit(m, d) else: raise NotImplementedError(f"integrator {m.opt.integrator} not implemented.")
[docs] @event_scope def step1(m: Model, d: Data): """Advance simulation in two phases: before input is set by user.""" energy = m.opt.enableflags & EnableBit.ENERGY # TODO(team): mj_checkPos # TODO(team): mj_checkVel fwd_position(m, d) d.sensordata.zero_() sensor.sensor_pos(m, d) if energy: if m.sensor_e_potential == 0: # not computed by sensor sensor.energy_pos(m, d) else: d.energy.zero_() fwd_velocity(m, d) sensor.sensor_vel(m, d) if energy: if m.sensor_e_kinetic == 0: # not computed by sensor sensor.energy_vel(m, d)
[docs] @event_scope def step2(m: Model, d: Data): """Advance simulation in two phases: after input is set by user.""" fwd_actuation(m, d) fwd_acceleration(m, d) solver.solve(m, d) sensor.sensor_acc(m, d) # TODO(team): mj_checkAcc # integrate with Euler or implicitfast # TODO(team): implicit if m.opt.integrator == IntegratorType.IMPLICITFAST: implicit(m, d) else: # note: RK4 defaults to Euler euler(m, d)