Source code for binaryninja.collaboration.project

import ctypes
import datetime
import tempfile
from os import PathLike
from pathlib import Path
from typing import Dict, List, Optional, Union

import binaryninja

from . import _collaboration as core
from . import enums
from . import remote
from . import file
from . import folder
from . import permission
from . import databasesync
from . import util
from typing import List, Tuple, Optional, Dict, Union
import ctypes


[docs] def nop(*args, **kwargs): return True
[docs] class RemoteProject: """ Class representing a remote project """ def __init__(self, handle): self._handle = core.BNCollaborationProjectCopy(handle) def __del__(self): if self._handle is not None: core.BNCollaborationProjectFree(self._handle) def __eq__(self, other): if type(other) is not RemoteProject: return False return other.id == self.id def __str__(self): return f'<project: {self.remote.name}/{self.name}>' def __repr__(self): return f'<project: {self.remote.name}/{self.name}>'
[docs] @staticmethod def get_for_local_database(database: 'binaryninja.Database') -> Optional['RemoteProject']: """ Get the Remote Project for a Database :param database: BN database, potentially with collaboration metadata :return: Remote project from one of the connected remotes, or None if not found or if projects are not pulled :raises RuntimeError: If there was an error """ remote = databasesync.get_remote_for_local_database(database) if remote is None: return None if not remote.has_pulled_projects: remote.pull_projects() return databasesync.get_remote_project_for_local_database(database)
[docs] @staticmethod def get_for_bv(bv: 'binaryninja.BinaryView') -> Optional['RemoteProject']: """ Get the Remote Project for a BinaryView :param bv: BinaryView, potentially with collaboration metadata :return: Remote project from one of the connected remotes, or None if not found or if projects are not pulled :raises RuntimeError: If there was an error """ if not bv.file.has_database: return None return RemoteProject.get_for_local_database(bv.file.database)
@property def core_project(self) -> 'binaryninja.Project': """ Get the core :py:class:`binaryninja.Project` object for this Remote Project. .. note:: If the project has not been opened, it will be opened upon calling this. :return: Project instance """ if not self.is_open: self.open() core_handle = core.BNCollaborationProjectGetCoreProject(self._handle) if core_handle is None: raise RuntimeError(util._last_error()) return binaryninja.Project(handle=ctypes.cast(core_handle, ctypes.POINTER(binaryninja.core.BNProject))) @property def remote(self) -> 'remote.Remote': """ Owning Remote :return: Remote object """ value = core.BNCollaborationProjectGetRemote(self._handle) if value is None: raise RuntimeError(util._last_error()) result = remote.Remote(handle=value) core.BNCollaborationRemoteFree(value) return result @property def url(self): """ Web api endpoint URL :return: URL string """ return core.BNCollaborationProjectGetUrl(self._handle) @property def id(self): """ Unique id :return: Id string """ return core.BNCollaborationProjectGetId(self._handle) @property def created(self) -> datetime.datetime: """ Created date of the project :return: Date object """ return datetime.datetime.utcfromtimestamp(core.BNCollaborationProjectGetCreated(self._handle)) @property def last_modified(self) -> datetime.datetime: """ Last modified date of the project :return: Date object """ return datetime.datetime.utcfromtimestamp(core.BNCollaborationProjectGetLastModified(self._handle)) @property def name(self): """ Displayed name of project :return: Name string """ return core.BNCollaborationProjectGetName(self._handle) @property def description(self): """ Description of the project :return: Description string """ return core.BNCollaborationProjectGetDescription(self._handle) @name.setter def name(self, value): """ Set the display name of the project. You will need to push the project to update the remote version. :param value: New name """ if not core.BNCollaborationProjectSetName(self._handle, value): raise RuntimeError(util._last_error()) @description.setter def description(self, value): """ Set the description of the project. You will need to push the project to update the remote version. :param description: New description """ if not core.BNCollaborationProjectSetDescription(self._handle, value): raise RuntimeError(util._last_error()) @property def received_file_count(self) -> int: """ Get the number of files in a project (without needing to pull them first) :return: Number of files """ return core.BNCollaborationProjectGetReceivedFileCount(self._handle) @property def received_folder_count(self) -> int: """ Get the number of folders in a project (without needing to pull them first) :return: Number of folders """ return core.BNCollaborationProjectGetReceivedFolderCount(self._handle) @property def default_path(self) -> str: """ Get the default directory path for a remote Project. This is based off the Setting for collaboration.directory, the project's id, and the project's remote's id. :return: Default project path :rtype: str """ return databasesync.default_project_path(self) @property def has_pulled_files(self): """ If the project has pulled the files yet :return: True if they have been pulled """ return core.BNCollaborationProjectHasPulledFiles(self._handle) @property def has_pulled_folders(self): """ If the project has pulled the folders yet :return: True if they have been pulled """ return core.BNCollaborationProjectHasPulledFolders(self._handle) @property def has_pulled_group_permissions(self): """ If the project has pulled the group permissions yet :return: True if they have been pulled """ return core.BNCollaborationProjectHasPulledGroupPermissions(self._handle) @property def has_pulled_user_permissions(self): """ If the project has pulled the user permissions yet :return: True if they have been pulled """ return core.BNCollaborationProjectHasPulledUserPermissions(self._handle) @property def is_admin(self): """ If the currently logged in user is an administrator of the project (and can edit permissions and such for the project). :return: True if the user is an admin """ return core.BNCollaborationProjectIsAdmin(self._handle) @property def is_open(self) -> bool: """ Determine if the project is open (it needs to be opened before you can access its files) :return: True if open """ return core.BNCollaborationProjectIsOpen(self._handle)
[docs] def open(self, progress: 'util.ProgressFuncType' = nop): """ Open the project, allowing various file and folder based apis to work, as well as connecting a core Project (see :py:func:`core_project`). :param progress: Function to call for progress updates :raises: RuntimeError if there was an error opening the Project """ if not core.BNCollaborationProjectOpen(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def close(self): """ Close the project and stop all background operations (e.g. file uploads) """ if not core.BNCollaborationProjectClose(self._handle): raise RuntimeError(util._last_error())
@property def files(self) -> List['file.RemoteFile']: """ Get the list of files in this project. .. note:: If the project has not been opened, it will be opened upon calling this. .. note:: If folders have not been pulled, they will be pulled upon calling this. .. note:: If files have not been pulled, they will be pulled upon calling this. :return: List of File objects :raises: RuntimeError if there was an error pulling files """ if not self.is_open: self.open() if not self.has_pulled_files: self.pull_files() count = ctypes.c_size_t() value = core.BNCollaborationProjectGetFiles(self._handle, count) if value is None: raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append(file.RemoteFile(value[i])) core.BNCollaborationFileFreeList(value, count.value) return result
[docs] def get_file_by_id(self, id: str) -> Optional['file.RemoteFile']: """ Get a specific File in the Project by its id .. note:: If the project has not been opened, it will be opened upon calling this. .. note:: If files have not been pulled, they will be pulled upon calling this. :param id: Id of File :return: File object, if one with that id exists. Else, None :raises: RuntimeError if there was an error pulling files """ if not self.is_open: self.open() if not self.has_pulled_files: self.pull_files() value = core.BNCollaborationProjectGetFileById(self._handle, id) if value is None: return None result = file.RemoteFile(value) core.BNCollaborationFileFree(value) return result
[docs] def get_file_by_name(self, name: str) -> Optional['file.RemoteFile']: """ Get a specific File in the Project by its name .. note:: If the project has not been opened, it will be opened upon calling this. .. note:: If files have not been pulled, they will be pulled upon calling this. :param name: Name of File :return: File object, if one with that name exists. Else, None :raises: RuntimeError if there was an error pulling files """ if not self.is_open: self.open() if not self.has_pulled_files: self.pull_files() value = core.BNCollaborationProjectGetFileByName(self._handle, name) if value is None: return None result = file.RemoteFile(value) core.BNCollaborationFileFree(value) return result
[docs] def pull_files(self, progress: 'util.ProgressFuncType' = nop): """ Pull the list of files from the Remote. .. note:: If the project has not been opened, it will be opened upon calling this. .. note:: If folders have not been pulled, they will be pulled upon calling this. :param progress: Function to call for progress updates :raises: RuntimeError if there was an error pulling files """ if not self.is_open: self.open() if not self.has_pulled_folders: self.pull_folders() if not core.BNCollaborationProjectPullFiles(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def create_file(self, filename: str, contents: bytes, name: str, description: str, parent_folder: Optional['folder.RemoteFolder'] = None, file_type: enums.FileType = enums.FileType.BNCollaborationFileTypeBinaryViewAnalysis, progress: 'util.ProgressFuncType' = nop) -> 'file.RemoteFile': """ Create a new file on the remote (and pull it) .. note:: If the project has not been opened, it will be opened upon calling this. :param filename: File name :param contents: File contents :param name: Displayed file name :param description: File description :param parent_folder: Folder that will contain the file :param file_type: Type of File to create :param progress: Function to call on upload progress updates :return: Reference to the created file :raises: RuntimeError if there was an error """ if not self.is_open: self.open() folder_handle = parent_folder._handle if parent_folder is not None else None value = core.BNCollaborationProjectCreateFile(self._handle, filename, contents, len(contents), name, description, folder_handle, file_type.value, util.wrap_progress(progress), None) if value is None: raise RuntimeError(util._last_error()) result = file.RemoteFile(value) core.BNCollaborationFileFree(value) return result
[docs] def push_file(self, file: 'file.RemoteFile', extra_fields: Optional[Dict[str, str]] = None): """ Push an updated File object to the Remote .. note:: If the project has not been opened, it will be opened upon calling this. :param file: File object which has been updated :param extra_fields: Extra HTTP fields to send with the update :raises: RuntimeError if there was an error """ if not self.is_open: self.open() if extra_fields is None: extra_fields = {} extra_field_keys = (ctypes.c_char_p * len(extra_fields))() extra_field_values = (ctypes.c_char_p * len(extra_fields))() for (i, (key, value)) in enumerate(extra_fields.items()): extra_field_keys[i] = core.cstr(key) extra_field_values[i] = core.cstr(value) if not core.BNCollaborationProjectPushFile(self._handle, file._handle, extra_field_keys, extra_field_values, len(extra_fields)): raise RuntimeError(util._last_error())
[docs] def delete_file(self, file: 'file.RemoteFile'): """ Delete a file from the remote .. note:: If the project has not been opened, it will be opened upon calling this. :param file: File to delete :raises: RuntimeError if there was an error """ if not self.is_open: self.open() if not core.BNCollaborationProjectDeleteFile(self._handle, file._handle): raise RuntimeError(util._last_error())
@property def folders(self) -> List['folder.RemoteFolder']: """ Get the list of folders in this project. .. note:: If the project has not been opened, it will be opened upon calling this. .. note:: If folders have not been pulled, they will be pulled upon calling this. :return: List of Folder objects :raises: RuntimeError if there was an error pulling folders """ if not self.is_open: self.open() if not self.has_pulled_folders: self.pull_folders() count = ctypes.c_size_t() value = core.BNCollaborationProjectGetFolders(self._handle, count) if value is None: raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append(folder.RemoteFolder(value[i])) core.BNCollaborationFolderFreeList(value, count.value) return result
[docs] def get_folder_by_id(self, id: str) -> Optional['folder.RemoteFolder']: """ Get a specific Folder in the Project by its id .. note:: If the project has not been opened, it will be opened upon calling this. .. note:: If folders have not been pulled, they will be pulled upon calling this. :param id: Id of Folder :return: Folder object, if one with that id exists. Else, None :raises: RuntimeError if there was an error pulling folders """ if not self.is_open: self.open() if not self.has_pulled_folders: self.pull_folders() value = core.BNCollaborationProjectGetFolderById(self._handle, id) if value is None: return None result = folder.RemoteFolder(value) core.BNCollaborationFolderFree(value) return result
[docs] def pull_folders(self, progress: 'util.ProgressFuncType' = nop): """ Pull the list of folders from the Remote. .. note:: If the project has not been opened, it will be opened upon calling this. :param progress: Function to call for progress updates :raises: RuntimeError if there was an error pulling folders """ if not self.is_open: self.open() if not core.BNCollaborationProjectPullFolders(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def create_folder(self, name: str, description: str, parent: Optional['folder.RemoteFolder'] = None, progress: 'util.ProgressFuncType' = nop) -> 'folder.RemoteFolder': """ Create a new folder on the remote (and pull it) .. note:: If the project has not been opened, it will be opened upon calling this. :param name: Displayed folder name :param description: Folder description :param parent: Parent folder (optional) :param progress: Function to call on upload progress updates :return: Reference to the created folder :raises: RuntimeError if there was an error pulling folders """ if not self.is_open: self.open() parent_handle = parent._handle if parent is not None else None value = core.BNCollaborationProjectCreateFolder(self._handle, name, description, parent_handle, util.wrap_progress(progress), None) if value is None: raise RuntimeError(util._last_error()) result = folder.RemoteFolder(value) core.BNCollaborationFolderFree(value) return result
[docs] def push_folder(self, folder: 'folder.RemoteFolder', extra_fields: Optional[Dict[str, str]] = None): """ Push an updated Folder object to the Remote .. note:: If the project has not been opened, it will be opened upon calling this. :param folder: Folder object which has been updated :param extra_fields: Extra HTTP fields to send with the update :raises: RuntimeError if there was an error """ if not self.is_open: self.open() if extra_fields is None: extra_fields = {} extra_field_keys = (ctypes.c_char_p * len(extra_fields))() extra_field_values = (ctypes.c_char_p * len(extra_fields))() for (i, (key, value)) in enumerate(extra_fields.items()): extra_field_keys[i] = core.cstr(key) extra_field_values[i] = core.cstr(value) if not core.BNCollaborationProjectPushFolder(self._handle, folder._handle, extra_field_keys, extra_field_values, len(extra_fields)): raise RuntimeError(util._last_error())
[docs] def delete_folder(self, folder: 'folder.RemoteFolder'): """ Delete a folder from the remote .. note:: If the project has not been opened, it will be opened upon calling this. :param folder: Folder to delete :raises: RuntimeError if there was an error """ if not self.is_open: self.open() if not core.BNCollaborationProjectDeleteFolder(self._handle, folder._handle): raise RuntimeError(util._last_error())
@property def group_permissions(self) -> List['permission.Permission']: """ Get the list of group permissions in this project. .. note:: If group permissions have not been pulled, they will be pulled upon calling this. :return: List of Permission objects :raises: RuntimeError if there was an error pulling group permissions """ if not self.has_pulled_group_permissions: self.pull_group_permissions() count = ctypes.c_size_t() value = core.BNCollaborationProjectGetGroupPermissions(self._handle, count) if value is None: raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append(permission.Permission(value[i])) core.BNCollaborationPermissionFreeList(value, count.value) return result @property def user_permissions(self) -> List['permission.Permission']: """ Get the list of user permissions in this project. .. note:: If user permissions have not been pulled, they will be pulled upon calling this. :return: List of Permission objects :raises: RuntimeError if there was an error pulling user permissions """ if not self.has_pulled_user_permissions: self.pull_user_permissions() count = ctypes.c_size_t() value = core.BNCollaborationProjectGetUserPermissions(self._handle, count) if value is None: raise RuntimeError(util._last_error()) result = [] for i in range(count.value): result.append(permission.Permission(value[i])) core.BNCollaborationPermissionFreeList(value, count.value) return result
[docs] def get_permission_by_id(self, id: str) -> Optional['permission.Permission']: """ Get a specific permission in the Project by its id .. note:: If group or user permissions have not been pulled, they will be pulled upon calling this. :param id: Id of Permission :return: Permission object, if one with that id exists. Else, None :raises: RuntimeError if there was an error pulling permissions """ if not self.has_pulled_user_permissions: self.pull_user_permissions() if not self.has_pulled_group_permissions: self.pull_group_permissions() value = core.BNCollaborationProjectGetPermissionById(self._handle, id) if value is None: return None result = permission.Permission(value) core.BNCollaborationPermissionFree(value) return result
[docs] def pull_group_permissions(self, progress: 'util.ProgressFuncType' = nop): """ Pull the list of group permissions from the Remote. :param progress: Function to call for progress updates :raises: RuntimeError if there was an error pulling permissions """ if not core.BNCollaborationProjectPullGroupPermissions(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def pull_user_permissions(self, progress: 'util.ProgressFuncType' = nop): """ Pull the list of user permissions from the Remote. :param progress: Function to call for progress updates :raises: RuntimeError if there was an error pulling permissions """ if not core.BNCollaborationProjectPullUserPermissions(self._handle, util.wrap_progress(progress), None): raise RuntimeError(util._last_error())
[docs] def create_group_permission(self, group_id: int, level: 'permission.PermissionLevel', progress: 'util.ProgressFuncType' = nop) -> 'permission.Permission': """ Create a new group permission on the remote (and pull it) :param group_id: Group id :param level: Permission level :param progress: Function to call on upload progress updates :return: Reference to the created permission :raises: RuntimeError if there was an error pulling permissions """ value = core.BNCollaborationProjectCreateGroupPermission(self._handle, group_id, level, util.wrap_progress(progress), None) if value is None: raise RuntimeError(util._last_error()) result = permission.Permission(value) core.BNCollaborationPermissionFree(value) return result
[docs] def create_user_permission(self, user_id: str, level: 'permission.PermissionLevel', progress: 'util.ProgressFuncType' = nop) -> 'permission.Permission': """ Create a new user permission on the remote (and pull it) :param user_id: User id :param level: Permission level :param progress: Function to call on upload progress updates :return: Reference to the created permission :raises: RuntimeError if there was an error pulling permissions """ value = core.BNCollaborationProjectCreateUserPermission(self._handle, user_id, level, util.wrap_progress(progress), None) if value is None: raise RuntimeError(util._last_error()) result = permission.Permission(value) core.BNCollaborationPermissionFree(value) return result
[docs] def push_permission(self, permission: 'permission.Permission', extra_fields: Optional[Dict[str, str]] = None): """ Push project permissions to the remote :param permission: Permission object which has been updated :param extra_fields: Extra HTTP fields to send with the update :raises: RuntimeError if there was an error """ if extra_fields is None: extra_fields = {} extra_field_keys = (ctypes.c_char_p * len(extra_fields))() extra_field_values = (ctypes.c_char_p * len(extra_fields))() for (i, (key, value)) in enumerate(extra_fields.items()): extra_field_keys[i] = core.cstr(key) extra_field_values[i] = core.cstr(value) if not core.BNCollaborationProjectPushPermission(self._handle, permission._handle, extra_field_keys, extra_field_values, len(extra_fields)): raise RuntimeError(util._last_error())
[docs] def delete_permission(self, permission: 'permission.Permission'): """ Delete a permission from the remote :param permission: Permission to delete :raises: RuntimeError if there was an error """ if not core.BNCollaborationProjectDeletePermission(self._handle, permission._handle): raise RuntimeError(util._last_error())
[docs] def can_user_view(self, username: str) -> bool: """ Determine if a user is in any of the view/edit/admin groups :param username: Username of user to check :return: True if they are in any of those groups :raises: RuntimeError if there was an error """ return core.BNCollaborationProjectCanUserView(self._handle, username)
[docs] def can_user_edit(self, username: str) -> bool: """ Determine if a user is in any of the edit/admin groups :param username: Username of user to check :return: True if they are in any of those groups :raises: RuntimeError if there was an error """ return core.BNCollaborationProjectCanUserEdit(self._handle, username)
[docs] def can_user_admin(self, username: str) -> bool: """ Determine if a user is in the admin group :param username: Username of user to check :return: True if they are in any of those groups :raises: RuntimeError if there was an error """ return core.BNCollaborationProjectCanUserAdmin(self._handle, username)
[docs] def upload_new_file( self, target: Union[str, PathLike, 'binaryninja.BinaryView', 'binaryninja.FileMetadata'], parent_folder: Optional['folder.RemoteFolder'] = None, progress: 'util.ProgressFuncType' = nop, open_view_options = None) -> 'file.RemoteFile': """ Upload a file to the project, creating a new File and pulling it .. note:: If the project has not been opened, it will be opened upon calling this. :param target: Path to file on disk or BinaryView/FileMetadata object of already-opened file :param parent_folder: Parent folder to place the uploaded file in :param progress: Function to call for progress updates :return: Created File object :raises: RuntimeError if there was an error """ if not self.is_open: self.open() if type(target) is binaryninja.FileMetadata: if target.has_database: return databasesync.upload_database(target, self, parent_folder=parent_folder, progress=progress) else: target = target.raw if type(target) is binaryninja.BinaryView: maybe_bv = target target = maybe_bv.file.original_filename else: # Convert PathLike to string if isinstance(target, (Path,)): target = target.resolve() target = str(target) # Argument is a path, try opening it: try: if open_view_options is None: open_view_options = {} maybe_bv = binaryninja.load( target, progress_func=util.split_progress(progress, 0, [0.25, 0.75]), **open_view_options) except Exception as e: raise RuntimeError("Could not upload view: " + str(e)) with maybe_bv as bv: # Can't open, can't upload if not bv: raise RuntimeError("Could not open file at path for uploading") # If it is backed by a database, just upload that metadata = bv.file if metadata.has_database: uploaded = databasesync.upload_database( metadata, self, parent_folder=parent_folder, progress=util.split_progress(progress, 1, [0.25, 0.75])) return uploaded # Ported from remotebrowser.cpp (original comments): # TODO: This is not efficient at all! # No db exists, so create one before uploading so we always have a root snapshot # on the server # - Load file into memory # - Make temp path for temp database # - Make temp database with file # - UploadDatabase copies the temp database and makes its own # - Delete temp database # - Now you don't have an empty remote file with tempfile.TemporaryDirectory() as temp_dir: db_path = Path(temp_dir) / Path(target).name binaryninja.log_info(f'Saving temporary database at {db_path}') # Save bndb first to create database if not metadata.create_database( str(db_path), util.split_progress(progress, 1, [0.25, 0.25, 0.25, 0.25])): raise RuntimeError("Could not save database for temporary path") if not metadata.save_auto_snapshot( util.split_progress(progress, 2, [0.25, 0.25, 0.25, 0.25])): raise RuntimeError("Could not create initial snapshot for upload") metadata.filename = str(db_path) uploaded = databasesync.upload_database( metadata, self, parent_folder=parent_folder, progress=util.split_progress(progress, 3, [0.25, 0.25, 0.25, 0.25])) return uploaded