# Copyright 2025 The Newton Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import warp as wp
from mujoco_warp._src import math
from mujoco_warp._src import support
from mujoco_warp._src import util_misc
from mujoco_warp._src.types import MJ_MAXVAL
from mujoco_warp._src.types import MJ_MINVAL
from mujoco_warp._src.types import CamLightType
from mujoco_warp._src.types import ConeType
from mujoco_warp._src.types import Data
from mujoco_warp._src.types import DisableBit
from mujoco_warp._src.types import EqType
from mujoco_warp._src.types import JointType
from mujoco_warp._src.types import Model
from mujoco_warp._src.types import ObjType
from mujoco_warp._src.types import TileSet
from mujoco_warp._src.types import TrnType
from mujoco_warp._src.types import WrapType
from mujoco_warp._src.types import vec5
from mujoco_warp._src.types import vec10
from mujoco_warp._src.types import vec11
from mujoco_warp._src.warp_util import cache_kernel
from mujoco_warp._src.warp_util import event_scope
wp.set_module_options({"enable_backward": False})
@wp.kernel
def _kinematics_branch(
# Model:
qpos0: wp.array2d(dtype=float),
body_parentid: wp.array(dtype=int),
body_mocapid: wp.array(dtype=int),
body_jntnum: wp.array(dtype=int),
body_jntadr: wp.array(dtype=int),
body_pos: wp.array2d(dtype=wp.vec3),
body_quat: wp.array2d(dtype=wp.quat),
jnt_type: wp.array(dtype=int),
jnt_qposadr: wp.array(dtype=int),
jnt_pos: wp.array2d(dtype=wp.vec3),
jnt_axis: wp.array2d(dtype=wp.vec3),
body_branches: wp.array(dtype=int),
body_branch_start: wp.array(dtype=int),
# Data in:
qpos_in: wp.array2d(dtype=float),
mocap_pos_in: wp.array2d(dtype=wp.vec3),
mocap_quat_in: wp.array2d(dtype=wp.quat),
# Data out:
xpos_out: wp.array2d(dtype=wp.vec3),
xquat_out: wp.array2d(dtype=wp.quat),
xanchor_out: wp.array2d(dtype=wp.vec3),
xaxis_out: wp.array2d(dtype=wp.vec3),
):
worldid, branchid = wp.tid()
start = body_branch_start[branchid]
end = body_branch_start[branchid + 1]
qpos = qpos_in[worldid]
for i in range(start, end):
bodyid = body_branches[i]
pid = body_parentid[bodyid]
jntadr = body_jntadr[bodyid]
jntnum = body_jntnum[bodyid]
if jntnum == 1:
jnt_type_ = jnt_type[jntadr]
if jnt_type_ == JointType.FREE:
qadr = jnt_qposadr[jntadr]
xpos = wp.vec3(qpos[qadr], qpos[qadr + 1], qpos[qadr + 2])
xquat = wp.quat(qpos[qadr + 3], qpos[qadr + 4], qpos[qadr + 5], qpos[qadr + 6])
xquat = wp.normalize(xquat)
xpos_out[worldid, bodyid] = xpos
xquat_out[worldid, bodyid] = xquat
xanchor_out[worldid, jntadr] = xpos
xaxis_out[worldid, jntadr] = jnt_axis[worldid % jnt_axis.shape[0], jntadr]
continue
# regular or no joints
# apply fixed translation and rotation relative to parent
jnt_pos_id = worldid % jnt_pos.shape[0]
pid = body_parentid[bodyid]
# mocap bodies have world body as parent
mocapid = body_mocapid[bodyid]
if mocapid >= 0:
xpos = mocap_pos_in[worldid, mocapid]
xquat = mocap_quat_in[worldid, mocapid]
else:
xpos = body_pos[worldid % body_pos.shape[0], bodyid]
xquat = body_quat[worldid % body_quat.shape[0], bodyid]
if pid >= 0:
xpos = math.rot_vec_quat(xpos, xquat_out[worldid, pid]) + xpos_out[worldid, pid]
xquat = math.mul_quat(xquat_out[worldid, pid], xquat)
for _ in range(jntnum):
qadr = jnt_qposadr[jntadr]
jnt_type_ = jnt_type[jntadr]
jnt_axis_ = jnt_axis[worldid % jnt_axis.shape[0], jntadr]
xanchor = math.rot_vec_quat(jnt_pos[jnt_pos_id, jntadr], xquat) + xpos
xaxis = math.rot_vec_quat(jnt_axis_, xquat)
if jnt_type_ == JointType.BALL:
qloc = wp.quat(qpos[qadr + 0], qpos[qadr + 1], qpos[qadr + 2], qpos[qadr + 3])
qloc = wp.normalize(qloc)
xquat = math.mul_quat(xquat, qloc)
# correct for off-center rotation
xpos = xanchor - math.rot_vec_quat(jnt_pos[jnt_pos_id, jntadr], xquat)
elif jnt_type_ == JointType.SLIDE:
xpos += xaxis * (qpos[qadr] - qpos0[worldid % qpos0.shape[0], qadr])
elif jnt_type_ == JointType.HINGE:
qpos0_ = qpos0[worldid % qpos0.shape[0], qadr]
qloc_ = math.axis_angle_to_quat(jnt_axis_, qpos[qadr] - qpos0_)
xquat = math.mul_quat(xquat, qloc_)
# correct for off-center rotation
xpos = xanchor - math.rot_vec_quat(jnt_pos[jnt_pos_id, jntadr], xquat)
xanchor_out[worldid, jntadr] = xanchor
xaxis_out[worldid, jntadr] = xaxis
jntadr += 1
xquat = wp.normalize(xquat)
xpos_out[worldid, bodyid] = xpos
xquat_out[worldid, bodyid] = xquat
@wp.kernel
def _compute_body_inertial_frames(
# Model:
body_ipos: wp.array2d(dtype=wp.vec3),
body_iquat: wp.array2d(dtype=wp.quat),
# Data in:
xpos_in: wp.array2d(dtype=wp.vec3),
xquat_in: wp.array2d(dtype=wp.quat),
# Data out:
xipos_out: wp.array2d(dtype=wp.vec3),
ximat_out: wp.array2d(dtype=wp.mat33),
):
worldid, bodyid = wp.tid()
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
xipos_out[worldid, bodyid] = xpos + math.rot_vec_quat(body_ipos[worldid % body_ipos.shape[0], bodyid], xquat)
ximat_out[worldid, bodyid] = math.quat_to_mat(math.mul_quat(xquat, body_iquat[worldid % body_iquat.shape[0], bodyid]))
@wp.kernel
def _compute_body_matrices(
# Data in:
xquat_in: wp.array2d(dtype=wp.quat),
# Data out:
xmat_out: wp.array2d(dtype=wp.mat33),
):
worldid, bodyid = wp.tid()
xmat_out[worldid, bodyid] = math.quat_to_mat(xquat_in[worldid, bodyid])
@wp.kernel
def _geom_local_to_global(
# Model:
body_rootid: wp.array(dtype=int),
body_weldid: wp.array(dtype=int),
body_mocapid: wp.array(dtype=int),
geom_bodyid: wp.array(dtype=int),
geom_pos: wp.array2d(dtype=wp.vec3),
geom_quat: wp.array2d(dtype=wp.quat),
# Data in:
xpos_in: wp.array2d(dtype=wp.vec3),
xquat_in: wp.array2d(dtype=wp.quat),
# Data out:
geom_xpos_out: wp.array2d(dtype=wp.vec3),
geom_xmat_out: wp.array2d(dtype=wp.mat33),
):
worldid, geomid = wp.tid()
bodyid = geom_bodyid[geomid]
if body_weldid[bodyid] == 0 and body_mocapid[body_rootid[bodyid]] == -1:
# geoms attached to the world are static (unless they are descended from mcocap bodies)
# for such static geoms, geom_xpos and geom_xquat are computed only once during make_data
return
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
geom_xpos_out[worldid, geomid] = xpos + math.rot_vec_quat(geom_pos[worldid % geom_pos.shape[0], geomid], xquat)
geom_xmat_out[worldid, geomid] = math.quat_to_mat(math.mul_quat(xquat, geom_quat[worldid % geom_quat.shape[0], geomid]))
@wp.kernel
def _site_local_to_global(
# Model:
site_bodyid: wp.array(dtype=int),
site_pos: wp.array2d(dtype=wp.vec3),
site_quat: wp.array2d(dtype=wp.quat),
# Data in:
xpos_in: wp.array2d(dtype=wp.vec3),
xquat_in: wp.array2d(dtype=wp.quat),
# Data out:
site_xpos_out: wp.array2d(dtype=wp.vec3),
site_xmat_out: wp.array2d(dtype=wp.mat33),
):
worldid, siteid = wp.tid()
bodyid = site_bodyid[siteid]
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
site_xpos_out[worldid, siteid] = xpos + math.rot_vec_quat(site_pos[worldid % site_pos.shape[0], siteid], xquat)
site_xmat_out[worldid, siteid] = math.quat_to_mat(math.mul_quat(xquat, site_quat[worldid % site_quat.shape[0], siteid]))
@wp.kernel
def _flex_vertices(
# Model:
flex_vertbodyid: wp.array(dtype=int),
# Data in:
xpos_in: wp.array2d(dtype=wp.vec3),
# Data out:
flexvert_xpos_out: wp.array2d(dtype=wp.vec3),
):
worldid, vertid = wp.tid()
flexvert_xpos_out[worldid, vertid] = xpos_in[worldid, flex_vertbodyid[vertid]]
@wp.kernel
def _flex_edges(
# Model:
nflex: int,
body_parentid: wp.array(dtype=int),
body_rootid: wp.array(dtype=int),
body_dofadr: wp.array(dtype=int),
dof_bodyid: wp.array(dtype=int),
flex_vertadr: wp.array(dtype=int),
flex_edgeadr: wp.array(dtype=int),
flex_edgenum: wp.array(dtype=int),
flex_vertbodyid: wp.array(dtype=int),
flex_edge: wp.array(dtype=wp.vec2i),
flexedge_J_rowadr: wp.array(dtype=int),
flexedge_J_colind: wp.array(dtype=int),
# Data in:
qvel_in: wp.array2d(dtype=float),
subtree_com_in: wp.array2d(dtype=wp.vec3),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
flexvert_xpos_in: wp.array2d(dtype=wp.vec3),
# Data out:
flexedge_J_out: wp.array3d(dtype=float),
flexedge_length_out: wp.array2d(dtype=float),
flexedge_velocity_out: wp.array2d(dtype=float),
):
worldid, edgeid = wp.tid()
for i in range(nflex):
locid = edgeid - flex_edgeadr[i]
if locid >= 0 and locid < flex_edgenum[i]:
f = i
break
vbase = flex_vertadr[f]
v = flex_edge[edgeid]
vbase0 = vbase + v[0]
vbase1 = vbase + v[1]
pos1 = flexvert_xpos_in[worldid, vbase0]
pos2 = flexvert_xpos_in[worldid, vbase1]
vec = pos2 - pos1
edge, edge_length = math.normalize_with_norm(vec)
flexedge_length_out[worldid, edgeid] = edge_length
# TODO(quaglino): use Jacobian
b1 = flex_vertbodyid[vbase0]
b2 = flex_vertbodyid[vbase1]
dofi = body_dofadr[b1]
dofj = body_dofadr[b2]
dofi0 = dofi + 0
dofi1 = dofi + 1
dofi2 = dofi + 2
dofj0 = dofj + 0
dofj1 = dofj + 1
dofj2 = dofj + 2
vel1 = wp.vec3(qvel_in[worldid, dofi0], qvel_in[worldid, dofi1], qvel_in[worldid, dofi2])
vel2 = wp.vec3(qvel_in[worldid, dofj0], qvel_in[worldid, dofj1], qvel_in[worldid, dofj2])
flexedge_velocity_out[worldid, edgeid] = wp.dot(vel2 - vel1, edge)
rowadr = flexedge_J_rowadr[edgeid]
sparseid0 = rowadr + 0
sparseid1 = rowadr + 1
sparseid2 = rowadr + 2
sparseid3 = rowadr + 3
sparseid4 = rowadr + 4
sparseid5 = rowadr + 5
# TODO(team): jacdif
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos1, b1, dofi0, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos2, b2, dofi0, worldid)
jacdif = jacp2 - jacp1
Ji0 = wp.dot(jacdif, edge)
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos1, b1, dofi1, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos2, b2, dofi1, worldid)
jacdif = jacp2 - jacp1
Ji1 = wp.dot(jacdif, edge)
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos1, b1, dofi2, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos2, b2, dofi2, worldid)
jacdif = jacp2 - jacp1
Ji2 = wp.dot(jacdif, edge)
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos1, b1, dofj0, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos2, b2, dofj0, worldid)
jacdif = jacp2 - jacp1
Jj0 = wp.dot(jacdif, edge)
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos1, b1, dofj1, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos2, b2, dofj1, worldid)
jacdif = jacp2 - jacp1
Jj1 = wp.dot(jacdif, edge)
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos1, b1, dofj2, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pos2, b2, dofj2, worldid)
jacdif = jacp2 - jacp1
Jj2 = wp.dot(jacdif, edge)
flexedge_J_out[worldid, 0, sparseid0] = Ji0
flexedge_J_out[worldid, 0, sparseid1] = Ji1
flexedge_J_out[worldid, 0, sparseid2] = Ji2
flexedge_J_out[worldid, 0, sparseid3] = Jj0
flexedge_J_out[worldid, 0, sparseid4] = Jj1
flexedge_J_out[worldid, 0, sparseid5] = Jj2
[docs]
@event_scope
def kinematics(m: Model, d: Data):
"""Computes forward kinematics for all bodies, sites, geoms, and flexible elements.
This function updates the global positions and orientations of all bodies, as well as the
derived positions and orientations of geoms, sites, and flexible elements, based on the
current joint positions and any attached mocap bodies.
"""
wp.launch(
_kinematics_branch,
dim=(d.nworld, m.nbranch),
inputs=[
m.qpos0,
m.body_parentid,
m.body_mocapid,
m.body_jntnum,
m.body_jntadr,
m.body_pos,
m.body_quat,
m.jnt_type,
m.jnt_qposadr,
m.jnt_pos,
m.jnt_axis,
m.body_branches,
m.body_branch_start,
d.qpos,
d.mocap_pos,
d.mocap_quat,
],
outputs=[d.xpos, d.xquat, d.xanchor, d.xaxis],
)
wp.launch(
_compute_body_matrices,
dim=(d.nworld, m.nbody),
inputs=[d.xquat],
outputs=[d.xmat],
)
wp.launch(
_compute_body_inertial_frames,
dim=(d.nworld, m.nbody),
inputs=[m.body_ipos, m.body_iquat, d.xpos, d.xquat],
outputs=[d.xipos, d.ximat],
)
wp.launch(
_geom_local_to_global,
dim=(d.nworld, m.ngeom),
inputs=[m.body_rootid, m.body_weldid, m.body_mocapid, m.geom_bodyid, m.geom_pos, m.geom_quat, d.xpos, d.xquat],
outputs=[d.geom_xpos, d.geom_xmat],
)
wp.launch(
_site_local_to_global,
dim=(d.nworld, m.nsite),
inputs=[m.site_bodyid, m.site_pos, m.site_quat, d.xpos, d.xquat],
outputs=[d.site_xpos, d.site_xmat],
)
@event_scope
def flex(m: Model, d: Data):
wp.launch(_flex_vertices, dim=(d.nworld, m.nflexvert), inputs=[m.flex_vertbodyid, d.xpos], outputs=[d.flexvert_xpos])
wp.launch(
_flex_edges,
dim=(d.nworld, m.nflexedge),
inputs=[
m.nflex,
m.body_parentid,
m.body_rootid,
m.body_dofadr,
m.dof_bodyid,
m.flex_vertadr,
m.flex_edgeadr,
m.flex_edgenum,
m.flex_vertbodyid,
m.flex_edge,
m.flexedge_J_rowadr,
m.flexedge_J_colind,
d.qvel,
d.subtree_com,
d.cdof,
d.flexvert_xpos,
],
outputs=[
d.flexedge_J,
d.flexedge_length,
d.flexedge_velocity,
],
)
@wp.kernel
def _subtree_com_init(
# Model:
body_mass: wp.array2d(dtype=float),
# Data in:
xipos_in: wp.array2d(dtype=wp.vec3),
# Data out:
subtree_com_out: wp.array2d(dtype=wp.vec3),
):
worldid, bodyid = wp.tid()
subtree_com_out[worldid, bodyid] = xipos_in[worldid, bodyid] * body_mass[worldid % body_mass.shape[0], bodyid]
@wp.kernel
def _subtree_com_acc(
# Model:
body_parentid: wp.array(dtype=int),
# Data in:
subtree_com_in: wp.array2d(dtype=wp.vec3),
# In:
body_tree_: wp.array(dtype=int),
# Data out:
subtree_com_out: wp.array2d(dtype=wp.vec3),
):
worldid, nodeid = wp.tid()
bodyid = body_tree_[nodeid]
pid = body_parentid[bodyid]
if bodyid != 0:
wp.atomic_add(subtree_com_out, worldid, pid, subtree_com_in[worldid, bodyid])
@wp.kernel
def _subtree_div(
# Model:
body_subtreemass: wp.array2d(dtype=float),
# Data in:
subtree_com_in: wp.array2d(dtype=wp.vec3),
# Data out:
subtree_com_out: wp.array2d(dtype=wp.vec3),
):
worldid, bodyid = wp.tid()
com = subtree_com_in[worldid, bodyid]
mass = body_subtreemass[worldid % body_subtreemass.shape[0], bodyid]
if mass != 0.0:
subtree_com_out[worldid, bodyid] = com / mass
@wp.kernel
def _cinert(
# Model:
body_rootid: wp.array(dtype=int),
body_mass: wp.array2d(dtype=float),
body_inertia: wp.array2d(dtype=wp.vec3),
# Data in:
xipos_in: wp.array2d(dtype=wp.vec3),
ximat_in: wp.array2d(dtype=wp.mat33),
subtree_com_in: wp.array2d(dtype=wp.vec3),
# Data out:
cinert_out: wp.array2d(dtype=vec10),
):
worldid, bodyid = wp.tid()
mat = ximat_in[worldid, bodyid]
inert = body_inertia[worldid % body_inertia.shape[0], bodyid]
mass = body_mass[worldid % body_mass.shape[0], bodyid]
dif = xipos_in[worldid, bodyid] - subtree_com_in[worldid, body_rootid[bodyid]]
# express inertia in com-based frame (mju_inertCom)
res = vec10()
# res_rot = mat * diag(inert) * mat'
tmp = mat @ wp.diag(inert) @ wp.transpose(mat)
res[0] = tmp[0, 0]
res[1] = tmp[1, 1]
res[2] = tmp[2, 2]
res[3] = tmp[0, 1]
res[4] = tmp[0, 2]
res[5] = tmp[1, 2]
# res_rot -= mass * dif_cross * dif_cross
res[0] += mass * (dif[1] * dif[1] + dif[2] * dif[2])
res[1] += mass * (dif[0] * dif[0] + dif[2] * dif[2])
res[2] += mass * (dif[0] * dif[0] + dif[1] * dif[1])
res[3] -= mass * dif[0] * dif[1]
res[4] -= mass * dif[0] * dif[2]
res[5] -= mass * dif[1] * dif[2]
# res_tran = mass * dif
res[6] = mass * dif[0]
res[7] = mass * dif[1]
res[8] = mass * dif[2]
# res_mass = mass
res[9] = mass
cinert_out[worldid, bodyid] = res
@wp.kernel
def _cdof(
# Model:
body_rootid: wp.array(dtype=int),
jnt_type: wp.array(dtype=int),
jnt_dofadr: wp.array(dtype=int),
jnt_bodyid: wp.array(dtype=int),
# Data in:
xmat_in: wp.array2d(dtype=wp.mat33),
xanchor_in: wp.array2d(dtype=wp.vec3),
xaxis_in: wp.array2d(dtype=wp.vec3),
subtree_com_in: wp.array2d(dtype=wp.vec3),
# Data out:
cdof_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, jntid = wp.tid()
bodyid = jnt_bodyid[jntid]
dofid = jnt_dofadr[jntid]
jnt_type_ = jnt_type[jntid]
xaxis = xaxis_in[worldid, jntid]
xmat = wp.transpose(xmat_in[worldid, bodyid])
# compute com-anchor vector
offset = subtree_com_in[worldid, body_rootid[bodyid]] - xanchor_in[worldid, jntid]
res = cdof_out[worldid]
if jnt_type_ == JointType.FREE:
res[dofid + 0] = wp.spatial_vector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0)
res[dofid + 1] = wp.spatial_vector(0.0, 0.0, 0.0, 0.0, 1.0, 0.0)
res[dofid + 2] = wp.spatial_vector(0.0, 0.0, 0.0, 0.0, 0.0, 1.0)
# I_3 rotation in child frame (assume no subsequent rotations)
res[dofid + 3] = wp.spatial_vector(xmat[0], wp.cross(xmat[0], offset))
res[dofid + 4] = wp.spatial_vector(xmat[1], wp.cross(xmat[1], offset))
res[dofid + 5] = wp.spatial_vector(xmat[2], wp.cross(xmat[2], offset))
elif jnt_type_ == JointType.BALL: # ball
# I_3 rotation in child frame (assume no subsequent rotations)
res[dofid + 0] = wp.spatial_vector(xmat[0], wp.cross(xmat[0], offset))
res[dofid + 1] = wp.spatial_vector(xmat[1], wp.cross(xmat[1], offset))
res[dofid + 2] = wp.spatial_vector(xmat[2], wp.cross(xmat[2], offset))
elif jnt_type_ == JointType.SLIDE:
res[dofid] = wp.spatial_vector(wp.vec3(0.0), xaxis)
elif jnt_type_ == JointType.HINGE: # hinge
res[dofid] = wp.spatial_vector(xaxis, wp.cross(xaxis, offset))
[docs]
@event_scope
def com_pos(m: Model, d: Data):
"""Computes subtree center of mass positions.
Transforms inertia and motion to global frame centered at subtree CoM. Accumulates the
mass-weighted positions up the kinematic tree, divides by total mass, and computes composite
inertias and motion degrees of freedom in the subtree CoM frame.
"""
wp.launch(_subtree_com_init, dim=(d.nworld, m.nbody), inputs=[m.body_mass, d.xipos], outputs=[d.subtree_com])
for i in reversed(range(len(m.body_tree))):
body_tree = m.body_tree[i]
wp.launch(
_subtree_com_acc,
dim=(d.nworld, body_tree.size),
inputs=[m.body_parentid, d.subtree_com, body_tree],
outputs=[d.subtree_com],
)
wp.launch(_subtree_div, dim=(d.nworld, m.nbody), inputs=[m.body_subtreemass, d.subtree_com], outputs=[d.subtree_com])
wp.launch(
_cinert,
dim=(d.nworld, m.nbody),
inputs=[m.body_rootid, m.body_mass, m.body_inertia, d.xipos, d.ximat, d.subtree_com],
outputs=[d.cinert],
)
wp.launch(
_cdof,
dim=(d.nworld, m.njnt),
inputs=[m.body_rootid, m.jnt_type, m.jnt_dofadr, m.jnt_bodyid, d.xmat, d.xanchor, d.xaxis, d.subtree_com],
outputs=[d.cdof],
)
@wp.kernel
def _cam_local_to_global(
# Model:
cam_mode: wp.array(dtype=int),
cam_bodyid: wp.array(dtype=int),
cam_targetbodyid: wp.array(dtype=int),
cam_pos: wp.array2d(dtype=wp.vec3),
cam_quat: wp.array2d(dtype=wp.quat),
cam_poscom0: wp.array2d(dtype=wp.vec3),
cam_pos0: wp.array2d(dtype=wp.vec3),
cam_mat0: wp.array2d(dtype=wp.mat33),
# Data in:
xpos_in: wp.array2d(dtype=wp.vec3),
xquat_in: wp.array2d(dtype=wp.quat),
subtree_com_in: wp.array2d(dtype=wp.vec3),
# Data out:
cam_xpos_out: wp.array2d(dtype=wp.vec3),
cam_xmat_out: wp.array2d(dtype=wp.mat33),
):
worldid, camid = wp.tid()
cam_pos_id = worldid % cam_pos.shape[0]
cam_quat_id = worldid % cam_quat.shape[0]
is_target_cam = (cam_mode[camid] == CamLightType.TARGETBODY) or (cam_mode[camid] == CamLightType.TARGETBODYCOM)
invalid_target = is_target_cam and (cam_targetbodyid[camid] < 0)
if invalid_target:
bodyid = cam_bodyid[camid]
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
cam_xpos_out[worldid, camid] = xpos + math.rot_vec_quat(cam_pos[cam_pos_id, camid], xquat)
cam_xmat_out[worldid, camid] = math.quat_to_mat(math.mul_quat(xquat, cam_quat[cam_quat_id, camid]))
elif cam_mode[camid] == CamLightType.TRACK:
cam_xmat_out[worldid, camid] = cam_mat0[worldid % cam_mat0.shape[0], camid]
body_xpos = xpos_in[worldid, cam_bodyid[camid]]
cam_xpos_out[worldid, camid] = body_xpos + cam_pos0[worldid % cam_pos0.shape[0], camid]
elif cam_mode[camid] == CamLightType.TRACKCOM:
cam_xmat_out[worldid, camid] = cam_mat0[worldid % cam_mat0.shape[0], camid]
cam_xpos_out[worldid, camid] = (
subtree_com_in[worldid, cam_bodyid[camid]] + cam_poscom0[worldid % cam_poscom0.shape[0], camid]
)
elif cam_mode[camid] == CamLightType.TARGETBODY or cam_mode[camid] == CamLightType.TARGETBODYCOM:
bodyid = cam_bodyid[camid]
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
cam_xpos_out[worldid, camid] = xpos + math.rot_vec_quat(cam_pos[cam_pos_id, camid], xquat)
pos = xpos_in[worldid, cam_targetbodyid[camid]]
if cam_mode[camid] == CamLightType.TARGETBODYCOM:
pos = subtree_com_in[worldid, cam_targetbodyid[camid]]
# zaxis = -desired camera direction, in global frame
mat_3 = wp.normalize(cam_xpos_out[worldid, camid] - pos)
# xaxis: orthogonal to zaxis and to (0,0,1)
mat_1 = wp.normalize(wp.cross(wp.vec3(0.0, 0.0, 1.0), mat_3))
mat_2 = wp.normalize(wp.cross(mat_3, mat_1))
# fmt: off
cam_xmat_out[worldid, camid] = wp.mat33(
mat_1[0], mat_2[0], mat_3[0],
mat_1[1], mat_2[1], mat_3[1],
mat_1[2], mat_2[2], mat_3[2]
)
# fmt: on
else:
bodyid = cam_bodyid[camid]
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
cam_xpos_out[worldid, camid] = xpos + math.rot_vec_quat(cam_pos[cam_pos_id, camid], xquat)
cam_xmat_out[worldid, camid] = math.quat_to_mat(math.mul_quat(xquat, cam_quat[cam_quat_id, camid]))
@wp.kernel
def _light_local_to_global(
# Model:
light_mode: wp.array(dtype=int),
light_bodyid: wp.array(dtype=int),
light_targetbodyid: wp.array(dtype=int),
light_pos: wp.array2d(dtype=wp.vec3),
light_dir: wp.array2d(dtype=wp.vec3),
light_poscom0: wp.array2d(dtype=wp.vec3),
light_pos0: wp.array2d(dtype=wp.vec3),
light_dir0: wp.array2d(dtype=wp.vec3),
# Data in:
xpos_in: wp.array2d(dtype=wp.vec3),
xquat_in: wp.array2d(dtype=wp.quat),
subtree_com_in: wp.array2d(dtype=wp.vec3),
# Data out:
light_xpos_out: wp.array2d(dtype=wp.vec3),
light_xdir_out: wp.array2d(dtype=wp.vec3),
):
worldid, lightid = wp.tid()
light_pos_id = worldid % light_pos.shape[0]
light_dir_id = worldid % light_dir.shape[0]
is_target_light = (light_mode[lightid] == CamLightType.TARGETBODY) or (light_mode[lightid] == CamLightType.TARGETBODYCOM)
invalid_target = is_target_light and (light_targetbodyid[lightid] < 0)
if invalid_target:
bodyid = light_bodyid[lightid]
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
light_xpos_out[worldid, lightid] = xpos + math.rot_vec_quat(light_pos[light_pos_id, lightid], xquat)
light_xdir_out[worldid, lightid] = math.rot_vec_quat(light_dir[light_dir_id, lightid], xquat)
return
elif light_mode[lightid] == CamLightType.TRACK:
light_xdir_out[worldid, lightid] = light_dir0[worldid % light_dir0.shape[0], lightid]
body_xpos = xpos_in[worldid, light_bodyid[lightid]]
light_xpos_out[worldid, lightid] = body_xpos + light_pos0[worldid % light_pos0.shape[0], lightid]
elif light_mode[lightid] == CamLightType.TRACKCOM:
light_xdir_out[worldid, lightid] = light_dir0[worldid % light_dir0.shape[0], lightid]
light_xpos_out[worldid, lightid] = (
subtree_com_in[worldid, light_bodyid[lightid]] + light_poscom0[worldid % light_poscom0.shape[0], lightid]
)
elif light_mode[lightid] == CamLightType.TARGETBODY or light_mode[lightid] == CamLightType.TARGETBODYCOM:
bodyid = light_bodyid[lightid]
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
light_xpos_out[worldid, lightid] = xpos + math.rot_vec_quat(light_pos[light_pos_id, lightid], xquat)
pos = xpos_in[worldid, light_targetbodyid[lightid]]
if light_mode[lightid] == CamLightType.TARGETBODYCOM:
pos = subtree_com_in[worldid, light_targetbodyid[lightid]]
light_xdir_out[worldid, lightid] = pos - light_xpos_out[worldid, lightid]
else:
bodyid = light_bodyid[lightid]
xpos = xpos_in[worldid, bodyid]
xquat = xquat_in[worldid, bodyid]
light_xpos_out[worldid, lightid] = xpos + math.rot_vec_quat(light_pos[light_pos_id, lightid], xquat)
light_xdir_out[worldid, lightid] = math.rot_vec_quat(light_dir[light_dir_id, lightid], xquat)
light_xdir_out[worldid, lightid] = wp.normalize(light_xdir_out[worldid, lightid])
[docs]
@event_scope
def camlight(m: Model, d: Data):
"""Computes camera and light positions and orientations.
Updates the global positions and orientations for all cameras and lights in the model,
including special handling for tracking and target modes.
"""
wp.launch(
_cam_local_to_global,
dim=(d.nworld, m.ncam),
inputs=[
m.cam_mode,
m.cam_bodyid,
m.cam_targetbodyid,
m.cam_pos,
m.cam_quat,
m.cam_poscom0,
m.cam_pos0,
m.cam_mat0,
d.xpos,
d.xquat,
d.subtree_com,
],
outputs=[d.cam_xpos, d.cam_xmat],
)
wp.launch(
_light_local_to_global,
dim=(d.nworld, m.nlight),
inputs=[
m.light_mode,
m.light_bodyid,
m.light_targetbodyid,
m.light_pos,
m.light_dir,
m.light_poscom0,
m.light_pos0,
m.light_dir0,
d.xpos,
d.xquat,
d.subtree_com,
],
outputs=[d.light_xpos, d.light_xdir],
)
@wp.kernel
def _crb_accumulate(
# Model:
body_parentid: wp.array(dtype=int),
# Data in:
crb_in: wp.array2d(dtype=vec10),
# In:
body_tree_: wp.array(dtype=int),
# Data out:
crb_out: wp.array2d(dtype=vec10),
):
worldid, nodeid = wp.tid()
bodyid = body_tree_[nodeid]
pid = body_parentid[bodyid]
if pid == 0:
return
wp.atomic_add(crb_out, worldid, pid, crb_in[worldid, bodyid])
@wp.kernel
def _qM_sparse(
# Model:
dof_bodyid: wp.array(dtype=int),
dof_parentid: wp.array(dtype=int),
dof_Madr: wp.array(dtype=int),
dof_armature: wp.array2d(dtype=float),
# Data in:
cdof_in: wp.array2d(dtype=wp.spatial_vector),
crb_in: wp.array2d(dtype=vec10),
# Data out:
qM_out: wp.array3d(dtype=float),
):
worldid, dofid = wp.tid()
madr_ij = dof_Madr[dofid] # dof_Madr is not batched
bodyid = dof_bodyid[dofid]
# init M(i,i) with armature inertia
qM_out[worldid, 0, madr_ij] = dof_armature[worldid, dofid]
# precompute buf = crb_body_i * cdof_i
buf = math.inert_vec(crb_in[worldid, bodyid], cdof_in[worldid, dofid])
# sparse backward pass over ancestors
while dofid >= 0:
qM_out[worldid, 0, madr_ij] += wp.dot(cdof_in[worldid, dofid], buf)
madr_ij += 1
dofid = dof_parentid[dofid]
@wp.kernel
def _qM_dense(
# Model:
dof_bodyid: wp.array(dtype=int),
dof_parentid: wp.array(dtype=int),
dof_armature: wp.array2d(dtype=float),
# Data in:
cdof_in: wp.array2d(dtype=wp.spatial_vector),
crb_in: wp.array2d(dtype=vec10),
# Data out:
qM_out: wp.array3d(dtype=float),
):
worldid, dofid = wp.tid()
bodyid = dof_bodyid[dofid]
# init M(i,i) with armature inertia.
M = dof_armature[worldid % dof_armature.shape[0], dofid]
# precompute buf = crb_body_i * cdof_i
buf = math.inert_vec(crb_in[worldid, bodyid], cdof_in[worldid, dofid])
M += wp.dot(cdof_in[worldid, dofid], buf)
qM_out[worldid, dofid, dofid] = M
# sparse backward pass over ancestors
dofidi = dofid
dofid = dof_parentid[dofid]
while dofid >= 0:
qMij = wp.dot(cdof_in[worldid, dofid], buf)
qM_out[worldid, dofidi, dofid] += qMij
qM_out[worldid, dofid, dofidi] += qMij
dofid = dof_parentid[dofid]
[docs]
@event_scope
def crb(m: Model, d: Data):
"""Computes composite rigid body inertias for each body and the joint-space inertia matrix.
Accumulates composite rigid body inertias up the kinematic tree and computes the
joint-space inertia matrix in either sparse or dense format, depending on model options.
"""
wp.copy(d.crb, d.cinert)
for i in reversed(range(len(m.body_tree))):
body_tree = m.body_tree[i]
wp.launch(_crb_accumulate, dim=(d.nworld, body_tree.size), inputs=[m.body_parentid, d.crb, body_tree], outputs=[d.crb])
d.qM.zero_()
if m.is_sparse:
wp.launch(
_qM_sparse,
dim=(d.nworld, m.nv),
inputs=[m.dof_bodyid, m.dof_parentid, m.dof_Madr, m.dof_armature, d.cdof, d.crb],
outputs=[d.qM],
)
else:
wp.launch(
_qM_dense, dim=(d.nworld, m.nv), inputs=[m.dof_bodyid, m.dof_parentid, m.dof_armature, d.cdof, d.crb], outputs=[d.qM]
)
@wp.kernel
def _tendon_armature(
# Model:
dof_parentid: wp.array(dtype=int),
dof_Madr: wp.array(dtype=int),
tendon_armature: wp.array2d(dtype=float),
is_sparse: bool,
# Data in:
ten_J_in: wp.array3d(dtype=float),
# Data out:
qM_out: wp.array3d(dtype=float),
):
worldid, tenid, dofid = wp.tid()
if is_sparse: # is_sparse is not batched
madr_ij = dof_Madr[dofid]
armature = tendon_armature[worldid, tenid]
if armature == 0.0:
return
ten_Ji = ten_J_in[worldid, tenid, dofid]
if ten_Ji == 0.0:
return
# sparse backward pass over ancestors
dofidi = dofid
while dofid >= 0:
if dofid != dofidi:
ten_Jj = ten_J_in[worldid, tenid, dofid]
else:
ten_Jj = ten_Ji
qMij = armature * ten_Jj * ten_Ji
if is_sparse:
wp.atomic_add(qM_out[worldid, 0], madr_ij, qMij)
madr_ij += 1
else:
wp.atomic_add(qM_out[worldid, dofidi], dofid, qMij)
if dofidi != dofid:
wp.atomic_add(qM_out[worldid, dofid], dofidi, qMij)
dofid = dof_parentid[dofid]
@event_scope
def tendon_armature(m: Model, d: Data):
"""Add tendon armature to qM."""
wp.launch(
_tendon_armature,
dim=(d.nworld, m.ntendon, m.nv),
inputs=[m.dof_parentid, m.dof_Madr, m.tendon_armature, m.is_sparse, d.ten_J],
outputs=[d.qM],
)
@wp.kernel
def _copy_CSR(
# Model:
mapM2M: wp.array(dtype=int),
# In:
M_in: wp.array3d(dtype=float),
# Out:
L_out: wp.array3d(dtype=float),
):
worldid, ind = wp.tid()
L_out[worldid, 0, ind] = M_in[worldid, 0, mapM2M[ind]]
@wp.kernel
def _qLD_acc(
# Model:
M_rownnz: wp.array(dtype=int),
M_rowadr: wp.array(dtype=int),
# In:
qLD_updates_: wp.array(dtype=wp.vec3i),
L_in: wp.array3d(dtype=float),
# Out:
L_out: wp.array3d(dtype=float),
):
worldid, nodeid = wp.tid()
update = qLD_updates_[nodeid]
i, k, Madr_ki = update[0], update[1], update[2]
Madr_i = M_rowadr[i] # Address of row being updated
diag_k = M_rowadr[k] + M_rownnz[k] - 1 # Address of diagonal element of k
# tmp = M(k,i) / M(k,k)
tmp = L_out[worldid, 0, Madr_ki] / L_out[worldid, 0, diag_k]
for j in range(M_rownnz[i]):
# M(i,j) -= M(k,j) * tmp
wp.atomic_sub(L_out[worldid, 0], Madr_i + j, L_in[worldid, 0, M_rowadr[k] + j] * tmp)
# M(k,i) = tmp
L_out[worldid, 0, Madr_ki] = tmp
@wp.kernel
def _qLDiag_div(
# Model:
M_rownnz: wp.array(dtype=int),
M_rowadr: wp.array(dtype=int),
# In:
L_in: wp.array3d(dtype=float),
# Out:
D_out: wp.array2d(dtype=float),
):
worldid, dofid = wp.tid()
diag_i = M_rowadr[dofid] + M_rownnz[dofid] - 1 # Address of diagonal element of i
D_out[worldid, dofid] = 1.0 / L_in[worldid, 0, diag_i]
def _factor_i_sparse(m: Model, d: Data, M: wp.array3d(dtype=float), L: wp.array3d(dtype=float), D: wp.array2d(dtype=float)):
"""Sparse L'*D*L factorization of inertia-like matrix M, assumed spd."""
wp.launch(_copy_CSR, dim=(d.nworld, m.nC), inputs=[m.mapM2M, M], outputs=[L])
for i in reversed(range(len(m.qLD_updates))):
qLD_updates = m.qLD_updates[i]
wp.launch(_qLD_acc, dim=(d.nworld, qLD_updates.size), inputs=[m.M_rownnz, m.M_rowadr, qLD_updates, L], outputs=[L])
wp.launch(_qLDiag_div, dim=(d.nworld, m.nv), inputs=[m.M_rownnz, m.M_rowadr, L], outputs=[D])
@cache_kernel
def _tile_cholesky_factorize(tile: TileSet):
"""Returns a kernel for dense Cholesky factorization of a tile."""
@wp.kernel(module="unique", enable_backward=False)
def cholesky_factorize(
# Data in:
qM_in: wp.array3d(dtype=float),
# In:
adr: wp.array(dtype=int),
# Out:
L_out: wp.array3d(dtype=float),
):
worldid, nodeid = wp.tid()
TILE_SIZE = wp.static(tile.size)
dofid = adr[nodeid]
M_tile = wp.tile_load(qM_in[worldid], shape=(TILE_SIZE, TILE_SIZE), offset=(dofid, dofid))
L_tile = wp.tile_cholesky(M_tile)
wp.tile_store(L_out[worldid], L_tile, offset=(dofid, dofid))
return cholesky_factorize
def _factor_i_dense(m: Model, d: Data, M: wp.array, L: wp.array):
"""Dense Cholesky factorization of inertia-like matrix M, assumed spd."""
for tile in m.qM_tiles:
wp.launch_tiled(
_tile_cholesky_factorize(tile),
dim=(d.nworld, tile.adr.size),
inputs=[M, tile.adr],
outputs=[L],
block_dim=m.block_dim.cholesky_factorize,
)
[docs]
@event_scope
def factor_m(m: Model, d: Data):
"""Factorization of inertia-like matrix M, assumed spd."""
if m.is_sparse:
_factor_i_sparse(m, d, d.qM, d.qLD, d.qLDiagInv)
else:
_factor_i_dense(m, d, d.qM, d.qLD)
@wp.kernel
def _cacc_world(
# In:
gravity: wp.array(dtype=wp.vec3),
# Data out:
cacc_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid = wp.tid()
cacc_out[worldid, 0] = wp.spatial_vector(wp.vec3(0.0), -gravity[worldid % gravity.shape[0]])
def _rne_cacc_world(m: Model, d: Data):
if m.opt.disableflags & DisableBit.GRAVITY:
d.cacc.zero_()
else:
wp.launch(_cacc_world, dim=[d.nworld], inputs=[m.opt.gravity], outputs=[d.cacc])
@wp.kernel
def _cacc_branch(
# Model:
body_parentid: wp.array(dtype=int),
body_dofnum: wp.array(dtype=int),
body_dofadr: wp.array(dtype=int),
body_branches: wp.array(dtype=int),
body_branch_start: wp.array(dtype=int),
# Data in:
qvel_in: wp.array2d(dtype=float),
qacc_in: wp.array2d(dtype=float),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
cdof_dot_in: wp.array2d(dtype=wp.spatial_vector),
# In:
flg_acc: bool,
# Data out:
cacc_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, branchid = wp.tid()
start = body_branch_start[branchid]
end = body_branch_start[branchid + 1]
bodyid = body_branches[start]
pid = body_parentid[bodyid]
local_cacc = cacc_out[worldid, pid]
for i in range(start, end):
bodyid = body_branches[i]
dofnum = body_dofnum[bodyid]
dofadr = body_dofadr[bodyid]
for j in range(dofnum):
local_cacc += cdof_dot_in[worldid, dofadr + j] * qvel_in[worldid, dofadr + j]
if flg_acc:
local_cacc += cdof_in[worldid, dofadr + j] * qacc_in[worldid, dofadr + j]
cacc_out[worldid, bodyid] = local_cacc
def _rne_cacc_forward(m: Model, d: Data, flg_acc: bool = False):
wp.launch(
_cacc_branch,
dim=(d.nworld, m.nbranch),
inputs=[
m.body_parentid,
m.body_dofnum,
m.body_dofadr,
m.body_branches,
m.body_branch_start,
d.qvel,
d.qacc,
d.cdof,
d.cdof_dot,
flg_acc,
],
outputs=[d.cacc],
)
@wp.kernel
def _cfrc(
# Data in:
cinert_in: wp.array2d(dtype=vec10),
cvel_in: wp.array2d(dtype=wp.spatial_vector),
cacc_in: wp.array2d(dtype=wp.spatial_vector),
cfrc_ext_in: wp.array2d(dtype=wp.spatial_vector),
# In:
flg_cfrc_ext: bool,
# Data out:
cfrc_int_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, bodyid = wp.tid()
bodyid += 1 # skip world body
cacc = cacc_in[worldid, bodyid]
cinert = cinert_in[worldid, bodyid]
cvel = cvel_in[worldid, bodyid]
frc = math.inert_vec(cinert, cacc)
frc += math.motion_cross_force(cvel, math.inert_vec(cinert, cvel))
if flg_cfrc_ext:
frc -= cfrc_ext_in[worldid, bodyid]
cfrc_int_out[worldid, bodyid] = frc
def _rne_cfrc(m: Model, d: Data, flg_cfrc_ext: bool = False):
wp.launch(
_cfrc, dim=[d.nworld, m.nbody - 1], inputs=[d.cinert, d.cvel, d.cacc, d.cfrc_ext, flg_cfrc_ext], outputs=[d.cfrc_int]
)
@wp.kernel
def _cfrc_backward(
# Model:
body_parentid: wp.array(dtype=int),
# Data in:
cfrc_int_in: wp.array2d(dtype=wp.spatial_vector),
# In:
body_tree_: wp.array(dtype=int),
# Data out:
cfrc_int_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, nodeid = wp.tid()
bodyid = body_tree_[nodeid]
pid = body_parentid[bodyid]
if bodyid != 0:
wp.atomic_add(cfrc_int_out[worldid], pid, cfrc_int_in[worldid, bodyid])
def _rne_cfrc_backward(m: Model, d: Data):
for body_tree in reversed(m.body_tree):
wp.launch(
_cfrc_backward, dim=[d.nworld, body_tree.size], inputs=[m.body_parentid, d.cfrc_int, body_tree], outputs=[d.cfrc_int]
)
@wp.kernel
def _qfrc_bias(
# Model:
dof_bodyid: wp.array(dtype=int),
# Data in:
cdof_in: wp.array2d(dtype=wp.spatial_vector),
cfrc_int_in: wp.array2d(dtype=wp.spatial_vector),
# Data out:
qfrc_bias_out: wp.array2d(dtype=float),
):
worldid, dofid = wp.tid()
bodyid = dof_bodyid[dofid]
qfrc_bias_out[worldid, dofid] = wp.dot(cdof_in[worldid, dofid], cfrc_int_in[worldid, bodyid])
[docs]
@event_scope
def rne(m: Model, d: Data, flg_acc: bool = False):
"""Computes inverse dynamics using the recursive Newton-Euler algorithm.
Computes the bias forces (`qfrc_bias`) and internal forces (`cfrc_int`) for the current state,
including the effects of gravity and optionally joint accelerations.
Args:
m: The model containing kinematic and dynamic information.
d: The data object containing the current state and output arrays.
flg_acc: If True, includes joint accelerations in the computation.
"""
_rne_cacc_world(m, d)
_rne_cacc_forward(m, d, flg_acc=flg_acc)
_rne_cfrc(m, d)
_rne_cfrc_backward(m, d)
wp.launch(_qfrc_bias, dim=[d.nworld, m.nv], inputs=[m.dof_bodyid, d.cdof, d.cfrc_int], outputs=[d.qfrc_bias])
@wp.kernel
def _cfrc_ext(
# Model:
body_rootid: wp.array(dtype=int),
# Data in:
xfrc_applied_in: wp.array2d(dtype=wp.spatial_vector),
xipos_in: wp.array2d(dtype=wp.vec3),
subtree_com_in: wp.array2d(dtype=wp.vec3),
# Data out:
cfrc_ext_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, bodyid = wp.tid()
if bodyid == 0:
cfrc_ext_out[worldid, 0] = wp.spatial_vector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
else:
xfrc_applied = xfrc_applied_in[worldid, bodyid]
subtree_com = subtree_com_in[worldid, body_rootid[bodyid]]
xipos = xipos_in[worldid, bodyid]
cfrc_ext_out[worldid, bodyid] = support.transform_force(xfrc_applied, subtree_com - xipos)
@wp.kernel
def _count_equality_constraints(
# Model:
eq_type: wp.array(dtype=int),
# Data in:
ne_in: wp.array(dtype=int),
efc_type_in: wp.array2d(dtype=int),
efc_id_in: wp.array2d(dtype=int),
# Out:
ne_connect_out: wp.array(dtype=int),
ne_weld_out: wp.array(dtype=int),
):
"""Counts connect and weld equality constraints from efc data."""
worldid, efcid = wp.tid()
# Only process rows within the equality constraint range
if efcid >= ne_in[worldid]:
return
# Get the equality constraint ID and its type
eq_id = efc_id_in[worldid, efcid]
eq_constraint_type = eq_type[eq_id]
# Count by type (each connect has 3 rows, each weld has 6 rows)
if eq_constraint_type == EqType.CONNECT:
wp.atomic_add(ne_connect_out, worldid, 1)
elif eq_constraint_type == EqType.WELD:
wp.atomic_add(ne_weld_out, worldid, 1)
@wp.kernel
def _cfrc_ext_equality(
# Model:
body_rootid: wp.array(dtype=int),
site_bodyid: wp.array(dtype=int),
site_pos: wp.array2d(dtype=wp.vec3),
eq_obj1id: wp.array(dtype=int),
eq_obj2id: wp.array(dtype=int),
eq_objtype: wp.array(dtype=int),
eq_data: wp.array2d(dtype=vec11),
# Data in:
xpos_in: wp.array2d(dtype=wp.vec3),
xmat_in: wp.array2d(dtype=wp.mat33),
subtree_com_in: wp.array2d(dtype=wp.vec3),
efc_id_in: wp.array2d(dtype=int),
efc_force_in: wp.array2d(dtype=float),
# In:
ne_connect_in: wp.array(dtype=int),
ne_weld_in: wp.array(dtype=int),
# Data out:
cfrc_ext_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, eqid = wp.tid()
ne_connect = ne_connect_in[worldid]
ne_weld = ne_weld_in[worldid]
num_connect = ne_connect // 3
if eqid >= num_connect + ne_weld // 6:
return
is_connect = eqid < num_connect
if is_connect:
efcid = 3 * eqid
cfrc_torque = wp.vec3(0.0, 0.0, 0.0) # no torque from connect
else:
efcid = 6 * eqid - ne_connect
cfrc_torque = wp.vec3(efc_force_in[worldid, efcid + 3], efc_force_in[worldid, efcid + 4], efc_force_in[worldid, efcid + 5])
cfrc_force = wp.vec3(
efc_force_in[worldid, efcid + 0],
efc_force_in[worldid, efcid + 1],
efc_force_in[worldid, efcid + 2],
)
id = efc_id_in[worldid, efcid]
eq_data_ = eq_data[worldid, id]
body_semantic = eq_objtype[id] == ObjType.BODY
obj1 = eq_obj1id[id]
obj2 = eq_obj2id[id]
if body_semantic:
bodyid1 = obj1
bodyid2 = obj2
else:
bodyid1 = site_bodyid[obj1]
bodyid2 = site_bodyid[obj2]
# body 1
if bodyid1:
if body_semantic:
if is_connect:
offset = wp.vec3(eq_data_[0], eq_data_[1], eq_data_[2])
else:
offset = wp.vec3(eq_data_[3], eq_data_[4], eq_data_[5])
else:
offset = site_pos[worldid, obj1]
# transform point on body1: local -> global
pos = xmat_in[worldid, bodyid1] @ offset + xpos_in[worldid, bodyid1]
# subtree CoM-based torque_force vector
newpos = subtree_com_in[worldid, body_rootid[bodyid1]]
dif = newpos - pos
cfrc_com = wp.spatial_vector(cfrc_torque - wp.cross(dif, cfrc_force), cfrc_force)
# apply (opposite for body 1)
wp.atomic_add(cfrc_ext_out[worldid], bodyid1, cfrc_com)
# body 2
if bodyid2:
if body_semantic:
if is_connect:
offset = wp.vec3(eq_data_[3], eq_data_[4], eq_data_[5])
else:
offset = wp.vec3(eq_data_[0], eq_data_[1], eq_data_[2])
else:
offset = site_pos[worldid, obj2]
# transform point on body2: local -> global
pos = xmat_in[worldid, bodyid2] @ offset + xpos_in[worldid, bodyid2]
# subtree CoM-based torque_force vector
newpos = subtree_com_in[worldid, body_rootid[bodyid2]]
dif = newpos - pos
cfrc_com = wp.spatial_vector(cfrc_torque - wp.cross(dif, cfrc_force), cfrc_force)
# apply
wp.atomic_sub(cfrc_ext_out[worldid], bodyid2, cfrc_com)
@wp.func
def transform_force(force: wp.vec3, torque: wp.vec3, offset: wp.vec3) -> wp.spatial_vector:
torque -= wp.cross(offset, force)
return wp.spatial_vector(torque, force)
@wp.kernel
def _cfrc_ext_contact(
# Model:
opt_cone: int,
body_rootid: wp.array(dtype=int),
geom_bodyid: wp.array(dtype=int),
# Data in:
subtree_com_in: wp.array2d(dtype=wp.vec3),
contact_pos_in: wp.array(dtype=wp.vec3),
contact_frame_in: wp.array(dtype=wp.mat33),
contact_friction_in: wp.array(dtype=vec5),
contact_dim_in: wp.array(dtype=int),
contact_geom_in: wp.array(dtype=wp.vec2i),
contact_efc_address_in: wp.array2d(dtype=int),
contact_worldid_in: wp.array(dtype=int),
efc_force_in: wp.array2d(dtype=float),
njmax_in: int,
nacon_in: wp.array(dtype=int),
# Data out:
cfrc_ext_out: wp.array2d(dtype=wp.spatial_vector),
):
contactid = wp.tid()
if contactid >= nacon_in[0]:
return
geom = contact_geom_in[contactid]
id1 = geom_bodyid[geom[0]]
id2 = geom_bodyid[geom[1]]
if id1 == 0 and id2 == 0:
return
worldid = contact_worldid_in[contactid]
# contact force in world frame
force = support.contact_force_fn(
opt_cone,
contact_frame_in,
contact_friction_in,
contact_dim_in,
contact_efc_address_in,
efc_force_in,
njmax_in,
nacon_in,
worldid,
contactid,
to_world_frame=True,
)
pos = contact_pos_in[contactid]
# contact force on bodies
if id1:
com1 = subtree_com_in[worldid, body_rootid[id1]]
wp.atomic_sub(cfrc_ext_out[worldid], id1, support.transform_force(force, com1 - pos))
if id2:
com2 = subtree_com_in[worldid, body_rootid[id2]]
wp.atomic_add(cfrc_ext_out[worldid], id2, support.transform_force(force, com2 - pos))
[docs]
@event_scope
def rne_postconstraint(m: Model, d: Data):
"""Computes the recursive Newton-Euler algorithm after constraints are applied.
Computes `cacc`, `cfrc_ext`, and `cfrc_int`, including the effects of applied forces, equality
constraints, and contacts.
"""
# cfrc_ext = perturb
wp.launch(
_cfrc_ext,
dim=(d.nworld, m.nbody),
inputs=[m.body_rootid, d.xfrc_applied, d.xipos, d.subtree_com],
outputs=[d.cfrc_ext],
)
# Equality constraint forces - only if model has equality constraints
if m.neq > 0:
# Allocate inline counters and count from efc data
ne_connect = wp.zeros((d.nworld,), dtype=int)
ne_weld = wp.zeros((d.nworld,), dtype=int)
wp.launch(
_count_equality_constraints,
dim=(d.nworld, d.njmax), # TODO(team): launch over max equality constraints
inputs=[m.eq_type, d.ne, d.efc.type, d.efc.id],
outputs=[ne_connect, ne_weld],
)
wp.launch(
_cfrc_ext_equality,
dim=(d.nworld, m.neq),
inputs=[
m.body_rootid,
m.site_bodyid,
m.site_pos,
m.eq_obj1id,
m.eq_obj2id,
m.eq_objtype,
m.eq_data,
d.xpos,
d.xmat,
d.subtree_com,
d.efc.id,
d.efc.force,
ne_connect,
ne_weld,
],
outputs=[d.cfrc_ext],
)
# cfrc_ext += contacts
wp.launch(
_cfrc_ext_contact,
dim=(d.naconmax,),
inputs=[
m.opt.cone,
m.body_rootid,
m.geom_bodyid,
d.subtree_com,
d.contact.pos,
d.contact.frame,
d.contact.friction,
d.contact.dim,
d.contact.geom,
d.contact.efc_address,
d.contact.worldid,
d.efc.force,
d.njmax,
d.nacon,
],
outputs=[d.cfrc_ext],
)
# forward pass over bodies: compute cacc, cfrc_int
_rne_cacc_world(m, d)
_rne_cacc_forward(m, d, flg_acc=True)
# cfrc_body = cinert * cacc + cvel x (cinert * cvel)
_rne_cfrc(m, d, flg_cfrc_ext=True)
# backward pass over bodies: accumulate cfrc_int from children
_rne_cfrc_backward(m, d)
@wp.kernel
def _tendon_dot(
# Model:
nv: int,
body_parentid: wp.array(dtype=int),
body_rootid: wp.array(dtype=int),
jnt_type: wp.array(dtype=int),
jnt_dofadr: wp.array(dtype=int),
dof_bodyid: wp.array(dtype=int),
dof_jntid: wp.array(dtype=int),
site_bodyid: wp.array(dtype=int),
tendon_adr: wp.array(dtype=int),
tendon_num: wp.array(dtype=int),
tendon_armature: wp.array2d(dtype=float),
wrap_type: wp.array(dtype=int),
wrap_objid: wp.array(dtype=int),
wrap_prm: wp.array(dtype=float),
# Data in:
site_xpos_in: wp.array2d(dtype=wp.vec3),
subtree_com_in: wp.array2d(dtype=wp.vec3),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
cvel_in: wp.array2d(dtype=wp.spatial_vector),
cdof_dot_in: wp.array2d(dtype=wp.spatial_vector),
# Out:
ten_Jdot_out: wp.array3d(dtype=float),
):
worldid, tenid = wp.tid()
armature = tendon_armature[worldid, tenid]
if armature == 0.0:
return
# fixed tendon has zero Jdot
adr = tendon_adr[tenid]
if wrap_type[adr] == WrapType.JOINT:
return
# process spatial tendon
divisor = float(1.0)
num = tendon_num[tenid]
j = int(0)
while j < num - 1:
# get 1st and 2nd object
type0 = wrap_type[adr + j + 0]
type1 = wrap_type[adr + j + 1]
id0 = wrap_objid[adr + j + 0]
id1 = wrap_objid[adr + j + 1]
# pulley
pulley = WrapType.PULLEY
if (type0 == pulley) or (type1 == pulley):
# get divisor
if type0 == pulley:
divisor = wrap_prm[adr + j]
j += 1
continue
# init sequence; assume it start with site
wpnt0 = site_xpos_in[worldid, id0]
bodyid0 = site_bodyid[id0]
pos0 = site_xpos_in[worldid, id0]
cvel0 = cvel_in[worldid, bodyid0]
subtree_com0 = subtree_com_in[worldid, body_rootid[bodyid0]]
dif0 = pos0 - subtree_com0
wvel0 = wp.spatial_bottom(cvel0) - wp.cross(dif0, wp.spatial_top(cvel0))
wbody0 = site_bodyid[id0]
# second object is geom: process site-geom-site
if (type1 == WrapType.SPHERE) or (type1 == WrapType.CYLINDER):
# TODO(team): derivatives of util_misc.wrap
return
# complete sequence
wbody1 = site_bodyid[id1]
wpnt1 = site_xpos_in[worldid, id1]
bodyid1 = site_bodyid[id1]
pos1 = site_xpos_in[worldid, id1]
cvel1 = cvel_in[worldid, bodyid1]
subtree_com1 = subtree_com_in[worldid, body_rootid[bodyid1]]
dif1 = pos1 - subtree_com1
wvel1 = wp.spatial_bottom(cvel1) - wp.cross(dif1, wp.spatial_top(cvel1))
# accumulate moments if consecutive points are in different bodies
if wbody0 != wbody1:
# dpnt = 3D position difference, normalize
dpnt, norm = math.normalize_with_norm(wpnt1 - wpnt0)
# dvel = d / dt (dpnt)
dvel = wvel1 - wvel0
dot = wp.dot(dpnt, dvel)
dvel += dpnt * (-dot)
if norm > MJ_MINVAL:
dvel /= norm
else:
dvel = wp.vec3(0.0)
# get endpoint Jacobian time derivatives, subtract
# TODO(team): parallelize?
for i in range(nv):
jac1, _ = support.jac_dot_dof(
body_parentid,
body_rootid,
jnt_type,
jnt_dofadr,
dof_bodyid,
dof_jntid,
subtree_com_in,
cdof_in,
cvel_in,
cdof_dot_in,
wpnt0,
wbody0,
i,
worldid,
)
jac2, _ = support.jac_dot_dof(
body_parentid,
body_rootid,
jnt_type,
jnt_dofadr,
dof_bodyid,
dof_jntid,
subtree_com_in,
cdof_in,
cvel_in,
cdof_dot_in,
wpnt1,
wbody1,
i,
worldid,
)
jacdif = jac2 - jac1
# chain rule, first term: Jdot += d / dt (jac2 - jac1) * dpnt
Jdot = wp.dot(jacdif, dpnt)
# get endpoint Jacobians, subtract
jac1, _ = support.jac_dof(
body_parentid,
body_rootid,
dof_bodyid,
subtree_com_in,
cdof_in,
wpnt0,
wbody0,
i,
worldid,
)
jac2, _ = support.jac_dof(
body_parentid,
body_rootid,
dof_bodyid,
subtree_com_in,
cdof_in,
wpnt1,
wbody1,
i,
worldid,
)
jacdif = jac2 - jac1
# chain rule, second term: Jdot += (jac2 - jac1) * d / dt (dpnt)
Jdot += wp.dot(jacdif, dvel)
ten_Jdot_out[worldid, tenid, i] += math.safe_div(Jdot, divisor)
# TODO(team): j += 2 if geom wrapping
j += 1
@wp.kernel
def _tendon_bias_coef(
# Model:
tendon_armature: wp.array2d(dtype=float),
# Data in:
qvel_in: wp.array2d(dtype=float),
# In:
ten_Jdot_in: wp.array3d(dtype=float),
# Out:
ten_bias_coef_out: wp.array2d(dtype=float),
):
worldid, tenid, dofid = wp.tid()
armature = tendon_armature[worldid, tenid]
if armature == 0.0:
return
ten_Jdot = ten_Jdot_in[worldid, tenid, dofid]
if ten_Jdot == 0.0:
return
wp.atomic_add(ten_bias_coef_out[worldid], tenid, ten_Jdot * qvel_in[worldid, dofid])
@wp.kernel
def _tendon_bias_qfrc(
# Model:
tendon_armature: wp.array2d(dtype=float),
# Data in:
ten_J_in: wp.array3d(dtype=float),
# In:
ten_bias_coef_in: wp.array2d(dtype=float),
# Out:
qfrc_out: wp.array2d(dtype=float),
):
worldid, tenid, dofid = wp.tid()
armature = tendon_armature[worldid, tenid]
if armature == 0.0:
return
ten_J = ten_J_in[worldid, tenid, dofid]
if ten_J == 0.0:
return
wp.atomic_add(qfrc_out[worldid], dofid, ten_J * armature * ten_bias_coef_in[worldid, tenid])
@event_scope
def tendon_bias(m: Model, d: Data, qfrc: wp.array2d(dtype=float)):
"""Add bias force due to tendon armature.
Args:
m: The model containing kinematic and dynamic information.
d: The data object containing the current state and output arrays.
qfrc: Force.
"""
# time derivative of tendon Jacobian
ten_Jdot = wp.zeros((d.nworld, m.ntendon, m.nv), dtype=float)
wp.launch(
_tendon_dot,
dim=(d.nworld, m.ntendon),
inputs=[
m.nv,
m.body_parentid,
m.body_rootid,
m.jnt_type,
m.jnt_dofadr,
m.dof_bodyid,
m.dof_jntid,
m.site_bodyid,
m.tendon_adr,
m.tendon_num,
m.tendon_armature,
m.wrap_type,
m.wrap_objid,
m.wrap_prm,
d.site_xpos,
d.subtree_com,
d.cdof,
d.cvel,
d.cdof_dot,
],
outputs=[ten_Jdot],
)
# tendon bias force coefficients
ten_bias_coef = wp.zeros((d.nworld, m.ntendon), dtype=float)
wp.launch(
_tendon_bias_coef,
dim=(d.nworld, m.ntendon, m.nv),
inputs=[m.tendon_armature, d.qvel, ten_Jdot],
outputs=[ten_bias_coef],
)
wp.launch(
_tendon_bias_qfrc,
dim=(d.nworld, m.ntendon, m.nv),
inputs=[m.tendon_armature, d.ten_J, ten_bias_coef],
outputs=[qfrc],
)
@wp.kernel
def _comvel_root(cvel_out: wp.array2d(dtype=wp.spatial_vector)):
worldid, elementid = wp.tid()
cvel_out[worldid, 0][elementid] = 0.0
@wp.kernel
def _comvel_branch(
# Model:
body_parentid: wp.array(dtype=int),
body_jntnum: wp.array(dtype=int),
body_jntadr: wp.array(dtype=int),
body_dofadr: wp.array(dtype=int),
jnt_type: wp.array(dtype=int),
body_branches: wp.array(dtype=int),
body_branch_start: wp.array(dtype=int),
# Data in:
qvel_in: wp.array2d(dtype=float),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
# Data out:
cvel_out: wp.array2d(dtype=wp.spatial_vector),
cdof_dot_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, branchid = wp.tid()
start = body_branch_start[branchid]
end = body_branch_start[branchid + 1]
qvel = qvel_in[worldid]
cdof = cdof_in[worldid]
for i in range(start, end):
bodyid = body_branches[i]
pid = body_parentid[bodyid]
cvel = cvel_out[worldid, pid]
dofid = body_dofadr[bodyid]
jntid = body_jntadr[bodyid]
jntnum = body_jntnum[bodyid]
if jntnum == 0:
cvel_out[worldid, bodyid] = cvel
continue
for j in range(jntid, jntid + jntnum):
jnttype = jnt_type[j]
if jnttype == JointType.FREE:
cvel += cdof[dofid + 0] * qvel[dofid + 0]
cvel += cdof[dofid + 1] * qvel[dofid + 1]
cvel += cdof[dofid + 2] * qvel[dofid + 2]
cdof_dot_out[worldid, dofid + 3] = math.motion_cross(cvel, cdof[dofid + 3])
cdof_dot_out[worldid, dofid + 4] = math.motion_cross(cvel, cdof[dofid + 4])
cdof_dot_out[worldid, dofid + 5] = math.motion_cross(cvel, cdof[dofid + 5])
cvel += cdof[dofid + 3] * qvel[dofid + 3]
cvel += cdof[dofid + 4] * qvel[dofid + 4]
cvel += cdof[dofid + 5] * qvel[dofid + 5]
dofid += 6
elif jnttype == JointType.BALL:
cdof_dot_out[worldid, dofid + 0] = math.motion_cross(cvel, cdof[dofid + 0])
cdof_dot_out[worldid, dofid + 1] = math.motion_cross(cvel, cdof[dofid + 1])
cdof_dot_out[worldid, dofid + 2] = math.motion_cross(cvel, cdof[dofid + 2])
cvel += cdof[dofid + 0] * qvel[dofid + 0]
cvel += cdof[dofid + 1] * qvel[dofid + 1]
cvel += cdof[dofid + 2] * qvel[dofid + 2]
dofid += 3
else:
cdof_dot_out[worldid, dofid] = math.motion_cross(cvel, cdof[dofid])
cvel += cdof[dofid] * qvel[dofid]
dofid += 1
cvel_out[worldid, bodyid] = cvel
[docs]
@event_scope
def com_vel(m: Model, d: Data):
"""Computes the spatial velocities (cvel) and the derivative `cdof_dot` for all bodies.
Propagates velocities down the kinematic tree, updating the spatial velocity and
derivative for each body.
"""
wp.launch(_comvel_root, dim=(d.nworld, 6), inputs=[], outputs=[d.cvel])
wp.launch(
_comvel_branch,
dim=(d.nworld, m.nbranch),
inputs=[
m.body_parentid,
m.body_jntnum,
m.body_jntadr,
m.body_dofadr,
m.jnt_type,
m.body_branches,
m.body_branch_start,
d.qvel,
d.cdof,
],
outputs=[d.cvel, d.cdof_dot],
)
@wp.kernel
def _transmission(
# Model:
nv: int,
body_parentid: wp.array(dtype=int),
body_rootid: wp.array(dtype=int),
body_weldid: wp.array(dtype=int),
body_dofnum: wp.array(dtype=int),
body_dofadr: wp.array(dtype=int),
jnt_type: wp.array(dtype=int),
jnt_qposadr: wp.array(dtype=int),
jnt_dofadr: wp.array(dtype=int),
dof_bodyid: wp.array(dtype=int),
dof_parentid: wp.array(dtype=int),
site_bodyid: wp.array(dtype=int),
site_quat: wp.array2d(dtype=wp.quat),
tendon_adr: wp.array(dtype=int),
tendon_num: wp.array(dtype=int),
wrap_type: wp.array(dtype=int),
wrap_objid: wp.array(dtype=int),
actuator_trntype: wp.array(dtype=int),
actuator_trnid: wp.array(dtype=wp.vec2i),
actuator_gear: wp.array2d(dtype=wp.spatial_vector),
actuator_cranklength: wp.array(dtype=float),
# Data in:
qpos_in: wp.array2d(dtype=float),
xquat_in: wp.array2d(dtype=wp.quat),
site_xpos_in: wp.array2d(dtype=wp.vec3),
site_xmat_in: wp.array2d(dtype=wp.mat33),
subtree_com_in: wp.array2d(dtype=wp.vec3),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
ten_J_in: wp.array3d(dtype=float),
ten_length_in: wp.array2d(dtype=float),
# Data out:
actuator_length_out: wp.array2d(dtype=float),
actuator_moment_out: wp.array3d(dtype=float),
):
worldid, actid = wp.tid()
trntype = actuator_trntype[actid]
actuator_gear_id = worldid % actuator_gear.shape[0]
gear = actuator_gear[actuator_gear_id, actid]
if trntype == TrnType.JOINT or trntype == TrnType.JOINTINPARENT:
qpos = qpos_in[worldid]
jntid = actuator_trnid[actid][0]
jnt_typ = jnt_type[jntid]
qadr = jnt_qposadr[jntid]
vadr = jnt_dofadr[jntid]
if jnt_typ == JointType.FREE:
actuator_length_out[worldid, actid] = 0.0
if trntype == TrnType.JOINTINPARENT:
quat = wp.normalize(wp.quat(qpos[qadr + 3], qpos[qadr + 4], qpos[qadr + 5], qpos[qadr + 6]))
quat_neg = math.quat_inv(quat)
gearaxis = math.rot_vec_quat(wp.spatial_bottom(gear), quat_neg)
actuator_moment_out[worldid, actid, vadr + 0] = gear[0]
actuator_moment_out[worldid, actid, vadr + 1] = gear[1]
actuator_moment_out[worldid, actid, vadr + 2] = gear[2]
actuator_moment_out[worldid, actid, vadr + 3] = gearaxis[0]
actuator_moment_out[worldid, actid, vadr + 4] = gearaxis[1]
actuator_moment_out[worldid, actid, vadr + 5] = gearaxis[2]
else:
for i in range(6):
actuator_moment_out[worldid, actid, vadr + i] = gear[i]
elif jnt_typ == JointType.BALL:
q = wp.quat(qpos[qadr + 0], qpos[qadr + 1], qpos[qadr + 2], qpos[qadr + 3])
q = wp.normalize(q)
axis_angle = math.quat_to_vel(q)
gearaxis = wp.spatial_top(gear) # [:3]
if trntype == TrnType.JOINTINPARENT:
quat_neg = math.quat_inv(q)
gearaxis = math.rot_vec_quat(gearaxis, quat_neg)
actuator_length_out[worldid, actid] = wp.dot(axis_angle, gearaxis)
for i in range(3):
actuator_moment_out[worldid, actid, vadr + i] = gearaxis[i]
elif jnt_typ == JointType.SLIDE or jnt_typ == JointType.HINGE:
actuator_length_out[worldid, actid] = qpos[qadr] * gear[0]
actuator_moment_out[worldid, actid, vadr] = gear[0]
else:
wp.printf("unrecognized joint type")
elif trntype == TrnType.SLIDERCRANK:
# get data
trnid = actuator_trnid[actid]
id = trnid[0]
idslider = trnid[1]
gear0 = gear[0]
rod = actuator_cranklength[actid]
site_xmat = site_xmat_in[worldid, idslider]
axis = wp.vec3(site_xmat[0, 2], site_xmat[1, 2], site_xmat[2, 2])
site_xpos_id = site_xpos_in[worldid, id]
site_xpos_idslider = site_xpos_in[worldid, idslider]
vec = site_xpos_id - site_xpos_idslider
# compute length and determinant
# length = a' * v - sqrt(det); det = (a' * v)^2 + r^2 - v' * v
av = wp.dot(vec, axis)
det = av * av + rod * rod - wp.dot(vec, vec)
ok = 1
if det <= 0.0:
ok = 0
sdet = 0.0
length = av
else:
sdet = wp.sqrt(det)
length = av - sdet
actuator_length_out[worldid, actid] = length * gear0
# compute derivatives of length w.r.t. vec and axis
if ok == 1:
scale = 1.0 - math.safe_div(av, sdet)
dldv = axis * scale + math.safe_div(vec, sdet)
dlda = vec * scale
else:
dldv = axis
dlda = vec
# apply chain rule
# TODO(team): parallelize?
for i in range(nv):
# get Jacobians of axis(jacA) and vec(jac)
# mj_jacPointAxis
jacp, jacr = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, site_xpos_idslider, site_bodyid[idslider], i, worldid
)
jacS = jacp
jacA = wp.cross(jacr, axis)
# mj_jacSite
jac, _ = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, site_xpos_id, site_bodyid[id], i, worldid
)
jac -= jacS
# apply the chain rule
moment = wp.dot(dlda, jacA) + wp.dot(dldv, jac)
actuator_moment_out[worldid, actid, i] = moment * gear0
elif trntype == TrnType.TENDON:
tenid = actuator_trnid[actid][0]
gear0 = gear[0]
actuator_length_out[worldid, actid] = ten_length_in[worldid, tenid] * gear0
# fixed
adr = tendon_adr[tenid]
if wrap_type[adr] == WrapType.JOINT:
ten_num = tendon_num[tenid]
for i in range(ten_num):
dofadr = jnt_dofadr[wrap_objid[adr + i]]
actuator_moment_out[worldid, actid, dofadr] = ten_J_in[worldid, tenid, dofadr] * gear0
else: # spatial
for dofadr in range(nv):
actuator_moment_out[worldid, actid, dofadr] = ten_J_in[worldid, tenid, dofadr] * gear0
elif trntype == TrnType.BODY:
# cannot compute meaningful length, set to zero
actuator_length_out[worldid, actid] = 0.0
# initialize moment
for i in range(nv):
actuator_moment_out[worldid, actid, i] = 0.0
# moment computed by _transmission_body_moment and _transmission_body_moment_scale
elif trntype == TrnType.SITE:
trnid = actuator_trnid[actid]
siteid = trnid[0]
refid = trnid[1]
gear = actuator_gear[worldid, actid]
site_quat_id = worldid % site_quat.shape[0]
gear_translation = wp.spatial_top(gear)
gear_rotational = wp.spatial_bottom(gear)
# reference site undefined
if refid == -1:
# wrench: gear expressed in global frame
site_xmat = site_xmat_in[worldid, siteid]
wrench_translation = site_xmat @ gear_translation
wrench_rotation = site_xmat @ gear_rotational
# moment: global Jacobian projected on wrench
# TODO(team): parallelize
for i in range(nv):
jacp, jacr = support.jac_dof(
body_parentid,
body_rootid,
dof_bodyid,
subtree_com_in,
cdof_in,
site_xpos_in[worldid, siteid],
site_bodyid[siteid],
i,
worldid,
)
actuator_length_out[worldid, actid] = 0.0
actuator_moment_out[worldid, actid, i] = wp.dot(jacp, wrench_translation) + wp.dot(jacr, wrench_rotation)
# reference site defined
else:
# initialize last dof address for each body
bodyid = site_bodyid[siteid]
bodyrefid = site_bodyid[refid]
b0 = body_weldid[bodyid]
b1 = body_weldid[bodyrefid]
dofadr0 = body_dofadr[b0] + body_dofnum[b0] - 1
dofadr1 = body_dofadr[b1] + body_dofnum[b1] - 1
# find common ancestral dof, if any
dofadr_common = -1
if dofadr0 >= 0 and dofadr1 >= 0:
# traverse up the tree until common ancestral dof is found
while dofadr0 != dofadr1:
if dofadr0 < dofadr1:
dofadr1 = dof_parentid[dofadr1]
else:
dofadr0 = dof_parentid[dofadr0]
if dofadr0 == -1 or dofadr1 == -1:
# reached tree root, no common ancestral dof
break
# found common ancestral dof
if dofadr0 == dofadr1:
dofadr_common = dofadr0
translational_transmission = not (gear[0] == 0.0 and gear[1] == 0.0 and gear[2] == 0.0)
rotational_transmission = not (gear[3] == 0.0 and gear[4] == 0.0 and gear[5] == 0.0)
site_xpos = site_xpos_in[worldid, siteid]
ref_xpos = site_xpos_in[worldid, refid]
ref_xmat = site_xmat_in[worldid, refid]
length = float(0.0)
if translational_transmission:
# vec: site position in reference site frame
vec = wp.transpose(ref_xmat) @ (site_xpos - ref_xpos)
length += wp.dot(vec, gear_translation)
wrench_translation = ref_xmat @ gear_translation
if rotational_transmission:
# get site and refsite quats from parent bodies (avoid converting matrix to quat)
quat = math.mul_quat(site_quat[site_quat_id, siteid], xquat_in[worldid, bodyid])
refquat = math.mul_quat(site_quat[site_quat_id, refid], xquat_in[worldid, bodyrefid])
# convert difference to expmap (axis-angle)
vec = math.quat_sub(quat, refquat)
length += wp.dot(vec, gear_rotational)
wrench_rotation = ref_xmat @ gear_rotational
actuator_length_out[worldid, actid] = length
# TODO(team): parallelize
for i in range(nv):
jacp, jacr = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, site_xpos, site_bodyid[siteid], i, worldid
)
# jacref: global Jacobian of reference site
jacpref, jacrref = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, ref_xpos, site_bodyid[refid], i, worldid
)
jacpdif = jacp - jacpref
jacrdif = jacr - jacrref
# if common ancestral dof was found, clear the columns of its parental chain
da = dofadr_common
while da >= 0:
if da == i:
jacpdif = wp.vec3(0.0)
jacrdif = wp.vec3(0.0)
break
da = dof_parentid[da]
# moment: global Jacobian projected on wrench
moment = float(0.0)
if translational_transmission:
moment += wp.dot(jacpdif, wrench_translation)
if rotational_transmission:
moment += wp.dot(jacrdif, wrench_rotation)
actuator_moment_out[worldid, actid, i] = moment
else:
wp.printf("unhandled transmission type %d\n", trntype)
@wp.kernel
def _transmission_body_moment(
# Model:
opt_cone: int,
body_parentid: wp.array(dtype=int),
body_rootid: wp.array(dtype=int),
dof_bodyid: wp.array(dtype=int),
geom_bodyid: wp.array(dtype=int),
actuator_trnid: wp.array(dtype=wp.vec2i),
actuator_trntype_body_adr: wp.array(dtype=int),
# Data in:
subtree_com_in: wp.array2d(dtype=wp.vec3),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
contact_dist_in: wp.array(dtype=float),
contact_pos_in: wp.array(dtype=wp.vec3),
contact_frame_in: wp.array(dtype=wp.mat33),
contact_includemargin_in: wp.array(dtype=float),
contact_dim_in: wp.array(dtype=int),
contact_geom_in: wp.array(dtype=wp.vec2i),
contact_efc_address_in: wp.array2d(dtype=int),
contact_worldid_in: wp.array(dtype=int),
efc_J_in: wp.array3d(dtype=float),
nacon_in: wp.array(dtype=int),
# Data out:
actuator_moment_out: wp.array3d(dtype=float),
# Out:
actuator_trntype_body_ncon_out: wp.array2d(dtype=int),
):
trnbodyid, conid, dofid = wp.tid()
actid = actuator_trntype_body_adr[trnbodyid]
bodyid = actuator_trnid[actid][0]
if conid >= nacon_in[0]:
return
worldid = contact_worldid_in[conid]
# get geom ids
geom = contact_geom_in[conid]
g1 = geom[0]
g2 = geom[1]
# contact involving flex, continue
if g1 < 0 or g2 < 0:
return
# get body ids
b1 = geom_bodyid[g1]
b2 = geom_bodyid[g2]
# irrelevant contact, continue
if b1 != bodyid and b2 != bodyid:
return
contact_exclude = int(contact_dist_in[conid] >= contact_includemargin_in[conid])
if dofid == 0:
wp.atomic_add(actuator_trntype_body_ncon_out[worldid], trnbodyid, 1)
# mark contact normals in efc_force
if contact_exclude == 0:
contact_dim = contact_dim_in[conid]
contact_efc_address = contact_efc_address_in[conid]
if contact_dim == 1 or opt_cone == ConeType.ELLIPTIC:
efc_force = 1.0
efcid0 = contact_efc_address[0]
wp.atomic_add(actuator_moment_out[worldid, actid], dofid, efc_J_in[worldid, efcid0, dofid] * efc_force)
else:
npyramid = contact_dim - 1 # number of frictional directions
efc_force = 0.5 / float(npyramid)
for j in range(2 * npyramid):
efcid = contact_efc_address[j]
wp.atomic_add(actuator_moment_out[worldid, actid], dofid, efc_J_in[worldid, efcid, dofid] * efc_force)
# excluded contact in gap: get Jacobian, accumulate
elif contact_exclude == 1:
contact_pos = contact_pos_in[conid]
contact_frame = contact_frame_in[conid]
normal = wp.vec3(contact_frame[0, 0], contact_frame[0, 1], contact_frame[0, 2])
# get Jacobian difference
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, contact_pos, b1, dofid, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, contact_pos, b2, dofid, worldid)
jacdif = jacp2 - jacp1
# project Jacobian along the normal of the contact frame
wp.atomic_add(actuator_moment_out[worldid, actid], dofid, wp.dot(normal, jacdif))
@wp.kernel
def _transmission_body_moment_scale(
# Model:
actuator_trntype_body_adr: wp.array(dtype=int),
# In:
actuator_trntype_body_ncon_in: wp.array2d(dtype=int),
# Data out:
actuator_moment_out: wp.array3d(dtype=float),
):
worldid, trnbodyid, dofid = wp.tid()
ncon = actuator_trntype_body_ncon_in[worldid, trnbodyid]
if ncon > 0:
actid = actuator_trntype_body_adr[trnbodyid]
actuator_moment_out[worldid, actid, dofid] /= -float(ncon)
[docs]
@event_scope
def transmission(m: Model, d: Data):
"""Computes actuator/transmission lengths and moments.
Updates the actuator length and moments for all actuators in the model, including joint
and tendon transmissions.
"""
d.actuator_moment.zero_()
wp.launch(
_transmission,
dim=[d.nworld, m.nu],
inputs=[
m.nv,
m.body_parentid,
m.body_rootid,
m.body_weldid,
m.body_dofnum,
m.body_dofadr,
m.jnt_type,
m.jnt_qposadr,
m.jnt_dofadr,
m.dof_bodyid,
m.dof_parentid,
m.site_bodyid,
m.site_quat,
m.tendon_adr,
m.tendon_num,
m.wrap_type,
m.wrap_objid,
m.actuator_trntype,
m.actuator_trnid,
m.actuator_gear,
m.actuator_cranklength,
d.qpos,
d.xquat,
d.site_xpos,
d.site_xmat,
d.subtree_com,
d.cdof,
d.ten_J,
d.ten_length,
],
outputs=[d.actuator_length, d.actuator_moment],
)
if m.nacttrnbody:
# compute moments
ncon = wp.zeros((d.nworld, m.nacttrnbody), dtype=int)
wp.launch(
_transmission_body_moment,
dim=(m.nacttrnbody, d.naconmax, m.nv),
inputs=[
m.opt.cone,
m.body_parentid,
m.body_rootid,
m.dof_bodyid,
m.geom_bodyid,
m.actuator_trnid,
m.actuator_trntype_body_adr,
d.subtree_com,
d.cdof,
d.contact.dist,
d.contact.pos,
d.contact.frame,
d.contact.includemargin,
d.contact.dim,
d.contact.geom,
d.contact.efc_address,
d.contact.worldid,
d.efc.J,
d.nacon,
],
outputs=[d.actuator_moment, ncon],
)
# scale moments
wp.launch(
_transmission_body_moment_scale,
dim=(d.nworld, m.nacttrnbody, m.nv),
inputs=[m.actuator_trntype_body_adr, ncon],
outputs=[d.actuator_moment],
)
@wp.kernel
def _solve_LD_sparse_x_acc_up(
# In:
L: wp.array3d(dtype=float),
qLD_updates_: wp.array(dtype=wp.vec3i),
# Out:
x: wp.array2d(dtype=float),
):
worldid, nodeid = wp.tid()
update = qLD_updates_[nodeid]
i, k, Madr_ki = update[0], update[1], update[2]
wp.atomic_sub(x[worldid], i, L[worldid, 0, Madr_ki] * x[worldid, k])
@wp.kernel
def _solve_LD_sparse_qLDiag_mul(
# In:
D: wp.array2d(dtype=float),
# Out:
out: wp.array2d(dtype=float),
):
worldid, dofid = wp.tid()
out[worldid, dofid] *= D[worldid, dofid]
@wp.kernel
def _solve_LD_sparse_x_acc_down(
# In:
L: wp.array3d(dtype=float),
qLD_updates_: wp.array(dtype=wp.vec3i),
# Out:
x: wp.array2d(dtype=float),
):
worldid, nodeid = wp.tid()
update = qLD_updates_[nodeid]
i, k, Madr_ki = update[0], update[1], update[2]
wp.atomic_sub(x[worldid], k, L[worldid, 0, Madr_ki] * x[worldid, i])
def _solve_LD_sparse(
m: Model,
d: Data,
L: wp.array3d(dtype=float),
D: wp.array2d(dtype=float),
x: wp.array2d(dtype=float),
y: wp.array2d(dtype=float),
):
"""Computes sparse backsubstitution: x = inv(L'*D*L)*y."""
wp.copy(x, y)
for qLD_updates in reversed(m.qLD_updates):
wp.launch(_solve_LD_sparse_x_acc_up, dim=(d.nworld, qLD_updates.size), inputs=[L, qLD_updates], outputs=[x])
wp.launch(_solve_LD_sparse_qLDiag_mul, dim=(d.nworld, m.nv), inputs=[D], outputs=[x])
for qLD_updates in m.qLD_updates:
wp.launch(_solve_LD_sparse_x_acc_down, dim=(d.nworld, qLD_updates.size), inputs=[L, qLD_updates], outputs=[x])
@cache_kernel
def _tile_cholesky_solve(tile: TileSet):
"""Returns a kernel for dense Cholesky backsubstitution of a tile."""
@wp.kernel(module="unique", enable_backward=False)
def cholesky_solve(
# In:
L: wp.array3d(dtype=float),
y: wp.array2d(dtype=float),
adr: wp.array(dtype=int),
# Out:
x: wp.array2d(dtype=float),
):
worldid, nodeid = wp.tid()
TILE_SIZE = wp.static(tile.size)
dofid = adr[nodeid]
y_slice = wp.tile_load(y[worldid], shape=(TILE_SIZE,), offset=(dofid,))
L_tile = wp.tile_load(L[worldid], shape=(TILE_SIZE, TILE_SIZE), offset=(dofid, dofid))
x_slice = wp.tile_cholesky_solve(L_tile, y_slice)
wp.tile_store(x[worldid], x_slice, offset=(dofid,))
return cholesky_solve
def _solve_LD_dense(m: Model, d: Data, L: wp.array3d(dtype=float), x: wp.array2d(dtype=float), y: wp.array2d(dtype=float)):
"""Computes dense backsubstitution: x = inv(L'*L)*y."""
for tile in m.qM_tiles:
wp.launch_tiled(
_tile_cholesky_solve(tile),
dim=(d.nworld, tile.adr.size),
inputs=[L, y, tile.adr],
outputs=[x],
block_dim=m.block_dim.cholesky_solve,
)
def solve_LD(
m: Model,
d: Data,
L: wp.array3d(dtype=float),
D: wp.array2d(dtype=float),
x: wp.array2d(dtype=float),
y: wp.array2d(dtype=float),
):
"""Computes backsubstitution to solve a linear system of the form x = inv(L'*D*L) * y.
L and D are the factors from the Cholesky factorization of the inertia matrix.
This function dispatches to either a sparse or dense solver depending on Model options.
Args:
m: The model containing factorization and sparsity information.
d: The data object containing workspace and factorization results.
L: Lower-triangular factor from the factorization (sparse or dense).
D: Diagonal factor from the factorization (only used for sparse).
x: Output array for the solution.
y: Input right-hand side array.
"""
if m.is_sparse:
_solve_LD_sparse(m, d, L, D, x, y)
else:
_solve_LD_dense(m, d, L, x, y)
[docs]
@event_scope
def solve_m(m: Model, d: Data, x: wp.array2d(dtype=float), y: wp.array2d(dtype=float)):
"""Computes backsubstitution: x = qLD * y.
Args:
m: The model containing inertia and factorization information.
d: The data object containing factorization results.
x: Output array for the solution.
y: Input right-hand side array.
"""
solve_LD(m, d, d.qLD, d.qLDiagInv, x, y)
@cache_kernel
def _tile_cholesky_factorize_solve(tile: TileSet):
"""Returns a kernel for dense Cholesky factorization and backsubstitution of a tile."""
@wp.kernel(module="unique", enable_backward=False)
def cholesky_factorize_solve(
# In:
M: wp.array3d(dtype=float),
y: wp.array2d(dtype=float),
adr: wp.array(dtype=int),
# Out:
x: wp.array2d(dtype=float),
L: wp.array3d(dtype=float),
):
worldid, nodeid = wp.tid()
TILE_SIZE = wp.static(tile.size)
dofid = adr[nodeid]
M_tile = wp.tile_load(M[worldid], shape=(TILE_SIZE, TILE_SIZE), offset=(dofid, dofid))
y_slice = wp.tile_load(y[worldid], shape=(TILE_SIZE,), offset=(dofid,))
L_tile = wp.tile_cholesky(M_tile)
wp.tile_store(L[worldid], L_tile, offset=(dofid, dofid))
x_slice = wp.tile_cholesky_solve(L_tile, y_slice)
wp.tile_store(x[worldid], x_slice, offset=(dofid,))
return cholesky_factorize_solve
def _factor_solve_i_dense(
m: Model,
d: Data,
M: wp.array3d(dtype=float),
x: wp.array2d(dtype=float),
y: wp.array2d(dtype=float),
L: wp.array3d(dtype=float),
):
for tile in m.qM_tiles:
wp.launch_tiled(
_tile_cholesky_factorize_solve(tile),
dim=(d.nworld, tile.adr.size),
inputs=[M, y, tile.adr],
outputs=[x, L],
block_dim=m.block_dim.cholesky_factorize_solve,
)
def factor_solve_i(m, d, M, L, D, x, y):
"""Factorizes and solves the linear system: x = inv(L'*D*L) * y or x = inv(L'*L) * y.
M is an inertia-like matrix and L, D are its Cholesky-like factors.
This function first factorizes the matrix M (sparse or dense depending on model options),
then solves the system for x given right-hand side y.
Args:
m: The model containing factorization and sparsity information.
d: The data object containing workspace and factorization results.
M: The inertia-like matrix to factorize.
L: Output lower-triangular factor from the factorization (sparse or dense).
D: Output diagonal factor from the factorization (only used for sparse).
x: Output array for the solution.
y: Input right-hand side array.
"""
if m.is_sparse:
_factor_i_sparse(m, d, M, L, D)
_solve_LD_sparse(m, d, L, D, x, y)
else:
_factor_solve_i_dense(m, d, M, x, y, L)
@wp.kernel
def _subtree_vel_forward(
# Model:
body_rootid: wp.array(dtype=int),
body_mass: wp.array2d(dtype=float),
body_inertia: wp.array2d(dtype=wp.vec3),
# Data in:
xipos_in: wp.array2d(dtype=wp.vec3),
ximat_in: wp.array2d(dtype=wp.mat33),
subtree_com_in: wp.array2d(dtype=wp.vec3),
cvel_in: wp.array2d(dtype=wp.spatial_vector),
# Data out:
subtree_linvel_out: wp.array2d(dtype=wp.vec3),
subtree_angmom_out: wp.array2d(dtype=wp.vec3),
# Out:
subtree_bodyvel_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, bodyid = wp.tid()
body_mass_id = worldid % body_mass.shape[0]
body_inertia_id = worldid % body_inertia.shape[0]
cvel = cvel_in[worldid, bodyid]
ang = wp.spatial_top(cvel)
lin = wp.spatial_bottom(cvel)
xipos = xipos_in[worldid, bodyid]
ximat = ximat_in[worldid, bodyid]
subtree_com_root = subtree_com_in[worldid, body_rootid[bodyid]]
# update linear velocity
lin -= wp.cross(xipos - subtree_com_root, ang)
subtree_linvel_out[worldid, bodyid] = body_mass[body_mass_id, bodyid] * lin
dv = wp.transpose(ximat) @ ang
dv[0] *= body_inertia[body_inertia_id, bodyid][0]
dv[1] *= body_inertia[body_inertia_id, bodyid][1]
dv[2] *= body_inertia[body_inertia_id, bodyid][2]
subtree_angmom_out[worldid, bodyid] = ximat @ dv
subtree_bodyvel_out[worldid, bodyid] = wp.spatial_vector(ang, lin)
@wp.kernel
def _linear_momentum(
# Model:
body_parentid: wp.array(dtype=int),
body_subtreemass: wp.array2d(dtype=float),
# Data in:
subtree_linvel_in: wp.array2d(dtype=wp.vec3),
# In:
body_tree_: wp.array(dtype=int),
# Data out:
subtree_linvel_out: wp.array2d(dtype=wp.vec3),
):
worldid, nodeid = wp.tid()
bodyid = body_tree_[nodeid]
if bodyid:
pid = body_parentid[bodyid]
wp.atomic_add(subtree_linvel_out[worldid], pid, subtree_linvel_in[worldid, bodyid])
subtree_linvel_out[worldid, bodyid] /= wp.max(MJ_MINVAL, body_subtreemass[worldid % body_subtreemass.shape[0], bodyid])
@wp.kernel
def _angular_momentum(
# Model:
body_parentid: wp.array(dtype=int),
body_mass: wp.array2d(dtype=float),
body_subtreemass: wp.array2d(dtype=float),
# Data in:
xipos_in: wp.array2d(dtype=wp.vec3),
subtree_com_in: wp.array2d(dtype=wp.vec3),
subtree_linvel_in: wp.array2d(dtype=wp.vec3),
# In:
subtree_bodyvel_in: wp.array2d(dtype=wp.spatial_vector),
body_tree_: wp.array(dtype=int),
# Data out:
subtree_angmom_out: wp.array2d(dtype=wp.vec3),
):
worldid, nodeid = wp.tid()
bodyid = body_tree_[nodeid]
if bodyid == 0:
return
pid = body_parentid[bodyid]
xipos = xipos_in[worldid, bodyid]
com = subtree_com_in[worldid, bodyid]
com_parent = subtree_com_in[worldid, pid]
vel = subtree_bodyvel_in[worldid, bodyid]
linvel = subtree_linvel_in[worldid, bodyid]
linvel_parent = subtree_linvel_in[worldid, pid] # Data field
mass = body_mass[worldid % body_mass.shape[0], bodyid]
subtreemass = body_subtreemass[worldid % body_subtreemass.shape[0], bodyid]
# momentum wrt body i
dx = xipos - com
dv = wp.spatial_bottom(vel) - linvel
dp = dv * mass
dL = wp.cross(dx, dp)
# add to subtree i
subtree_angmom_out[worldid, bodyid] += dL
# add to parent
wp.atomic_add(subtree_angmom_out[worldid], pid, subtree_angmom_out[worldid, bodyid])
# momentum wrt parent
dx = com - com_parent
dv = linvel - linvel_parent
dv *= subtreemass
dL = wp.cross(dx, dv)
wp.atomic_add(subtree_angmom_out[worldid], pid, dL)
[docs]
def subtree_vel(m: Model, d: Data):
"""Computes subtree linear velocity and angular momentum.
Computes the linear momentum and angular momentum for each subtree, accumulating
contributions up the kinematic tree.
"""
subtree_bodyvel = wp.empty((d.nworld, m.nbody), dtype=wp.spatial_vector)
# bodywise quantities
wp.launch(
_subtree_vel_forward,
dim=(d.nworld, m.nbody),
inputs=[m.body_rootid, m.body_mass, m.body_inertia, d.xipos, d.ximat, d.subtree_com, d.cvel],
outputs=[d.subtree_linvel, d.subtree_angmom, subtree_bodyvel],
)
# sum body linear momentum recursively up the kinematic tree
for body_tree in reversed(m.body_tree):
wp.launch(
_linear_momentum,
dim=[d.nworld, body_tree.size],
inputs=[m.body_parentid, m.body_subtreemass, d.subtree_linvel, body_tree],
outputs=[d.subtree_linvel],
)
for body_tree in reversed(m.body_tree):
wp.launch(
_angular_momentum,
dim=[d.nworld, body_tree.size],
inputs=[
m.body_parentid,
m.body_mass,
m.body_subtreemass,
d.xipos,
d.subtree_com,
d.subtree_linvel,
subtree_bodyvel,
body_tree,
],
outputs=[d.subtree_angmom],
)
@wp.kernel
def _joint_tendon(
# Model:
jnt_qposadr: wp.array(dtype=int),
jnt_dofadr: wp.array(dtype=int),
wrap_objid: wp.array(dtype=int),
wrap_prm: wp.array(dtype=float),
tendon_jnt_adr: wp.array(dtype=int),
wrap_jnt_adr: wp.array(dtype=int),
# Data in:
qpos_in: wp.array2d(dtype=float),
# Data out:
ten_J_out: wp.array3d(dtype=float),
ten_length_out: wp.array2d(dtype=float),
):
worldid, wrapid = wp.tid()
tendon_jnt_adr_ = tendon_jnt_adr[wrapid]
wrap_jnt_adr_ = wrap_jnt_adr[wrapid]
wrap_objid_ = wrap_objid[wrap_jnt_adr_]
prm = wrap_prm[wrap_jnt_adr_]
# add to length
L = prm * qpos_in[worldid, jnt_qposadr[wrap_objid_]]
# TODO(team): compare atomic_add and for loop
wp.atomic_add(ten_length_out[worldid], tendon_jnt_adr_, L)
# add to moment
ten_J_out[worldid, tendon_jnt_adr_, jnt_dofadr[wrap_objid_]] = prm
@wp.kernel
def _spatial_site_tendon(
# Model:
nv: int,
body_parentid: wp.array(dtype=int),
body_rootid: wp.array(dtype=int),
dof_bodyid: wp.array(dtype=int),
site_bodyid: wp.array(dtype=int),
wrap_objid: wp.array(dtype=int),
tendon_site_pair_adr: wp.array(dtype=int),
wrap_site_pair_adr: wp.array(dtype=int),
wrap_pulley_scale: wp.array(dtype=float),
# Data in:
site_xpos_in: wp.array2d(dtype=wp.vec3),
subtree_com_in: wp.array2d(dtype=wp.vec3),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
# Data out:
ten_J_out: wp.array3d(dtype=float),
ten_length_out: wp.array2d(dtype=float),
):
worldid, elementid = wp.tid()
# site pairs
site_pair_adr = wrap_site_pair_adr[elementid]
ten_adr = tendon_site_pair_adr[elementid]
# pulley scaling
pulley_scale = wrap_pulley_scale[site_pair_adr]
id0 = wrap_objid[site_pair_adr + 0]
id1 = wrap_objid[site_pair_adr + 1]
pnt0 = site_xpos_in[worldid, id0]
pnt1 = site_xpos_in[worldid, id1]
dif = pnt1 - pnt0
vec, length = math.normalize_with_norm(dif)
wp.atomic_add(ten_length_out[worldid], ten_adr, length * pulley_scale)
if length < MJ_MINVAL:
vec = wp.vec3(1.0, 0.0, 0.0)
body0 = site_bodyid[id0]
body1 = site_bodyid[id1]
if body0 != body1:
# TODO(team): parallelize
for i in range(nv):
jacp1, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pnt0, body0, i, worldid)
jacp2, _ = support.jac_dof(body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, pnt1, body1, i, worldid)
J = wp.dot(jacp2 - jacp1, vec)
if J:
wp.atomic_add(ten_J_out[worldid, ten_adr], i, J * pulley_scale)
@wp.kernel
def _spatial_geom_tendon(
# Model:
nv: int,
body_parentid: wp.array(dtype=int),
body_rootid: wp.array(dtype=int),
dof_bodyid: wp.array(dtype=int),
geom_bodyid: wp.array(dtype=int),
geom_size: wp.array2d(dtype=wp.vec3),
site_bodyid: wp.array(dtype=int),
wrap_type: wp.array(dtype=int),
wrap_objid: wp.array(dtype=int),
wrap_prm: wp.array(dtype=float),
tendon_geom_adr: wp.array(dtype=int),
wrap_geom_adr: wp.array(dtype=int),
wrap_pulley_scale: wp.array(dtype=float),
# Data in:
geom_xpos_in: wp.array2d(dtype=wp.vec3),
geom_xmat_in: wp.array2d(dtype=wp.mat33),
site_xpos_in: wp.array2d(dtype=wp.vec3),
subtree_com_in: wp.array2d(dtype=wp.vec3),
cdof_in: wp.array2d(dtype=wp.spatial_vector),
# Data out:
ten_J_out: wp.array3d(dtype=float),
ten_length_out: wp.array2d(dtype=float),
# Out:
wrap_geom_xpos_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid, elementid = wp.tid()
wrap_adr = wrap_geom_adr[elementid]
ten_adr = tendon_geom_adr[elementid]
# pulley scaling
pulley_scale = wrap_pulley_scale[wrap_adr]
# site-geom-site
wrap_objid_site0 = wrap_objid[wrap_adr - 1]
wrap_objid_geom = wrap_objid[wrap_adr + 0]
wrap_objid_site1 = wrap_objid[wrap_adr + 1]
# get site positions before and after geom
site_pnt0 = site_xpos_in[worldid, wrap_objid_site0]
site_pnt1 = site_xpos_in[worldid, wrap_objid_site1]
# get geom information
geom_xpos = geom_xpos_in[worldid, wrap_objid_geom]
geom_xmat = geom_xmat_in[worldid, wrap_objid_geom]
geomsize = geom_size[worldid, wrap_objid_geom][0]
geom_type = wrap_type[wrap_adr]
# get body ids for site-geom-site instances
bodyid_site0 = site_bodyid[wrap_objid_site0]
bodyid_geom = geom_bodyid[wrap_objid_geom]
bodyid_site1 = site_bodyid[wrap_objid_site1]
# find wrap object sidesite (if it exists)
sideid = int(wp.round(wrap_prm[wrap_adr]))
if sideid >= 0:
side = site_xpos_in[worldid, sideid]
else:
side = wp.vec3(MJ_MAXVAL)
# compute geom wrap length and connect points (if wrap occurs)
length_geomgeom, geom_pnt0, geom_pnt1 = util_misc.wrap(site_pnt0, site_pnt1, geom_xpos, geom_xmat, geomsize, geom_type, side)
# store geom points
wrap_geom_xpos_out[worldid, elementid] = wp.spatial_vector(geom_pnt0, geom_pnt1)
if length_geomgeom >= 0.0:
dif_sitegeom = geom_pnt0 - site_pnt0
dif_geomsite = site_pnt1 - geom_pnt1
vec_sitegeom, length_sitegeom = math.normalize_with_norm(dif_sitegeom)
vec_geomsite, length_geomsite = math.normalize_with_norm(dif_geomsite)
# length
length_sitegeomsite = length_sitegeom + length_geomgeom + length_geomsite
if length_sitegeomsite:
wp.atomic_add(ten_length_out[worldid], ten_adr, length_sitegeomsite * pulley_scale)
# moment
if length_sitegeom < MJ_MINVAL:
vec_sitegeom = wp.vec3(1.0, 0.0, 0.0)
if length_geomsite < MJ_MINVAL:
vec_geomsite = wp.vec3(1.0, 0.0, 0.0)
dif_body_sitegeom = bodyid_site0 != bodyid_geom
dif_body_geomsite = bodyid_geom != bodyid_site1
# TODO(team): parallelize
for i in range(nv):
J = float(0.0)
# site-geom
if dif_body_sitegeom:
jacp_site0, _ = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, site_pnt0, bodyid_site0, i, worldid
)
jacp_geom0, _ = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, geom_pnt0, bodyid_geom, i, worldid
)
J += wp.dot(jacp_geom0 - jacp_site0, vec_sitegeom)
# geom-site
if dif_body_geomsite:
jacp_geom1, _ = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, geom_pnt1, bodyid_geom, i, worldid
)
jacp_site1, _ = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, site_pnt1, bodyid_site1, i, worldid
)
J += wp.dot(jacp_site1 - jacp_geom1, vec_geomsite)
if J:
wp.atomic_add(ten_J_out[worldid, ten_adr], i, J * pulley_scale)
else:
dif_sitesite = site_pnt1 - site_pnt0
vec_sitesite, length_sitesite = math.normalize_with_norm(dif_sitesite)
# length
if length_sitesite:
wp.atomic_add(ten_length_out[worldid], ten_adr, length_sitesite * pulley_scale)
# moment
if length_sitesite < MJ_MINVAL:
vec_sitesite = wp.vec3(1.0, 0.0, 0.0)
if bodyid_site0 != bodyid_site1:
# TODO(team): parallelize
for i in range(nv):
jacp1, _ = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, site_pnt0, bodyid_site0, i, worldid
)
jacp2, _ = support.jac_dof(
body_parentid, body_rootid, dof_bodyid, subtree_com_in, cdof_in, site_pnt1, bodyid_site1, i, worldid
)
J = wp.dot(jacp2 - jacp1, vec_sitesite)
if J:
wp.atomic_add(ten_J_out[worldid, ten_adr], i, J * pulley_scale)
@wp.kernel
def _spatial_tendon_wrap(
# Model:
ntendon: int,
tendon_adr: wp.array(dtype=int),
tendon_num: wp.array(dtype=int),
wrap_type: wp.array(dtype=int),
wrap_objid: wp.array(dtype=int),
# Data in:
site_xpos_in: wp.array2d(dtype=wp.vec3),
# In:
wrap_geom_xpos_in: wp.array2d(dtype=wp.spatial_vector),
# Data out:
ten_wrapadr_out: wp.array2d(dtype=int),
ten_wrapnum_out: wp.array2d(dtype=int),
wrap_obj_out: wp.array2d(dtype=wp.vec2i),
wrap_xpos_out: wp.array2d(dtype=wp.spatial_vector),
):
worldid = wp.tid()
wrapcount = int(0)
wrapgeomid = int(0)
for i in range(ntendon):
adr = tendon_adr[i]
ten_wrapadr_out[worldid, i] = wrapcount
wrapnum = int(0)
tendonnum = tendon_num[i]
# process fixed tendon
if wrap_type[adr] == WrapType.JOINT:
continue
# process spatial tendon
j = int(0)
while j < tendonnum - 1:
# get 1st and 2nd object
type0 = wrap_type[adr + j + 0]
type1 = wrap_type[adr + j + 1]
id0 = wrap_objid[adr + j + 0]
id1 = wrap_objid[adr + j + 1]
# pulley
pulley0 = type0 == WrapType.PULLEY
if pulley0 or type1 == WrapType.PULLEY:
if pulley0:
row = wrapcount // 2
col = wrapcount % 2
wrap_xpos_out[worldid, row][3 * col + 0] = 0.0
wrap_xpos_out[worldid, row][3 * col + 1] = 0.0
wrap_xpos_out[worldid, row][3 * col + 2] = 0.0
wrap_obj_out[worldid, row][col] = -2
wrapnum += 1
wrapcount += 1
# move to next
j += 1
continue
# init sequence; assume it starts with site
wpnt_site0 = site_xpos_in[worldid, id0]
# second object is geom: process site-geom-site
if type1 == WrapType.SPHERE or type1 == WrapType.CYLINDER:
wrap_geom_xpos = wrap_geom_xpos_in[worldid, wrapgeomid]
wpnt_geom0 = wp.spatial_top(wrap_geom_xpos)
wrapgeomid += 1
wrapid = id1
id1 = wrap_objid[adr + j + 2]
if wp.norm_l2(wpnt_geom0) < MJ_MAXVAL:
wpnt_geom1 = wp.spatial_bottom(wrap_geom_xpos)
wpnt_site1 = site_xpos_in[worldid, id1]
# assign to wrap
row0 = (wrapcount + 0) // 2
col0 = (wrapcount + 0) % 2
row1 = (wrapcount + 1) // 2
col1 = (wrapcount + 1) % 2
row2 = (wrapcount + 2) // 2
col2 = (wrapcount + 2) % 2
row3 = (wrapcount + 3) // 2
col3 = (wrapcount + 3) % 2
wrap_xpos_out[worldid, row0][3 * col0 + 0] = wpnt_site0[0]
wrap_xpos_out[worldid, row0][3 * col0 + 1] = wpnt_site0[1]
wrap_xpos_out[worldid, row0][3 * col0 + 2] = wpnt_site0[2]
wrap_xpos_out[worldid, row1][3 * col1 + 0] = wpnt_geom0[0]
wrap_xpos_out[worldid, row1][3 * col1 + 1] = wpnt_geom0[1]
wrap_xpos_out[worldid, row1][3 * col1 + 2] = wpnt_geom0[2]
wrap_xpos_out[worldid, row2][3 * col2 + 0] = wpnt_geom1[0]
wrap_xpos_out[worldid, row2][3 * col2 + 1] = wpnt_geom1[1]
wrap_xpos_out[worldid, row2][3 * col2 + 2] = wpnt_geom1[2]
wrap_xpos_out[worldid, row3][3 * col3 + 0] = wpnt_site1[0]
wrap_xpos_out[worldid, row3][3 * col3 + 1] = wpnt_site1[1]
wrap_xpos_out[worldid, row3][3 * col3 + 2] = wpnt_site1[2]
wrap_obj_out[worldid, row0][col0] = -1
wrap_obj_out[worldid, row1][col1] = wrapid
wrap_obj_out[worldid, row2][col2] = wrapid
wrapnum += 3
wrapcount += 3
j += 2
else:
row0 = (wrapcount + 0) // 2
col0 = (wrapcount + 0) % 2
wrap_xpos_out[worldid, row0][3 * col0 + 0] = wpnt_site0[0]
wrap_xpos_out[worldid, row0][3 * col0 + 1] = wpnt_site0[1]
wrap_xpos_out[worldid, row0][3 * col0 + 2] = wpnt_site0[2]
wrap_obj_out[worldid, row0][col0] = -1
wrapnum += 1
wrapcount += 1
j += 2
else:
row0 = (wrapcount + 0) // 2
col0 = (wrapcount + 0) % 2
wrap_xpos_out[worldid, row0][3 * col0 + 0] = wpnt_site0[0]
wrap_xpos_out[worldid, row0][3 * col0 + 1] = wpnt_site0[1]
wrap_xpos_out[worldid, row0][3 * col0 + 2] = wpnt_site0[2]
wrap_obj_out[worldid, row0][col0] = -1
wrapnum += 1
wrapcount += 1
j += 1
# assign last site before pulley or tendon end
if adr + j + 1 < wrap_type.shape[0]:
last_before_pulley = wrap_type[adr + j + 1] == WrapType.PULLEY
else:
last_before_pulley = False
if j == tendonnum - 1 or last_before_pulley:
row0 = (wrapcount + 0) // 2
col0 = (wrapcount + 0) % 2
wpnt_site1 = site_xpos_in[worldid, id1]
wrap_xpos_out[worldid, row0][3 * col0 + 0] = wpnt_site1[0]
wrap_xpos_out[worldid, row0][3 * col0 + 1] = wpnt_site1[1]
wrap_xpos_out[worldid, row0][3 * col0 + 2] = wpnt_site1[2]
wrap_obj_out[worldid, row0][col0] = -1
wrapnum += 1
wrapcount += 1
ten_wrapnum_out[worldid, i] = wrapnum
[docs]
def tendon(m: Model, d: Data):
"""Computes tendon lengths and moments.
Updates the tendon length and moment arrays for all tendons in the model, including joint,
site, and geom tendons.
"""
if not m.ntendon:
return
d.ten_length.zero_()
d.ten_J.zero_()
# Cartesian 3D points fro geom wrap points
wrap_geom_xpos = wp.empty((d.nworld, m.nwrap), dtype=wp.spatial_vector)
# process joint tendons
wp.launch(
_joint_tendon,
dim=(d.nworld, m.wrap_jnt_adr.size),
inputs=[m.jnt_qposadr, m.jnt_dofadr, m.wrap_objid, m.wrap_prm, m.tendon_jnt_adr, m.wrap_jnt_adr, d.qpos],
outputs=[d.ten_J, d.ten_length],
)
spatial_site = m.wrap_site_pair_adr.size > 0
spatial_geom = m.wrap_geom_adr.size > 0
if spatial_site or spatial_geom:
d.wrap_xpos.zero_()
d.wrap_obj.zero_()
# process spatial site tendons
wp.launch(
_spatial_site_tendon,
dim=(d.nworld, m.wrap_site_pair_adr.size),
inputs=[
m.nv,
m.body_parentid,
m.body_rootid,
m.dof_bodyid,
m.site_bodyid,
m.wrap_objid,
m.tendon_site_pair_adr,
m.wrap_site_pair_adr,
m.wrap_pulley_scale,
d.site_xpos,
d.subtree_com,
d.cdof,
],
outputs=[d.ten_J, d.ten_length],
)
# process spatial geom tendons
wp.launch(
_spatial_geom_tendon,
dim=(d.nworld, m.wrap_geom_adr.size),
inputs=[
m.nv,
m.body_parentid,
m.body_rootid,
m.dof_bodyid,
m.geom_bodyid,
m.geom_size,
m.site_bodyid,
m.wrap_type,
m.wrap_objid,
m.wrap_prm,
m.tendon_geom_adr,
m.wrap_geom_adr,
m.wrap_pulley_scale,
d.geom_xpos,
d.geom_xmat,
d.site_xpos,
d.subtree_com,
d.cdof,
],
outputs=[d.ten_J, d.ten_length, wrap_geom_xpos],
)
if spatial_site or spatial_geom:
wp.launch(
_spatial_tendon_wrap,
dim=d.nworld,
inputs=[m.ntendon, m.tendon_adr, m.tendon_num, m.wrap_type, m.wrap_objid, d.site_xpos, wrap_geom_xpos],
outputs=[d.ten_wrapadr, d.ten_wrapnum, d.wrap_obj, d.wrap_xpos],
)