2024-11-20 15:23:03 -08:00
import sys
import os
import asyncio
import meshctrl
import requests
import random
import io
2025-01-08 13:23:21 -08:00
import traceback
import time
2024-11-20 15:23:03 -08:00
thisdir = os . path . dirname ( os . path . realpath ( __file__ ) )
async def test_admin ( env ) :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session , \
meshctrl . Session ( env . mcurl , user = " privileged " , password = env . users [ " privileged " ] , ignore_ssl = True ) as privileged_session :
2024-11-20 15:23:03 -08:00
admin_users = await admin_session . list_users ( timeout = 10 )
print ( " \n info list_users: {} \n " . format ( admin_users ) )
try :
no_users = await privileged_session . list_users ( timeout = 10 )
except * meshctrl . exceptions . ServerError as e :
assert e . exceptions [ 0 ] . args [ 0 ] == " Access denied "
else :
assert len ( no_users . keys ( ) ) == 0 , " non-admin has admin acess "
admin_sessions = await admin_session . list_user_sessions ( timeout = 10 )
print ( " \n info list_user_sessions: {} \n " . format ( admin_sessions ) )
try :
no_sessions = await privileged_session . list_user_sessions ( timeout = 10 )
except * meshctrl . exceptions . ServerError as e :
assert e . exceptions [ 0 ] . args [ 0 ] == " Access denied "
else :
assert len ( no_sessions . keys ( ) ) == 0 , " non-admin has admin acess "
assert len ( admin_users ) == len ( env . users . keys ( ) ) , " Admin cannot see correct number of users "
2024-12-12 16:06:18 -08:00
assert len ( admin_sessions ) == 2 , " Admin cannot see correct number of user sessions "
2024-11-20 15:23:03 -08:00
2024-12-10 12:49:50 -08:00
async def test_auto_reconnect ( env ) :
async with meshctrl . Session ( env . mcurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True , auto_reconnect = True ) as admin_session :
env . restart_mesh ( )
await asyncio . sleep ( 10 )
await admin_session . ping ( timeout = 10 )
# As above, but with proxy
async with meshctrl . Session ( " wss:// " + env . dockerurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True , auto_reconnect = True , proxy = env . proxyurl ) as admin_session :
2024-12-12 16:06:18 -08:00
2024-12-10 12:49:50 -08:00
env . restart_mesh ( )
for i in range ( 3 ) :
try :
await admin_session . ping ( timeout = 10 )
2025-02-17 12:54:01 -08:00
except * Exception as e :
print ( " " . join ( traceback . format_exception ( e ) ) )
pass
else :
break
2024-12-10 12:49:50 -08:00
else :
raise Exception ( " Failed to reconnect " )
env . restart_proxy ( )
for i in range ( 3 ) :
try :
await admin_session . ping ( timeout = 10 )
except * Exception as e :
2025-02-17 12:54:01 -08:00
print ( " " . join ( traceback . format_exception ( e ) ) )
2024-12-10 12:49:50 -08:00
pass
else :
break
else :
raise Exception ( " Failed to reconnect " )
2024-11-20 15:23:03 -08:00
async def test_users ( env ) :
try :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl [ 3 : ] , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session :
2024-11-20 15:23:03 -08:00
pass
except * ValueError :
pass
else :
raise Exception ( " Connected with bad URL " )
try :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " admin " , ignore_ssl = True ) as admin_session :
2024-11-20 15:23:03 -08:00
pass
except * meshctrl . exceptions . MeshCtrlError :
pass
else :
raise Exception ( " Connected with no password " )
2025-01-08 13:23:21 -08:00
start = time . time ( )
try :
async with meshctrl . Session ( env . mcurl , user = " admin " , password = " The wrong password " , ignore_ssl = True ) as admin_session :
pass
except * meshctrl . exceptions . ServerError as eg :
assert str ( eg . exceptions [ 0 ] ) == " Invalid Auth " or eg . exceptions [ 0 ] . message == " Invalid Auth " , " Didn ' t get invalid auth message "
assert time . time ( ) - start < 10 , " Invalid auth wasn ' t raised until after timeout "
pass
else :
raise Exception ( " Connected with bad password " )
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl + " / " , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session , \
meshctrl . Session ( env . mcurl , user = " privileged " , password = env . users [ " privileged " ] , ignore_ssl = True ) as privileged_session , \
meshctrl . Session ( env . mcurl , user = " unprivileged " , password = env . users [ " unprivileged " ] , ignore_ssl = True ) as unprivileged_session :
2024-11-20 15:23:03 -08:00
assert len ( await admin_session . list_users ( timeout = 10 ) ) == 3 , " Wrong number of users "
assert await admin_session . add_user ( " test " , " test " , email = " test@email.com " , timeout = 10 ) , " Failed to create user "
assert await admin_session . add_user ( " test2 " , randompass = True , email = " test2@email.com " , emailverified = True , resetpass = True , realname = " test2 " , phone = " 555555555 " , rights = 0 , timeout = 10 ) , " Failed to create user "
try :
await unprivileged_session . add_user ( " nope " , " nope " , timeout = 10 )
except :
pass
else :
raise Exception ( " Unprivileged user created a user " )
assert len ( await admin_session . list_users ( timeout = 10 ) ) == 5 , " Wrong number of users "
assert await admin_session . edit_user ( " user//test " , email = " test@email.com " , emailverified = True , resetpass = True , realname = " test " , phone = " 555555555 " , rights = 0 , timeout = 10 ) , " Failed to edit user "
assert await admin_session . edit_user ( " user//test2 " , email = " test2@email.com " , emailverified = False , timeout = 10 ) , " Failed to edit user "
assert await admin_session . remove_user ( " user//test " , timeout = 10 ) , " Failed to remove user "
assert await admin_session . remove_user ( " user//test2 " , timeout = 10 ) , " Failed to remove user "
assert len ( await admin_session . list_users ( timeout = 10 ) ) == 3 , " Failed to remove user "
async def test_login_token ( env ) :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " unprivileged " , password = env . users [ " unprivileged " ] , ignore_ssl = True ) as s :
2024-11-20 15:23:03 -08:00
token = await s . add_login_token ( " test " , expire = 1 , timeout = 10 )
print ( " \n info add_login_token: {} \n " . format ( token ) )
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = token [ " tokenUser " ] , password = token [ " tokenPass " ] , ignore_ssl = True ) as s2 :
2024-11-20 15:23:03 -08:00
assert ( await s2 . user_info ( ) ) [ " _id " ] == ( await s . user_info ( ) ) [ " _id " ] , " Login token logged into wrong account "
# Wait for the login token to expire
await asyncio . sleep ( 65 )
try :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = token [ " tokenUser " ] , password = token [ " tokenPass " ] , ignore_ssl = True ) as s2 :
2024-11-20 15:23:03 -08:00
pass
except :
pass
else :
raise Exception ( " User logged in with expired token! " )
token = await s . add_login_token ( " test2 " , timeout = 10 )
token2 = await s . add_login_token ( " test3 " , timeout = 10 )
print ( " \n info add_login_token_no_expire: {} \n " . format ( token ) )
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = token [ " tokenUser " ] , password = token [ " tokenPass " ] , ignore_ssl = True ) as s2 :
2024-11-20 15:23:03 -08:00
assert ( await s2 . user_info ( ) ) [ " _id " ] == ( await s . user_info ( ) ) [ " _id " ] , " Login token logged into wrong account "
r = await s . list_login_tokens ( timeout = 10 )
print ( " \n info list_login_tokens: {} \n " . format ( r ) )
assert len ( r ) == 2 , " Wrong number of tokens "
assert " tokenPass " not in r [ 0 ] , " Retrieved tokens include password "
r = await s . remove_login_token ( token [ " name " ] , timeout = 10 )
print ( " \n info remove_login_token: {} \n " . format ( r ) )
assert len ( await s . remove_login_token ( [ token2 [ " name " ] ] , timeout = 10 ) ) == 0 , " Residual login tokens "
async def test_mesh_device ( env ) :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session , \
meshctrl . Session ( env . mcurl , user = " privileged " , password = env . users [ " privileged " ] , ignore_ssl = True ) as privileged_session , \
meshctrl . Session ( env . mcurl , user = " unprivileged " , password = env . users [ " unprivileged " ] , ignore_ssl = True ) as unprivileged_session :
2024-11-20 15:23:03 -08:00
# Test creating a mesh
mesh = await admin_session . add_device_group ( " test " , description = " This is a test group " , amtonly = False , features = 0 , consent = 0 , timeout = 10 )
print ( " \n info add_device_group: {} \n " . format ( mesh ) )
assert mesh . meshid , " Mesh failed to create "
assert len ( mesh . links . keys ( ) ) , " Mesh created without any users "
assert " user//admin " in mesh . links , " Mesh created with wrong user links "
# Used to move a device into later
mesh2 = await admin_session . add_device_group ( " test2 " , description = " This is a test2 group " , amtonly = False , features = 0 , consent = 0 , timeout = 10 )
# Test editing a device group
assert await admin_session . edit_device_group ( mesh . meshid , name = " new_test " , description = " New description " , flags = meshctrl . constants . MeshFeatures . all , consent = meshctrl . constants . ConsentFlags . all , invite_codes = True , timeout = 10 ) , " Failed to edit device group "
try :
await unprivileged_session . edit_device_group ( mesh . meshid , description = " New description2 " , timeout = 2 )
except * meshctrl . exceptions . ServerError as e :
assert e . exceptions [ 0 ] . args [ 0 ] == " Access denied "
# The server just ignores this command if you don't have permissions, so accept timeout as a good response.
except * asyncio . TimeoutError :
pass
else :
raise Exception ( " Unprivileged user could modify device group " )
# Test invite codes. Kinda.
assert await admin_session . edit_device_group ( mesh . meshid , invite_codes = [ " aoeu " , " asdf " ] , backgroundonly = True , interactiveonly = True , timeout = 10 ) , " Failed to edit device group "
# Test editing a group by name
assert await admin_session . edit_device_group ( " new_test " , isname = True , name = mesh . name , consent = meshctrl . constants . ConsentFlags . none , timeout = 10 ) , " Failed to edit device group by name "
# Test adding users to device group
r = await admin_session . add_users_to_device_group ( ( await privileged_session . user_info ( ) ) [ " _id " ] , mesh . meshid , rights = meshctrl . constants . MeshRights . fullrights , timeout = 5 )
print ( " \n info add_users_to_device_group: {} \n " . format ( r ) )
assert r [ ( await privileged_session . user_info ( ) ) [ " _id " ] ] [ " success " ] , " Failed to add user to group "
#Test adding users by device group name
await admin_session . add_users_to_device_group ( [ ( await unprivileged_session . user_info ( ) ) [ " _id " ] ] , mesh2 . name , isname = True , rights = meshctrl . constants . MeshRights . fullrights , timeout = 5 )
await admin_session . remove_users_from_device_group ( [ ( await unprivileged_session . user_info ( ) ) [ " _id " ] ] , mesh2 . name , isname = True , timeout = 10 )
# Test getting device groups for each user.
r = await admin_session . list_device_groups ( timeout = 10 )
print ( " \n info list_device_groups: {} \n " . format ( r ) )
assert len ( r ) == 2 , " Incorrect number of groups "
assert len ( await privileged_session . list_device_groups ( timeout = 10 ) ) == 1 , " Incorrect number of groups "
assert len ( await unprivileged_session . list_device_groups ( timeout = 10 ) ) == 0 , " Unprivileged account has access to group it should not "
assert r [ 0 ] . description == " New description " , " Description either failed to change, or was changed by a user without permission to do so "
2025-02-05 12:16:01 -08:00
# There once was a bug that occured whenever running run_commands with multiple meshes. We need to add devices to both meshes to be sure that bug is squashed.
2025-01-08 13:38:49 -08:00
with env . create_agent ( mesh . short_meshid ) as agent , \
2025-02-05 12:16:01 -08:00
env . create_agent ( mesh . short_meshid ) as agent2 , \
env . create_agent ( mesh2 . short_meshid ) as agent3 :
2024-11-20 15:23:03 -08:00
# Test agent added to device group being propagated correctly
# Create agent isn't so good at waiting for the agent to show in the sessions. Give it a couple seconds to appear.
for i in range ( 3 ) :
try :
r = await admin_session . list_devices ( timeout = 10 )
print ( " \n info list_devices: {} \n " . format ( r ) )
2025-02-05 12:16:01 -08:00
assert len ( r ) == 3 , " Incorrect number of agents connected "
2024-11-20 15:23:03 -08:00
except :
if i == 2 :
raise
await asyncio . sleep ( 1 )
else :
break
2025-01-08 13:38:49 -08:00
assert len ( await privileged_session . list_devices ( timeout = 10 ) ) == 2 , " Incorrect number of agents connected "
2024-11-20 15:23:03 -08:00
assert len ( await unprivileged_session . list_devices ( timeout = 10 ) ) == 0 , " Unprivileged account has access to agent it should not "
r = await admin_session . list_devices ( details = True , timeout = 10 )
print ( " \n info list_devices_details: {} \n " . format ( r ) )
r = await admin_session . list_devices ( group = mesh . name , timeout = 10 )
print ( " \n info list_devices_group: {} \n " . format ( r ) )
r = await admin_session . list_devices ( meshid = mesh . meshid , timeout = 10 )
print ( " \n info list_devices_meshid: {} \n " . format ( r ) )
2025-02-05 12:28:06 -08:00
r = await admin_session . device_info ( agent . nodeid , timeout = 10 )
print ( " \n info admin_device_info: {} \n " . format ( r ) )
2024-11-20 15:23:03 -08:00
# Test editing device info propagating correctly
assert await admin_session . edit_device ( agent . nodeid , name = " new_name " , description = " New Description " , tags = " device " , consent = meshctrl . constants . ConsentFlags . all , timeout = 10 ) , " Failed to edit device info "
assert ( await privileged_session . device_info ( agent . nodeid , timeout = 10 ) ) . name == " new_name " , " New name did not propagate to other sessions "
assert await admin_session . edit_device ( agent . nodeid , consent = meshctrl . constants . ConsentFlags . none , timeout = 10 ) , " Failed to edit device info "
# Test run_commands
2025-01-08 13:38:49 -08:00
r = await admin_session . run_command ( [ agent . nodeid , agent2 . nodeid ] , " ls " , timeout = 10 )
2024-11-20 15:23:03 -08:00
print ( " \n info run_command: {} \n " . format ( r ) )
assert " meshagent " in r [ agent . nodeid ] [ " result " ] , " ls gave incorrect data "
2025-01-08 13:38:49 -08:00
assert " meshagent " in r [ agent2 . nodeid ] [ " result " ] , " ls gave incorrect data "
assert " Run commands completed. " not in r [ agent . nodeid ] [ " result " ] , " Didn ' t parse run command ending correctly "
assert " Run commands completed. " not in r [ agent2 . nodeid ] [ " result " ] , " Didn ' t parse run command ending correctly "
2024-11-20 15:23:03 -08:00
assert " meshagent " in ( await privileged_session . run_command ( agent . nodeid , " ls " , timeout = 10 ) ) [ agent . nodeid ] [ " result " ] , " ls gave incorrect data "
2025-09-26 12:57:56 -07:00
# Test run_commands missing device
try :
await admin_session . run_command ( [ agent . nodeid , " notanid " ] , " ls " , timeout = 10 )
except * ( meshctrl . exceptions . ServerError , ValueError ) :
pass
else :
raise Exception ( " Run command on a device that doesn ' t exist did not raise an exception " )
2025-09-26 14:12:56 -07:00
r = await admin_session . run_console_command ( [ agent . nodeid , agent2 . nodeid ] , " info " , timeout = 10 )
print ( " \n info run_console_command: {} \n " . format ( r ) )
assert agent . nodeid in r [ agent . nodeid ] [ " result " ] , " Run console command gave bad response "
assert agent2 . nodeid in r [ agent2 . nodeid ] [ " result " ] , " Run console command gave bad response "
# Test run_commands missing device
try :
await admin_session . run_console_command ( [ agent . nodeid , " notanid " ] , " info " , timeout = 10 )
except * ( meshctrl . exceptions . ServerError , ValueError ) :
pass
else :
raise Exception ( " Run console command on a device that doesn ' t exist did not raise an exception " )
2025-09-24 10:36:42 -07:00
# Test run commands with individual device permissions
2024-11-20 15:23:03 -08:00
try :
await unprivileged_session . run_command ( agent . nodeid , " ls " , timeout = 10 )
except * ( meshctrl . exceptions . ServerError , ValueError ) :
pass
else :
raise Exception ( " Unprivileged user has access to device it should not " )
try :
await unprivileged_session . device_info ( agent . nodeid , timeout = 10 )
except * ValueError :
pass
else :
raise Exception ( " Unprivileged user has access to device it should not " )
2025-09-24 10:36:42 -07:00
assert ( await admin_session . add_users_to_device ( ( await unprivileged_session . user_info ( ) ) [ " _id " ] , agent . nodeid , meshctrl . constants . DeviceRights . norights ) ) , " Failed to add user to device "
2024-11-20 15:23:03 -08:00
try :
await unprivileged_session . run_command ( agent . nodeid , " ls " , ignore_output = True , timeout = 10 )
except * meshctrl . exceptions . ServerError :
pass
else :
raise Exception ( " Unprivileged user has access to device it should not " )
# Test getting individual device info
r = await unprivileged_session . device_info ( agent . nodeid , timeout = 10 )
2025-02-05 12:28:06 -08:00
print ( " \n info unprivileged_device_info: {} \n " . format ( r ) )
2024-11-20 15:23:03 -08:00
# This device info includes the mesh ID of the device, even though the user doesn't have acces to that mesh. That's odd.
# assert r.meshid is None, "Individual device is exposing its meshid"
assert r . links [ ( await unprivileged_session . user_info ( ) ) [ " _id " ] ] [ " rights " ] == meshctrl . constants . DeviceRights . norights , " Unprivileged user has too many rights! "
2025-09-24 10:36:42 -07:00
assert ( await admin_session . add_users_to_device ( [ ( await unprivileged_session . user_info ( ) ) [ " _id " ] ] , agent . nodeid , meshctrl . constants . DeviceRights . fullrights ) ) , " Failed to modify user ' s permissions "
2024-11-20 15:23:03 -08:00
2025-09-24 10:36:42 -07:00
assert ( await unprivileged_session . device_info ( agent . nodeid , timeout = 10 ) ) . links [ ( await unprivileged_session . user_info ( ) ) [ " _id " ] ] [ " rights " ] == meshctrl . constants . DeviceRights . fullrights , " Adding permissions did not update unprivileged user. "
2024-11-20 15:23:03 -08:00
2025-09-24 10:36:42 -07:00
# For now, this expects no response. If we ever figure out why the server isn't sending console information to us when it should, fix this.
2024-11-20 15:23:03 -08:00
# assert "meshagent" in (await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data"
2025-09-24 10:36:42 -07:00
# Meshcentral has a 10 second cache on user perms.
#await asyncio.sleep(15)
2024-11-20 15:23:03 -08:00
await unprivileged_session . run_command ( agent . nodeid , " ls " , timeout = 10 )
assert await admin_session . move_to_device_group ( agent . nodeid , mesh2 . meshid , timeout = 5 ) , " Failed to move mesh to new device group "
try :
await privileged_session . device_info ( agent . nodeid , timeout = 10 )
except * ValueError :
pass
else :
raise Exception ( " Privileged user has access to device after it was moved to a new mesh " )
assert await admin_session . move_to_device_group ( [ agent . nodeid ] , mesh . name , isname = True , timeout = 5 ) , " Failed to move mesh to new device group by name "
2025-09-24 10:36:42 -07:00
# For now, this expects no response. If we ever figure out why the server isn't sending console information te us when it should, fix this.
2024-11-20 15:23:03 -08:00
# assert "meshagent" in (await unprivileged_session.run_command(agent.nodeid, "ls", timeout=10))[agent.nodeid]["result"], "ls gave incorrect data"
try :
await unprivileged_session . run_command ( agent . nodeid , " ls " , timeout = 10 )
except :
raise Exception ( " Failed to run command on device after it was moved to a new mesh while having individual device permissions " )
2024-12-12 16:06:18 -08:00
r = await admin_session . remove_users_from_device_group ( ( await privileged_session . user_info ( ) ) [ " _id " ] , mesh . meshid , timeout = 10 )
print ( " \n info remove_users_from_device_group: {} \n " . format ( r ) )
assert ( r [ ( await privileged_session . user_info ( ) ) [ " _id " ] ] [ " success " ] ) , " Failed to remove user from device group "
2025-09-26 15:20:25 -07:00
# Dunno how to test if this actually works, so just check for errors, I guess.
admin_session . remove_devices ( agent . nodeid , timeout = 10 )
2024-12-12 16:06:18 -08:00
assert ( await admin_session . remove_users_from_device ( agent . nodeid , ( await unprivileged_session . user_info ( ) ) [ " _id " ] , timeout = 10 ) ) , " Failed to remove user from device "
2024-11-20 15:23:03 -08:00
assert ( await admin_session . remove_device_group ( mesh . meshid , timeout = 10 ) ) , " Failed to remove device group "
2024-12-12 16:06:18 -08:00
assert ( await admin_session . remove_device_group ( mesh2 . name , isname = True , timeout = 10 ) ) , " Failed to remove device group by name "
2024-11-20 15:23:03 -08:00
assert not ( await admin_session . add_users_to_device_group ( ( await privileged_session . user_info ( ) ) [ " _id " ] , mesh . meshid , rights = meshctrl . constants . MeshRights . fullrights , timeout = 5 ) ) [ ( await privileged_session . user_info ( ) ) [ " _id " ] ] [ " success " ] , " Added user to device group which doesn ' t exist? "
async def test_user_groups ( env ) :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session , \
meshctrl . Session ( env . mcurl , user = " privileged " , password = env . users [ " privileged " ] , ignore_ssl = True ) as privileged_session , \
meshctrl . Session ( env . mcurl , user = " unprivileged " , password = env . users [ " unprivileged " ] , ignore_ssl = True ) as unprivileged_session :
2024-11-20 15:23:03 -08:00
user_group = await admin_session . add_user_group ( " test " , description = " aoeu " )
print ( " \n info add_user_group: {} \n " . format ( user_group ) )
user_group2 = await admin_session . add_user_group ( " test2 " , description = " aoeu " )
r = await admin_session . list_user_groups ( timeout = 10 )
assert len ( r ) == 2 , " Wrong number of user groups "
print ( " \n info list_user_groups: {} \n " . format ( r ) )
assert ( await privileged_session . list_user_groups ( timeout = 10 ) ) [ 0 ] . description == None , " User has access to group they should not "
r = await admin_session . add_users_to_user_group ( ( await privileged_session . user_info ( ) ) [ " _id " ] , user_group . id , timeout = 10 )
print ( " \n info add_users_to_user_group: {} \n " . format ( r ) )
assert r [ ( await privileged_session . user_info ( ) ) [ " _id " ] . split ( " / " ) [ - 1 ] ] [ " success " ] , " Failed to add user to user group "
assert ( await admin_session . add_users_to_user_group ( [ ( await unprivileged_session . user_info ( ) ) [ " _id " ] ] , user_group . id . split ( " / " ) [ - 1 ] , timeout = 10 ) ) [ ( await unprivileged_session . user_info ( ) ) [ " _id " ] . split ( " / " ) [ - 1 ] ] [ " success " ] , " Failed to add user to user group "
r = await privileged_session . list_user_groups ( timeout = 10 )
print ( " \n info list_user_groups_non_owner: {} \n " . format ( r ) )
# Non owners just don't get to see the description.
# assert r[0].description == "aoeu", "Failed to add user to user group"
assert await admin_session . remove_user_from_user_group ( ( await privileged_session . user_info ( ) ) [ " _id " ] , user_group . id , timeout = 10 )
assert await admin_session . remove_user_from_user_group ( ( await unprivileged_session . user_info ( ) ) [ " _id " ] , user_group . id . split ( " / " ) [ - 1 ] , timeout = 10 )
assert await admin_session . remove_user_group ( user_group . id )
assert await admin_session . remove_user_group ( user_group2 . id . split ( " / " ) [ - 1 ] )
async def test_events ( env ) :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session :
2024-11-20 15:23:03 -08:00
await admin_session . list_events ( )
mesh = await admin_session . add_device_group ( " test " , description = " This is a test group " , amtonly = False , features = 0 , consent = 0 , timeout = 10 )
try :
with env . create_agent ( mesh . short_meshid ) as agent :
# Create agent isn't so good at waiting for the agent to show in the sessions. Give it a couple seconds to appear.
for i in range ( 3 ) :
try :
r = await admin_session . list_devices ( timeout = 10 )
assert len ( r ) == 1 , " Incorrect number of agents connected "
except :
if i == 2 :
raise
await asyncio . sleep ( 1 )
else :
break
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " privileged " , password = env . users [ " privileged " ] , ignore_ssl = True ) as privileged_session :
2024-11-20 15:23:03 -08:00
# assert len(await privileged_session.list_events()) == 0, "non-admin user has access to admin events"
await admin_session . run_command ( agent . nodeid , " ls " , timeout = 10 )
events = await privileged_session . list_events ( nodeid = agent . nodeid )
admin_events = await admin_session . list_events ( nodeid = agent . nodeid )
# For some reason, this lets you get these. Probably a bug.
# assert len(events) == 0, "User has access to events on device on which they are not a user"
assert len ( admin_events ) > 0 , " Admin didn ' t get events from running a command "
await admin_session . add_users_to_device_group ( ( await privileged_session . user_info ( ) ) [ " _id " ] , mesh . meshid , rights = meshctrl . constants . MeshRights . fullrights , timeout = 5 )
events = await privileged_session . list_events ( )
admin_events = await admin_session . list_events ( )
events = await admin_session . list_events ( )
assert len ( events ) > 1 , " Missed some events "
assert len ( await admin_session . list_events ( limit = 1 ) ) == 1 , " Event limiter gave wrong number of events "
events = await privileged_session . list_events ( userid = ( await admin_session . user_info ( ) ) [ " _id " ] )
admin_events = await admin_session . list_events ( userid = ( await privileged_session . user_info ( ) ) [ " _id " ] )
assert len ( events ) != len ( admin_events ) , " Failed to filter events based on user id "
finally :
assert ( await admin_session . remove_device_group ( mesh . meshid , timeout = 10 ) ) , " Failed to remove device group "
async def test_interuser ( env ) :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session , \
meshctrl . Session ( env . mcurl , user = " privileged " , password = env . users [ " privileged " ] , ignore_ssl = True ) as privileged_session :
2024-11-20 15:23:03 -08:00
got_message = asyncio . Event ( )
async def _ ( ) :
async for message in admin_session . events ( { " action " : " interuser " } ) :
print ( " \n info interuser_message: {} \n " . format ( message ) )
assert message [ " data " ] == " ping " , " Got wrong interuser message "
await admin_session . interuser ( " pong " , session = message [ " sessionid " ] )
break
async def __ ( ) :
async for message in privileged_session . events ( { " action " : " interuser " } ) :
assert message [ " data " ] == " pong " , " Got wrong interuser message "
got_message . set ( )
break
async with asyncio . TaskGroup ( ) as tg :
tg . create_task ( _ ( ) )
tg . create_task ( __ ( ) )
# Interuser only works with username, not id
tg . create_task ( privileged_session . interuser ( " ping " , user = ( await admin_session . user_info ( ) ) [ " _id " ] . split ( " / " ) [ - 1 ] ) )
tg . create_task ( asyncio . wait_for ( got_message . wait ( ) , 5 ) )
async def test_session_files ( env ) :
2024-12-02 13:39:40 -08:00
async with meshctrl . Session ( env . mcurl , user = " admin " , password = env . users [ " admin " ] , ignore_ssl = True ) as admin_session :
2024-11-20 15:23:03 -08:00
mesh = await admin_session . add_device_group ( " test " , description = " This is a test group " , amtonly = False , features = 0 , consent = 0 , timeout = 10 )
try :
with env . create_agent ( mesh . short_meshid ) as agent :
# Create agent isn't so good at waiting for the agent to show in the sessions. Give it a couple seconds to appear.
for i in range ( 3 ) :
try :
r = await admin_session . list_devices ( timeout = 10 )
assert len ( r ) == 1 , " Incorrect number of agents connected "
except :
if i == 2 :
raise
await asyncio . sleep ( 1 )
else :
break
pwd = ( await admin_session . run_command ( agent . nodeid , " pwd " , timeout = 10 ) ) [ agent . nodeid ] [ " result " ] . strip ( )
2024-12-12 18:06:50 -08:00
randdata = random . randbytes ( 20000000 )
2024-11-20 15:23:03 -08:00
upfilestream = io . BytesIO ( randdata )
downfilestream = io . BytesIO ( )
os . makedirs ( os . path . join ( thisdir , " data " ) , exist_ok = True )
with open ( os . path . join ( thisdir , " data " , " test " ) , " wb " ) as outfile :
outfile . write ( randdata )
r = await admin_session . upload ( agent . nodeid , upfilestream , f " { pwd } /test " , timeout = 5 )
print ( " \n info files_upload: {} \n " . format ( r ) )
2024-12-12 16:06:18 -08:00
assert r [ " result " ] == True , " Upload failed "
2024-11-20 15:23:03 -08:00
assert r [ " size " ] == len ( randdata ) , " Uploaded wrong number of bytes "
r = await admin_session . upload_file ( agent . nodeid , os . path . join ( thisdir , " data " , " test " ) , f " { pwd } /test2 " , timeout = 5 )
print ( " \n info files_upload: {} \n " . format ( r ) )
2024-12-12 16:06:18 -08:00
assert r [ " result " ] == True , " Upload failed "
2024-11-20 15:23:03 -08:00
assert r [ " size " ] == len ( randdata ) , " Uploaded wrong number of bytes "
s = await admin_session . download ( agent . nodeid , f " { pwd } /test " , timeout = 5 )
assert s . read ( ) == randdata , " Downloaded bad data "
await admin_session . download ( agent . nodeid , f " { pwd } /test " , downfilestream , timeout = 5 )
assert downfilestream . read ( ) == randdata , " Downloaded bad data "
await admin_session . download_file ( agent . nodeid , f " { pwd } /test2 " , os . path . join ( thisdir , " data " , " test " ) , timeout = 5 )
with open ( os . path . join ( thisdir , " data " , " test " ) , " rb " ) as infile :
assert infile . read ( ) == randdata , " Downloaded bad data into file "
r = await admin_session . upload_file ( agent . nodeid , os . path . join ( thisdir , " data " , " test " ) , f " { pwd } /test2 " , unique_file_tunnel = True , timeout = 5 )
2024-12-12 16:06:18 -08:00
assert r [ " result " ] == True , " Upload failed "
2024-11-20 15:23:03 -08:00
assert r [ " size " ] == len ( randdata ) , " Uploaded wrong number of bytes "
await admin_session . download_file ( agent . nodeid , f " { pwd } /test2 " , os . path . join ( thisdir , " data " , " test " ) , unique_file_tunnel = True , timeout = 5 )
with open ( os . path . join ( thisdir , " data " , " test " ) , " rb " ) as infile :
assert infile . read ( ) == randdata , " Downloaded bad data into file "
finally :
assert ( await admin_session . remove_device_group ( mesh . meshid , timeout = 10 ) ) , " Failed to remove device group "