Source code for binaryninja.collaboration.remote

import ctypes
import json
from typing import Dict, List, Optional, Tuple

import binaryninja
import binaryninja.enterprise as enterprise

from . import _collaboration as core
from . import databasesync, group, project, user, util


[docs] def nop(*args, **kwargs): return True
[docs] class Remote: """ Class representing a connection to a Collaboration server """ def __init__(self, name: Optional[str] = None, address: Optional[str] = None, handle=None): """ Create a Remote object (but don't connect to it yet) :param name: Identifier for remote :param address: Base address (HTTPS) for all api requests :param handle: FFI handle for internal use :raises: RuntimeError if there was an error """ if handle is not None: self._handle = core.BNCollaborationRemoteCopy(handle) else: if name is None: raise RuntimeError("Name cannot be None if no handle is provided") if address is None: raise RuntimeError("Address cannot be None if no handle is provided") self._handle = core.BNCollaborationRemoteCreate(name, address) def __del__(self): if self._handle is not None: core.BNCollaborationRemoteFree(self._handle) def __eq__(self, other): if type(other) is not Remote: return False if not self.has_loaded_metadata or not other.has_loaded_metadata: # Don't pull metadata if we haven't yet return self.address == other.address return other.unique_id == self.unique_id def __str__(self): return f'<remote: {self.name}>' def __repr__(self): return f'<remote: {self.name}>'
[docs] @staticmethod def get_for_local_database(database: 'binaryninja.Database') -> Optional['Remote']: """ Get the Remote for a Database :param database: BN database, potentially with collaboration metadata :return: Remote from one of the connected remotes, or None if not found :rtype: Optional[Remote] :raises RuntimeError: If there was an error """ return databasesync.get_remote_for_local_database(database)
[docs] @staticmethod def get_for_bv(bv: 'binaryninja.BinaryView') -> Optional['Remote']: """ Get the Remote for a Binary View :param bv: Binary view, potentially with collaboration metadata :return: Remote from one of the connected remotes, or None if not found :raises RuntimeError: If there was an error """ if not bv.file.has_database: return None return databasesync.get_remote_for_local_database(bv.file.database)
@property def has_loaded_metadata(self): """ If the remote has pulled metadata like its id, etc :return: True if it has been pulled """ return core.BNCollaborationRemoteHasLoadedMetadata(self._handle) @property def unique_id(self) -> str: """ Unique id. If metadata has not been pulled, it will be pulled upon calling this. :return: Id string :raises RuntimeError: If there was an error pulling metadata. """ if not self.has_loaded_metadata: self.load_metadata() return core.BNCollaborationRemoteGetUniqueId(self._handle) @property def name(self) -> str: """ Assigned name of the Remote :return: Name string """ return core.BNCollaborationRemoteGetName(self._handle) @property def address(self) -> str: """ Base address of the Remote :return: URL string """ return core.BNCollaborationRemoteGetAddress(self._handle) @property def is_connected(self) -> bool: """ If the Remote is connected (has `Remote.connect` been called) :return: True if connected """ return core.BNCollaborationRemoteIsConnected(self._handle) @property def username(self) -> str: """ Username used to connect to the remote :return: Username string """ return core.BNCollaborationRemoteGetUsername(self._handle) @property def token(self) -> str: """ Token used to connect to the remote :return: Token string """ return core.BNCollaborationRemoteGetToken(self._handle) @property def server_version(self) -> int: """ Version of software running on the server. If metadata has not been pulled, it will be pulled upon calling this. :return: Server version number :raises RuntimeError: If there was an error """ if not self.has_loaded_metadata: self.load_metadata() return core.BNCollaborationRemoteGetServerVersion(self._handle) @property def server_build_id(self) -> str: """ Build id of software running on the server. If metadata has not been pulled, it will be pulled upon calling this. :return: Server build id string :raises RuntimeError: If there was an error """ if not self.has_loaded_metadata: self.load_metadata() return core.BNCollaborationRemoteGetServerBuildId(self._handle) @property def auth_backends(self) -> List[Tuple[str, str]]: """ List of supported authentication backends on the server. If metadata has not been pulled, it will be pulled upon calling this. :return: List of Backend id <=> backend display name tuples :raises RuntimeError: If there was an error """ if not self.has_loaded_metadata: self.load_metadata() backend_ids = ctypes.POINTER(ctypes.c_char_p)() backend_names = ctypes.POINTER(ctypes.c_char_p)() count = ctypes.c_size_t() if not core.BNCollaborationRemoteGetAuthBackends(self._handle, backend_ids, backend_names, count): raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append((core.pyNativeStr(backend_ids[i]), core.pyNativeStr(backend_names[i]))) core.BNCollaborationFreeStringList(backend_ids, count.value) core.BNCollaborationFreeStringList(backend_names, count.value) return result @property def is_admin(self) -> bool: """ If the currently connected user is an administrator. .. note:: If users have not been pulled, they will attempt to be pulled upon calling this. :return: True if the user is an administrator """ # This is the test by which the api knows it is an admin if not self.has_pulled_users: self.pull_users() return core.BNCollaborationRemoteIsAdmin(self._handle) @property def is_enterprise(self) -> bool: """ If this remote is the same as the Enterprise License server :return: True if the same """ if not self.has_loaded_metadata: self.load_metadata() return core.BNCollaborationRemoteIsEnterprise(self._handle)
[docs] def load_metadata(self): """ Load metadata from the Remote, including unique id and versions :raises RuntimeError: If there was an error """ if not core.BNCollaborationRemoteLoadMetadata(self._handle): raise RuntimeError(util._last_error())
[docs] def request_authentication_token(self, username: str, password: str) -> Optional[str]: """ Request an authentication token using a username and password. :param username: Username to authenticate with :param password: Password of user :return: Authentication token string, or None if there was an error """ return core.BNCollaborationRemoteRequestAuthenticationToken(self._handle, username, password)
[docs] def connect(self, username: Optional[str] = None, token: Optional[str] = None): """ Connect to a Remote, loading metadata and optionally acquiring a token. .. note:: If no username or token are provided, they will be looked up from the keychain, \ likely saved there by Enterprise authentication. :param username: Optional username to connect with :param token: Optional token to authenticate with :raises RuntimeError: If the connection fails """ if not self.has_loaded_metadata: self.load_metadata() if username is None: # Try logging in with defaults if self.is_enterprise: username = enterprise.username() token = enterprise.token() else: # Load from default secrets provider secrets = binaryninja.SecretsProvider[ binaryninja.Settings().get_string("enterprise.secretsProvider")] if not secrets.has_data(self.address): raise RuntimeError("No username and token provided, and none found " "in the default keychain.") creds = json.loads(secrets.get_data(self.address)) username = creds['username'] token = creds['token'] if username is None or token is None: raise RuntimeError("Cannot connect without a username or token") if not core.BNCollaborationRemoteConnect(self._handle, username, token): raise RuntimeError(util._last_error())
[docs] def disconnect(self): """ Disconnect from the remote :raises RuntimeError: If there was somehow an error """ if not core.BNCollaborationRemoteDisconnect(self._handle): raise RuntimeError(util._last_error())
@property def has_pulled_projects(self) -> bool: """ If the project has pulled the projects yet :return: True if they have been pulled """ return core.BNCollaborationRemoteHasPulledProjects(self._handle) @property def has_pulled_groups(self) -> bool: """ If the project has pulled the groups yet :return: True if they have been pulled """ return core.BNCollaborationRemoteHasPulledGroups(self._handle) @property def has_pulled_users(self) -> bool: """ If the project has pulled the users yet :return: True if they have been pulled """ return core.BNCollaborationRemoteHasPulledUsers(self._handle) @property def projects(self) -> List['project.RemoteProject']: """ Get the list of projects in this project. .. note:: If projects have not been pulled, they will be pulled upon calling this. :return: List of Project objects :raises: RuntimeError if there was an error pulling projects """ if not self.has_pulled_projects: self.pull_projects() count = ctypes.c_size_t() value = core.BNCollaborationRemoteGetProjects(self._handle, count) if value is None: raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append(project.RemoteProject(value[i])) core.BNCollaborationProjectFreeList(value, count.value) return result
[docs] def get_project_by_id(self, id: str) -> Optional['project.RemoteProject']: """ Get a specific project in the Remote by its id .. note:: If projects have not been pulled, they will be pulled upon calling this. :param id: Id of Project :return: Project object, if one with that id exists. Else, None :raises: RuntimeError if there was an error pulling projects """ if not self.has_pulled_projects: self.pull_projects() value = core.BNCollaborationRemoteGetProjectById(self._handle, id) if value is None: return None result = project.RemoteProject(value) core.BNCollaborationProjectFree(value) return result
[docs] def get_project_by_name(self, name: str) -> Optional['project.RemoteProject']: """ Get a specific project in the Remote by its name .. note:: If projects have not been pulled, they will be pulled upon calling this. :param name: Name of Project :return: Project object, if one with that name exists. Else, None :raises: RuntimeError if there was an error pulling projects """ if not self.has_pulled_projects: self.pull_projects() value = core.BNCollaborationRemoteGetProjectByName(self._handle, name) if value is None: return None result = project.RemoteProject(value) core.BNCollaborationProjectFree(value) return result
[docs] def pull_projects(self, progress: 'util.ProgressFuncType' = nop): """ Pull the list of projects from the Remote. :param progress: Function to call for progress updates :raises: RuntimeError if there was an error pulling projects """ if not core.BNCollaborationRemotePullProjects(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def create_project(self, name: str, description: str) -> 'project.RemoteProject': """ Create a new project on the remote (and pull it) :param name: Project name :param description: Project description :return: Reference to the created project :raises: RuntimeError if there was an error """ value = core.BNCollaborationRemoteCreateProject(self._handle, name, description) if value is None: raise RuntimeError(util._last_error()) result = project.RemoteProject(value) core.BNCollaborationProjectFree(value) return result
[docs] def push_project(self, project: 'project.RemoteProject', extra_fields: Optional[Dict[str, str]] = None): """ Push an updated Project object to the Remote :param project: Project object which has been updated :param extra_fields: Extra HTTP fields to send with the update :raises: RuntimeError if there was an error """ if extra_fields is None: extra_fields = {} extra_field_keys = (ctypes.c_char_p * len(extra_fields))() extra_field_values = (ctypes.c_char_p * len(extra_fields))() for (i, (key, value)) in enumerate(extra_fields.items()): extra_field_keys[i] = core.cstr(key) extra_field_values[i] = core.cstr(value) if not core.BNCollaborationRemotePushProject(self._handle, project._handle, extra_field_keys, extra_field_values, len(extra_fields)): raise RuntimeError(util._last_error())
[docs] def delete_project(self, project: 'project.RemoteProject'): """ Delete a project from the remote :param project: Project to delete :raises: RuntimeError if there was an error """ if not core.BNCollaborationRemoteDeleteProject(self._handle, project._handle): raise RuntimeError(util._last_error())
@property def groups(self) -> List['group.Group']: """ Get the list of groups in this project. .. note:: If groups have not been pulled, they will be pulled upon calling this. .. note:: This function is only available to accounts with admin status on the Remote :return: List of Group objects :raises: RuntimeError if there was an error pulling groups """ if not self.has_pulled_groups: self.pull_groups() count = ctypes.c_size_t() value = core.BNCollaborationRemoteGetGroups(self._handle, count) if value is None: raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append(group.Group(value[i])) core.BNCollaborationGroupFreeList(value, count.value) return result
[docs] def get_group_by_id(self, id: int) -> Optional['group.Group']: """ Get a specific group in the Remote by its id .. note:: If groups have not been pulled, they will be pulled upon calling this. .. note:: This function is only available to accounts with admin status on the Remote :param id: Id of Group :return: Group object, if one with that id exists. Else, None :raises: RuntimeError if there was an error pulling groups """ if not self.has_pulled_groups: self.pull_groups() value = core.BNCollaborationRemoteGetGroupById(self._handle, id) if value is None: return None result = group.Group(value) core.BNCollaborationGroupFree(value) return result
[docs] def get_group_by_name(self, name: str) -> Optional['group.Group']: """ Get a specific group in the Remote by its name .. note:: If groups have not been pulled, they will be pulled upon calling this. .. note:: This function is only available to accounts with admin status on the Remote :param name: Name of Group :return: Group object, if one with that name exists. Else, None :raises: RuntimeError if there was an error pulling groups """ if not self.has_pulled_groups: self.pull_groups() value = core.BNCollaborationRemoteGetGroupByName(self._handle, name) if value is None: return None result = group.Group(value) core.BNCollaborationGroupFree(value) return result
[docs] def search_groups(self, prefix: str) -> List[Tuple[int, str]]: """ Search for groups in the Remote with a given prefix :param prefix: Prefix of name for groups :return: List of group id <=> group name pairs :raises: RuntimeError if there was an error """ count = ctypes.c_size_t() group_ids = ctypes.POINTER(ctypes.c_uint64)() group_names = ctypes.POINTER(ctypes.c_char_p)() if not core.BNCollaborationRemoteSearchGroups(self._handle, prefix, group_ids, group_names, count): raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append((group_ids[i], core.pyNativeStr(group_names[i]))) core.BNCollaborationFreeIdList(group_ids, count.value) core.BNCollaborationFreeStringList(group_names, count.value) return result
[docs] def pull_groups(self, progress: 'util.ProgressFuncType' = nop): """ Pull the list of groups from the Remote. .. note:: This function is only available to accounts with admin status on the Remote :param progress: Function to call for progress updates :raises: RuntimeError if there was an error pulling groups """ if not core.BNCollaborationRemotePullGroups(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def create_group(self, name: str, usernames: List[str]) -> 'group.Group': """ Create a new group on the remote (and pull it) .. note:: This function is only available to accounts with admin status on the Remote :param name: Group name :param usernames: List of usernames of users in the group :return: Reference to the created group :raises: RuntimeError if there was an error """ c_usernames = (ctypes.c_char_p * len(usernames))() for (i, username) in enumerate(usernames): c_usernames[i] = core.cstr(username) value = core.BNCollaborationRemoteCreateGroup(self._handle, name, c_usernames, len(usernames)) if value is None: raise RuntimeError(util._last_error()) result = group.Group(value) core.BNCollaborationGroupFree(value) return result
[docs] def push_group(self, group: 'group.Group', extra_fields: Optional[Dict[str, str]] = None): """ Push an updated Group object to the Remote .. note:: This function is only available to accounts with admin status on the Remote :param group: Group object which has been updated :param extra_fields: Extra HTTP fields to send with the update :raises: RuntimeError if there was an error """ if extra_fields is None: extra_fields = {} extra_field_keys = (ctypes.c_char_p * len(extra_fields))() extra_field_values = (ctypes.c_char_p * len(extra_fields))() for (i, (key, value)) in enumerate(extra_fields.items()): extra_field_keys[i] = core.cstr(key) extra_field_values[i] = core.cstr(value) if not core.BNCollaborationRemotePushGroup(self._handle, group._handle, extra_field_keys, extra_field_values, len(extra_fields)): raise RuntimeError(util._last_error())
[docs] def delete_group(self, group: 'group.Group'): """ Delete a group from the remote .. note:: This function is only available to accounts with admin status on the Remote :param group: Group to delete :raises: RuntimeError if there was an error """ if not core.BNCollaborationRemoteDeleteGroup(self._handle, group._handle): raise RuntimeError(util._last_error())
@property def users(self) -> List['user.User']: """ Get the list of users in this project. .. note:: If users have not been pulled, they will be pulled upon calling this. .. note:: This function is only available to accounts with admin status on the Remote :return: List of User objects :raises: RuntimeError if there was an error pulling users """ if not self.has_pulled_users: self.pull_users() count = ctypes.c_size_t() value = core.BNCollaborationRemoteGetUsers(self._handle, count) if value is None: raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append(user.User(value[i])) core.BNCollaborationUserFreeList(value, count.value) return result
[docs] def get_user_by_id(self, id: str) -> Optional['user.User']: """ Get a specific user in the Remote by their id .. note:: If users have not been pulled, they will be pulled upon calling this. .. note:: This function is only available to accounts with admin status on the Remote :param id: Id of User :return: User object, if one with that id exists. Else, None :raises: RuntimeError if there was an error pulling users """ if not self.has_pulled_users: self.pull_users() value = core.BNCollaborationRemoteGetUserById(self._handle, id) if value is None: return None result = user.User(value) core.BNCollaborationUserFree(value) return result
[docs] def get_user_by_username(self, username: str) -> Optional['user.User']: """ Get a specific user in the Remote by their username .. note:: If users have not been pulled, they will be pulled upon calling this. .. note:: This function is only available to accounts with admin status on the Remote :param username: Username of User :return: User object, if one with that name exists. Else, None :raises: RuntimeError if there was an error pulling users """ if not self.has_pulled_users: self.pull_users() value = core.BNCollaborationRemoteGetUserByUsername(self._handle, username) if value is None: return None result = user.User(value) core.BNCollaborationUserFree(value) return result
@property def current_user(self) -> Optional['user.User']: """ Get the user object for the currently connected user (only if you are an admin) .. note:: If users have not been pulled, they will be pulled upon calling this. .. note:: This function is only available to accounts with admin status on the Remote :return: User object :raises: RuntimeError if there was an error pulling users """ if not self.has_pulled_users: self.pull_users() value = core.BNCollaborationRemoteGetCurrentUser(self._handle) if value is None: return None result = user.User(value) core.BNCollaborationUserFree(value) return result
[docs] def search_users(self, prefix: str) -> List[Tuple[str, str]]: """ Search for users in the Remote with a given prefix :param prefix: Prefix of name for users :return: List of user id <=> user name pairs :raises: RuntimeError if there was an error """ count = ctypes.c_size_t() user_ids = ctypes.POINTER(ctypes.c_char_p)() usernames = ctypes.POINTER(ctypes.c_char_p)() if not core.BNCollaborationRemoteSearchUsers(self._handle, prefix, user_ids, usernames, count): raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append((core.pyNativeStr(user_ids[i]), core.pyNativeStr(usernames[i]))) core.BNCollaborationFreeStringList(user_ids, count.value) core.BNCollaborationFreeStringList(usernames, count.value) return result
[docs] def pull_users(self, progress: 'util.ProgressFuncType' = nop): """ Pull the list of users from the Remote. .. note:: This function is only available to accounts with admin status on the Remote. \ Non-admin accounts attempting to call this function will pull an empty list of users. :param progress: Function to call for progress updates :raises: RuntimeError if there was an error pulling users """ if not core.BNCollaborationRemotePullUsers(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def create_user(self, username: str, email: str, is_active: bool, password: str, group_ids: List[int], user_permission_ids: List[int]) -> 'user.User': """ Create a new user on the remote (and pull it) .. note:: This function is only available to accounts with admin status on the Remote :param username: User username :param email: User email :param is_active: If the user is enabled :param password: User password :param group_ids: List of group ids for the user :param user_permission_ids: List of permission ids for the user :return: Reference to the created user :raises: RuntimeError if there was an error """ group_ids_array = (ctypes.c_uint64 * len(group_ids))() for i in range(len(group_ids)): group_ids_array[i] = group_ids[i] user_permission_ids_array = (ctypes.c_uint64 * len(group_ids))() for i in range(len(user_permission_ids)): user_permission_ids_array[i] = user_permission_ids[i] value = core.BNCollaborationRemoteCreateUser(self._handle, username, email, is_active, password, group_ids_array, len(group_ids), user_permission_ids_array, len(user_permission_ids)) if value is None: raise RuntimeError(util._last_error()) result = user.User(value) core.BNCollaborationUserFree(value) return result
[docs] def push_user(self, user: 'user.User', extra_fields: Optional[Dict[str, str]] = None): """ Push an updated User object to the Remote .. note:: This function is only available to accounts with admin status on the Remote :param group: User object which has been updated :param extra_fields: Extra HTTP fields to send with the update :raises: RuntimeError if there was an error """ if extra_fields is None: extra_fields = {} extra_field_keys = (ctypes.c_char_p * len(extra_fields))() extra_field_values = (ctypes.c_char_p * len(extra_fields))() for (i, (key, value)) in enumerate(extra_fields.items()): extra_field_keys[i] = core.cstr(key) extra_field_values[i] = core.cstr(value) if not core.BNCollaborationRemotePushUser(self._handle, user._handle, extra_field_keys, extra_field_values, len(extra_fields)): raise RuntimeError(util._last_error())