mirror of
https://github.com/HuFlungDu/pylibmeshctrl.git
synced 2026-02-20 13:42:11 +00:00
Added Session to the __init__ file, and changed docs and test accordingly
This commit is contained in:
@@ -8,8 +8,8 @@ import io
|
||||
thisdir = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
async def test_admin(env):
|
||||
async with meshctrl.session.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session:
|
||||
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session:
|
||||
admin_users = await admin_session.list_users(timeout=10)
|
||||
print("\ninfo list_users: {}\n".format(admin_users))
|
||||
try:
|
||||
@@ -34,22 +34,22 @@ async def test_admin(env):
|
||||
|
||||
async def test_users(env):
|
||||
try:
|
||||
async with meshctrl.session.Session(env.mcurl[3:], user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
|
||||
async with meshctrl.Session(env.mcurl[3:], user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
|
||||
pass
|
||||
except* ValueError:
|
||||
pass
|
||||
else:
|
||||
raise Exception("Connected with bad URL")
|
||||
try:
|
||||
async with meshctrl.session.Session(env.mcurl, user="admin", ignore_ssl=True) as admin_session:
|
||||
async with meshctrl.Session(env.mcurl, user="admin", ignore_ssl=True) as admin_session:
|
||||
pass
|
||||
except* meshctrl.exceptions.MeshCtrlError:
|
||||
pass
|
||||
else:
|
||||
raise Exception("Connected with no password")
|
||||
async with meshctrl.session.Session(env.mcurl+"/", user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as unprivileged_session:
|
||||
async with meshctrl.Session(env.mcurl+"/", user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session,\
|
||||
meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as unprivileged_session:
|
||||
|
||||
assert len(await admin_session.list_users(timeout=10)) == 3, "Wrong number of users"
|
||||
|
||||
@@ -74,17 +74,17 @@ async def test_users(env):
|
||||
assert len(await admin_session.list_users(timeout=10)) == 3, "Failed to remove user"
|
||||
|
||||
async def test_login_token(env):
|
||||
async with meshctrl.session.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as s:
|
||||
async with meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as s:
|
||||
token = await s.add_login_token("test", expire=1, timeout=10)
|
||||
print("\ninfo add_login_token: {}\n".format(token))
|
||||
|
||||
async with meshctrl.session.Session(env.mcurl, user=token["tokenUser"], password=token["tokenPass"], ignore_ssl=True) as s2:
|
||||
async with meshctrl.Session(env.mcurl, user=token["tokenUser"], password=token["tokenPass"], ignore_ssl=True) as s2:
|
||||
assert (await s2.user_info())["_id"] == (await s.user_info())["_id"], "Login token logged into wrong account"
|
||||
# Wait for the login token to expire
|
||||
await asyncio.sleep(65)
|
||||
|
||||
try:
|
||||
async with meshctrl.session.Session(env.mcurl, user=token["tokenUser"], password=token["tokenPass"], ignore_ssl=True) as s2:
|
||||
async with meshctrl.Session(env.mcurl, user=token["tokenUser"], password=token["tokenPass"], ignore_ssl=True) as s2:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
@@ -94,7 +94,7 @@ async def test_login_token(env):
|
||||
token = await s.add_login_token("test2", timeout=10)
|
||||
token2 = await s.add_login_token("test3", timeout=10)
|
||||
print("\ninfo add_login_token_no_expire: {}\n".format(token))
|
||||
async with meshctrl.session.Session(env.mcurl, user=token["tokenUser"], password=token["tokenPass"], ignore_ssl=True) as s2:
|
||||
async with meshctrl.Session(env.mcurl, user=token["tokenUser"], password=token["tokenPass"], ignore_ssl=True) as s2:
|
||||
assert (await s2.user_info())["_id"] == (await s.user_info())["_id"], "Login token logged into wrong account"
|
||||
|
||||
r = await s.list_login_tokens(timeout=10)
|
||||
@@ -107,9 +107,9 @@ async def test_login_token(env):
|
||||
assert len(await s.remove_login_token([token2["name"]], timeout=10)) == 0, "Residual login tokens"
|
||||
|
||||
async def test_mesh_device(env):
|
||||
async with meshctrl.session.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as unprivileged_session:
|
||||
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session,\
|
||||
meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as unprivileged_session:
|
||||
# Test creating a mesh
|
||||
mesh = await admin_session.add_device_group("test", description="This is a test group", amtonly=False, features=0, consent=0, timeout=10)
|
||||
print("\ninfo add_device_group: {}\n".format(mesh))
|
||||
@@ -266,9 +266,9 @@ async def test_mesh_device(env):
|
||||
assert not (await admin_session.add_users_to_device_group((await privileged_session.user_info())["_id"], mesh.meshid, rights=meshctrl.constants.MeshRights.fullrights, timeout=5))[(await privileged_session.user_info())["_id"]]["success"], "Added user to device group which doesn't exist?"
|
||||
|
||||
async def test_user_groups(env):
|
||||
async with meshctrl.session.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as unprivileged_session:
|
||||
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session,\
|
||||
meshctrl.Session(env.mcurl, user="unprivileged", password=env.users["unprivileged"], ignore_ssl=True) as unprivileged_session:
|
||||
|
||||
user_group = await admin_session.add_user_group("test", description="aoeu")
|
||||
print("\ninfo add_user_group: {}\n".format(user_group))
|
||||
@@ -294,7 +294,7 @@ async def test_user_groups(env):
|
||||
assert await admin_session.remove_user_group(user_group2.id.split("/")[-1])
|
||||
|
||||
async def test_events(env):
|
||||
async with meshctrl.session.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
|
||||
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
|
||||
await admin_session.list_events()
|
||||
mesh = await admin_session.add_device_group("test", description="This is a test group", amtonly=False, features=0, consent=0, timeout=10)
|
||||
try:
|
||||
@@ -310,7 +310,7 @@ async def test_events(env):
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
break
|
||||
async with meshctrl.session.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session:
|
||||
async with meshctrl.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session:
|
||||
|
||||
# assert len(await privileged_session.list_events()) == 0, "non-admin user has access to admin events"
|
||||
|
||||
@@ -337,8 +337,8 @@ async def test_events(env):
|
||||
assert (await admin_session.remove_device_group(mesh.meshid, timeout=10)), "Failed to remove device group"
|
||||
|
||||
async def test_interuser(env):
|
||||
async with meshctrl.session.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.session.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session:
|
||||
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session,\
|
||||
meshctrl.Session(env.mcurl, user="privileged", password=env.users["privileged"], ignore_ssl=True) as privileged_session:
|
||||
got_message = asyncio.Event()
|
||||
async def _():
|
||||
async for message in admin_session.events({"action": "interuser"}):
|
||||
@@ -361,7 +361,7 @@ async def test_interuser(env):
|
||||
tg.create_task(asyncio.wait_for(got_message.wait(), 5))
|
||||
|
||||
async def test_session_files(env):
|
||||
async with meshctrl.session.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
|
||||
async with meshctrl.Session(env.mcurl, user="admin", password=env.users["admin"], ignore_ssl=True) as admin_session:
|
||||
mesh = await admin_session.add_device_group("test", description="This is a test group", amtonly=False, features=0, consent=0, timeout=10)
|
||||
try:
|
||||
with env.create_agent(mesh.short_meshid) as agent:
|
||||
|
||||
Reference in New Issue
Block a user