import datetime
import tempfile
from os import PathLike
from pathlib import Path, PurePath
import binaryninja
from .enums import PermissionLevel
from . import _collaboration as core
from . import remote
from . import file
from . import folder
from . import permission
from . import databasesync
from . import util
from typing import List, Tuple, Optional, Dict, Union
import ctypes
[docs]def nop(*args, **kwargs):
return True
[docs]class Project:
"""
Class representing a remote project
"""
def __init__(self, handle):
self._handle = core.BNCollaborationProjectCopy(handle)
def __del__(self):
if self._handle is not None:
core.BNCollaborationProjectFree(self._handle)
def __eq__(self, other):
if type(other) is not Project:
return False
return other.id == self.id
def __str__(self):
return f'<project: {self.remote.name}/{self.name}>'
def __repr__(self):
return f'<project: {self.remote.name}/{self.name}>'
[docs] @staticmethod
def get_for_local_database(database: 'binaryninja.Database') -> Optional['Project']:
"""
Get the Remote Project for a Database
:param database: BN database, potentially with collaboration metadata
:return: Remote project from one of the connected remotes, or None if not found
or if projects are not pulled
:raises RuntimeError: If there was an error
"""
remote = databasesync.get_remote_for_local_database(database)
if not remote.has_pulled_projects:
remote.pull_projects()
return databasesync.get_remote_project_for_local_database(database)
[docs] @staticmethod
def get_for_bv(bv: 'binaryninja.BinaryView') -> Optional['Project']:
"""
Get the Remote Project for a BinaryView
:param bv: BinaryView, potentially with collaboration metadata
:return: Remote project from one of the connected remotes, or None if not found
or if projects are not pulled
:raises RuntimeError: If there was an error
"""
if not bv.file.has_database:
return None
return Project.get_for_local_database(bv.file.database)
@property
def remote(self) -> 'remote.Remote':
"""
Owning Remote
:return: Remote object
"""
value = core.BNCollaborationProjectGetRemote(self._handle)
if value is None:
raise RuntimeError(util._last_error())
result = remote.Remote(handle=value)
core.BNCollaborationRemoteFree(value)
return result
@property
def url(self):
"""
Web api endpoint URL
:return: URL string
"""
return core.BNCollaborationProjectGetUrl(self._handle)
@property
def id(self):
"""
Unique id
:return: Id string
"""
return core.BNCollaborationProjectGetId(self._handle)
@property
def created(self) -> datetime.datetime:
"""
Created date of the project
:return: Date object
"""
return datetime.datetime.utcfromtimestamp(core.BNCollaborationProjectGetCreated(self._handle))
@property
def last_modified(self) -> datetime.datetime:
"""
Last modified date of the project
:return: Date object
"""
return datetime.datetime.utcfromtimestamp(core.BNCollaborationProjectGetLastModified(self._handle))
@property
def name(self):
"""
Displayed name of project
:return: Name string
"""
return core.BNCollaborationProjectGetName(self._handle)
@property
def description(self):
"""
Description of the project
:return: Description string
"""
return core.BNCollaborationProjectGetDescription(self._handle)
@name.setter
def name(self, value):
"""
Set the display name of the project. You will need to push the project to update the remote version.
:param value: New name
"""
if not core.BNCollaborationProjectSetName(self._handle, value):
raise RuntimeError(util._last_error())
@description.setter
def description(self, value):
"""
Set the description of the project. You will need to push the project to update the remote version.
:param description: New description
"""
if not core.BNCollaborationProjectSetDescription(self._handle, value):
raise RuntimeError(util._last_error())
@property
def default_path(self) -> str:
"""
Get the default directory path for a remote Project. This is based off the Setting for
collaboration.directory, the project's id, and the project's remote's id.
:return: Default project path
:rtype: str
"""
return databasesync.default_project_path(self)
@property
def has_pulled_files(self):
"""
If the project has pulled the files yet
:return: True if they have been pulled
"""
return core.BNCollaborationProjectHasPulledFiles(self._handle)
@property
def has_pulled_folders(self):
"""
If the project has pulled the folders yet
:return: True if they have been pulled
"""
return core.BNCollaborationProjectHasPulledFolders(self._handle)
@property
def has_pulled_group_permissions(self):
"""
If the project has pulled the group permissions yet
:return: True if they have been pulled
"""
return core.BNCollaborationProjectHasPulledGroupPermissions(self._handle)
@property
def has_pulled_user_permissions(self):
"""
If the project has pulled the user permissions yet
:return: True if they have been pulled
"""
return core.BNCollaborationProjectHasPulledUserPermissions(self._handle)
@property
def is_admin(self):
"""
If the currently logged in user is an administrator of the project (and can edit
permissions and such for the project).
:return: True if the user is an admin
"""
return core.BNCollaborationProjectIsAdmin(self._handle)
@property
def files(self) -> List['file.File']:
"""
Get the list of files in this project.
.. note:: If folders have not been pulled, they will be pulled upon calling this.
.. note:: If files have not been pulled, they will be pulled upon calling this.
:return: List of File objects
:raises: RuntimeError if there was an error pulling files
"""
if not self.has_pulled_files:
self.pull_files()
count = ctypes.c_size_t()
value = core.BNCollaborationProjectGetFiles(self._handle, count)
if value is None:
raise RuntimeError(util._last_error())
result = []
for i in range(count.value):
result.append(file.File(value[i]))
core.BNCollaborationFileFreeList(value, count.value)
return result
[docs] def get_file_by_id(self, id: str) -> Optional['file.File']:
"""
Get a specific File in the Project by its id
.. note:: If files have not been pulled, they will be pulled upon calling this.
:param id: Id of File
:return: File object, if one with that id exists. Else, None
:raises: RuntimeError if there was an error pulling files
"""
if not self.has_pulled_files:
self.pull_files()
value = core.BNCollaborationProjectGetFileById(self._handle, id)
if value is None:
return None
result = file.File(value)
core.BNCollaborationFileFree(value)
return result
[docs] def get_file_by_name(self, name: str) -> Optional['file.File']:
"""
Get a specific File in the Project by its name
.. note:: If files have not been pulled, they will be pulled upon calling this.
:param name: Name of File
:return: File object, if one with that name exists. Else, None
:raises: RuntimeError if there was an error pulling files
"""
if not self.has_pulled_files:
self.pull_files()
value = core.BNCollaborationProjectGetFileByName(self._handle, name)
if value is None:
return None
result = file.File(value)
core.BNCollaborationFileFree(value)
return result
[docs] def pull_files(self, progress: 'util.ProgressFuncType' = nop):
"""
Pull the list of files from the Remote.
.. note:: If folders have not been pulled, they will be pulled upon calling this.
:param progress: Function to call for progress updates
:raises: RuntimeError if there was an error pulling files
"""
if not self.has_pulled_folders:
self.pull_folders()
if not core.BNCollaborationProjectPullFiles(self._handle, util.wrap_progress(progress), None):
raise RuntimeError(util._last_error())
[docs] def create_file(self, filename: str, contents: bytes, name: str, description: str, parent_folder: Optional[folder.Folder] = None, progress: 'util.ProgressFuncType' = nop) -> 'file.File':
"""
Create a new file on the remote (and pull it)
:param filename: File name
:param contents: File contents
:param name: Displayed file name
:param description: File description
:param parent_folder: Folder that will contain the file
:param progress: Function to call on upload progress updates
:return: Reference to the created file
:raises: RuntimeError if there was an error
"""
folder_handle = parent_folder._handle if parent_folder is not None else None
value = core.BNCollaborationProjectCreateFile(self._handle, filename, contents, len(contents), name, description, folder_handle, util.wrap_progress(progress), None)
if value is None:
raise RuntimeError(util._last_error())
result = file.File(value)
core.BNCollaborationFileFree(value)
return result
[docs] def push_file(self, file: 'file.File', extra_fields: Optional[Dict[str, str]] = None):
"""
Push an updated File object to the Remote
:param file: File object which has been updated
:param extra_fields: Extra HTTP fields to send with the update
:raises: RuntimeError if there was an error
"""
if extra_fields is None:
extra_fields = {}
extra_field_keys = (ctypes.c_char_p * len(extra_fields))()
extra_field_values = (ctypes.c_char_p * len(extra_fields))()
for (i, (key, value)) in enumerate(extra_fields.items()):
extra_field_keys[i] = core.cstr(key)
extra_field_values[i] = core.cstr(value)
if not core.BNCollaborationProjectPushFile(self._handle, file._handle, extra_field_keys, extra_field_values, len(extra_fields)):
raise RuntimeError(util._last_error())
[docs] def delete_file(self, file: 'file.File'):
"""
Delete a file from the remote
:param file: File to delete
:raises: RuntimeError if there was an error
"""
if not core.BNCollaborationProjectDeleteFile(self._handle, file._handle):
raise RuntimeError(util._last_error())
@property
def folders(self) -> List['folder.Folder']:
"""
Get the list of folders in this project.
.. note:: If folders have not been pulled, they will be pulled upon calling this.
:return: List of Folder objects
:raises: RuntimeError if there was an error pulling folders
"""
if not self.has_pulled_folders:
self.pull_folders()
count = ctypes.c_size_t()
value = core.BNCollaborationProjectGetFolders(self._handle, count)
if value is None:
raise RuntimeError(util._last_error())
result = []
for i in range(count.value):
result.append(folder.Folder(value[i]))
core.BNCollaborationFolderFreeList(value, count.value)
return result
[docs] def get_folder_by_id(self, id: str) -> Optional['folder.Folder']:
"""
Get a specific Folder in the Project by its id
.. note:: If folders have not been pulled, they will be pulled upon calling this.
:param id: Id of Folder
:return: Folder object, if one with that id exists. Else, None
:raises: RuntimeError if there was an error pulling folders
"""
if not self.has_pulled_folders:
self.pull_folders()
value = core.BNCollaborationProjectGetFolderById(self._handle, id)
if value is None:
return None
result = folder.Folder(value)
core.BNCollaborationFolderFree(value)
return result
[docs] def pull_folders(self, progress: 'util.ProgressFuncType' = nop):
"""
Pull the list of folders from the Remote.
:param progress: Function to call for progress updates
:raises: RuntimeError if there was an error pulling folders
"""
if not core.BNCollaborationProjectPullFolders(self._handle, util.wrap_progress(progress), None):
raise RuntimeError(util._last_error())
[docs] def create_folder(self, name: str, description: str, parent: Optional['folder.Folder'] = None, progress: 'util.ProgressFuncType' = nop) -> 'folder.Folder':
"""
Create a new folder on the remote (and pull it)
:param name: Displayed folder name
:param description: Folder description
:param parent: Parent folder (optional)
:param progress: Function to call on upload progress updates
:return: Reference to the created folder
:raises: RuntimeError if there was an error pulling folders
"""
parent_handle = parent._handle if parent is not None else None
value = core.BNCollaborationProjectCreateFolder(self._handle, name, description, parent_handle, util.wrap_progress(progress), None)
if value is None:
raise RuntimeError(util._last_error())
result = folder.Folder(value)
core.BNCollaborationFolderFree(value)
return result
[docs] def push_folder(self, folder: 'folder.Folder', extra_fields: Optional[Dict[str, str]] = None):
"""
Push an updated Folder object to the Remote
:param folder: Folder object which has been updated
:param extra_fields: Extra HTTP fields to send with the update
:raises: RuntimeError if there was an error
"""
if extra_fields is None:
extra_fields = {}
extra_field_keys = (ctypes.c_char_p * len(extra_fields))()
extra_field_values = (ctypes.c_char_p * len(extra_fields))()
for (i, (key, value)) in enumerate(extra_fields.items()):
extra_field_keys[i] = core.cstr(key)
extra_field_values[i] = core.cstr(value)
if not core.BNCollaborationProjectPushFolder(self._handle, folder._handle, extra_field_keys, extra_field_values, len(extra_fields)):
raise RuntimeError(util._last_error())
[docs] def delete_folder(self, folder: 'folder.Folder'):
"""
Delete a folder from the remote
:param folder: Folder to delete
:raises: RuntimeError if there was an error
"""
if not core.BNCollaborationProjectDeleteFolder(self._handle, folder._handle):
raise RuntimeError(util._last_error())
@property
def group_permissions(self) -> List['permission.Permission']:
"""
Get the list of group permissions in this project.
.. note:: If group permissions have not been pulled, they will be pulled upon calling this.
:return: List of Permission objects
:raises: RuntimeError if there was an error pulling group permissions
"""
if not self.has_pulled_group_permissions:
self.pull_group_permissions()
count = ctypes.c_size_t()
value = core.BNCollaborationProjectGetGroupPermissions(self._handle, count)
if value is None:
raise RuntimeError(util._last_error())
result = []
for i in range(count.value):
result.append(permission.Permission(value[i]))
core.BNCollaborationPermissionFreeList(value, count.value)
return result
@property
def user_permissions(self) -> List['permission.Permission']:
"""
Get the list of user permissions in this project.
.. note:: If user permissions have not been pulled, they will be pulled upon calling this.
:return: List of Permission objects
:raises: RuntimeError if there was an error pulling user permissions
"""
if not self.has_pulled_user_permissions:
self.pull_user_permissions()
count = ctypes.c_size_t()
value = core.BNCollaborationProjectGetUserPermissions(self._handle, count)
if value is None:
raise RuntimeError(util._last_error())
result = []
for i in range(count.value):
result.append(permission.Permission(value[i]))
core.BNCollaborationPermissionFreeList(value, count.value)
return result
[docs] def get_permission_by_id(self, id: str) -> Optional['permission.Permission']:
"""
Get a specific permission in the Project by its id
.. note:: If group or user permissions have not been pulled, they will be pulled upon calling this.
:param id: Id of Permission
:return: Permission object, if one with that id exists. Else, None
:raises: RuntimeError if there was an error pulling permissions
"""
if not self.has_pulled_user_permissions:
self.pull_user_permissions()
if not self.has_pulled_group_permissions:
self.pull_group_permissions()
value = core.BNCollaborationProjectGetPermissionById(self._handle, id)
if value is None:
return None
result = permission.Permission(value)
core.BNCollaborationPermissionFree(value)
return result
[docs] def pull_group_permissions(self, progress: 'util.ProgressFuncType' = nop):
"""
Pull the list of group permissions from the Remote.
:param progress: Function to call for progress updates
:raises: RuntimeError if there was an error pulling permissions
"""
if not core.BNCollaborationProjectPullGroupPermissions(self._handle, util.wrap_progress(progress), None):
raise RuntimeError(util._last_error())
[docs] def pull_user_permissions(self, progress: 'util.ProgressFuncType' = nop):
"""
Pull the list of user permissions from the Remote.
:param progress: Function to call for progress updates
:raises: RuntimeError if there was an error pulling permissions
"""
if not core.BNCollaborationProjectPullUserPermissions(self._handle, util.wrap_progress(progress), None):
raise RuntimeError(util._last_error())
[docs] def create_group_permission(self, group_id: int, level: PermissionLevel, progress: 'util.ProgressFuncType' = nop) -> 'permission.Permission':
"""
Create a new group permission on the remote (and pull it)
:param group_id: Group id
:param level: Permission level
:param progress: Function to call on upload progress updates
:return: Reference to the created permission
:raises: RuntimeError if there was an error pulling permissions
"""
value = core.BNCollaborationProjectCreateGroupPermission(self._handle, group_id, level, util.wrap_progress(progress), None)
if value is None:
raise RuntimeError(util._last_error())
result = permission.Permission(value)
core.BNCollaborationPermissionFree(value)
return result
[docs] def create_user_permission(self, user_id: str, level: PermissionLevel, progress: 'util.ProgressFuncType' = nop) -> 'permission.Permission':
"""
Create a new user permission on the remote (and pull it)
:param user_id: User id
:param level: Permission level
:param progress: Function to call on upload progress updates
:return: Reference to the created permission
:raises: RuntimeError if there was an error pulling permissions
"""
value = core.BNCollaborationProjectCreateUserPermission(self._handle, user_id, level, util.wrap_progress(progress), None)
if value is None:
raise RuntimeError(util._last_error())
result = permission.Permission(value)
core.BNCollaborationPermissionFree(value)
return result
[docs] def push_permission(self, permission: 'permission.Permission', extra_fields: Optional[Dict[str, str]] = None):
"""
Push project permissions to the remote
:param permission: Permission object which has been updated
:param extra_fields: Extra HTTP fields to send with the update
:raises: RuntimeError if there was an error
"""
if extra_fields is None:
extra_fields = {}
extra_field_keys = (ctypes.c_char_p * len(extra_fields))()
extra_field_values = (ctypes.c_char_p * len(extra_fields))()
for (i, (key, value)) in enumerate(extra_fields.items()):
extra_field_keys[i] = core.cstr(key)
extra_field_values[i] = core.cstr(value)
if not core.BNCollaborationProjectPushPermission(self._handle, permission._handle, extra_field_keys, extra_field_values, len(extra_fields)):
raise RuntimeError(util._last_error())
[docs] def delete_permission(self, permission: 'permission.Permission'):
"""
Delete a permission from the remote
:param permission: Permission to delete
:raises: RuntimeError if there was an error
"""
if not core.BNCollaborationProjectDeletePermission(self._handle, permission._handle):
raise RuntimeError(util._last_error())
[docs] def can_user_view(self, username: str) -> bool:
"""
Determine if a user is in any of the view/edit/admin groups
:param username: Username of user to check
:return: True if they are in any of those groups
:raises: RuntimeError if there was an error
"""
return core.BNCollaborationProjectCanUserView(self._handle, username)
[docs] def can_user_edit(self, username: str) -> bool:
"""
Determine if a user is in any of the edit/admin groups
:param username: Username of user to check
:return: True if they are in any of those groups
:raises: RuntimeError if there was an error
"""
return core.BNCollaborationProjectCanUserEdit(self._handle, username)
[docs] def can_user_admin(self, username: str) -> bool:
"""
Determine if a user is in the admin group
:param username: Username of user to check
:return: True if they are in any of those groups
:raises: RuntimeError if there was an error
"""
return core.BNCollaborationProjectCanUserAdmin(self._handle, username)
[docs] def upload_new_file(
self,
target: Union[str, PathLike, 'binaryninja.BinaryView', 'binaryninja.FileMetadata'],
parent_folder: Optional[folder.Folder] = None,
progress: 'util.ProgressFuncType' = nop,
open_view_options = None) -> 'file.File':
"""
Upload a file to the project, creating a new File and pulling it
:param target: Path to file on disk or BinaryView/FileMetadata object of
already-opened file
:param parent_folder: Parent folder to place the uploaded file in
:param progress: Function to call for progress updates
:return: Created File object
:raises: RuntimeError if there was an error
"""
if type(target) is binaryninja.FileMetadata:
if target.has_database:
return databasesync.upload_database(target, self, parent_folder=parent_folder, progress=progress)
else:
target = target.raw
if type(target) is binaryninja.BinaryView:
maybe_bv = target
target = maybe_bv.file.original_filename
else:
# Convert PathLike to string
if isinstance(target, (Path,)):
target = target.resolve()
target = str(target)
# Argument is a path, try opening it:
try:
if open_view_options is None:
open_view_options = {}
maybe_bv = binaryninja.load(
target, progress_func=util.split_progress(progress, 0, [0.25, 0.75]), **open_view_options)
except Exception as e:
raise RuntimeError("Could not upload view: " + str(e))
with maybe_bv as bv:
# Can't open, can't upload
if not bv:
raise RuntimeError("Could not open file at path for uploading")
# If it is backed by a database, just upload that
metadata = bv.file
if metadata.has_database:
uploaded = databasesync.upload_database(
metadata, self, parent_folder=parent_folder, progress=util.split_progress(progress, 1, [0.25, 0.75]))
return uploaded
# Ported from remotebrowser.cpp (original comments):
# TODO: This is not efficient at all!
# No db exists, so create one before uploading so we always have a root snapshot
# on the server
# - Load file into memory
# - Make temp path for temp database
# - Make temp database with file
# - UploadDatabase copies the temp database and makes its own
# - Delete temp database
# - Now you don't have an empty remote file
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / Path(target).name
binaryninja.log_info(f'Saving temporary database at {db_path}')
# Save bndb first to create database
if not metadata.create_database(
str(db_path), util.split_progress(progress, 1, [0.25, 0.25, 0.25, 0.25])):
raise RuntimeError("Could not save database for temporary path")
if not metadata.save_auto_snapshot(
util.split_progress(progress, 2, [0.25, 0.25, 0.25, 0.25])):
raise RuntimeError("Could not create initial snapshot for upload")
metadata.filename = str(db_path)
uploaded = databasesync.upload_database(
metadata, self, parent_folder=parent_folder, progress=util.split_progress(progress, 3, [0.25, 0.25, 0.25, 0.25]))
return uploaded