import datetime
from pathlib import Path
import binaryninja
from . import _collaboration as core
from . import databasesync
from . import remote
from . import project
from . import folder as _folder
from . import snapshot
from . import util
from typing import List, Tuple, Optional, Dict, Union
import ctypes
[docs]def nop(*args, **kwargs):
return True
[docs]class File:
"""
Class representing a remote analysis database. It controls the various
snapshots and raw file contents associated with the analysis.
"""
def __init__(self, handle):
self._handle = core.BNCollaborationFileCopy(handle)
def __del__(self):
core.BNCollaborationFileFree(self._handle)
def __eq__(self, other):
if type(other) is not File:
return False
return other.id == self.id
def __str__(self):
path = self.name
parent = self.folder
while parent is not None:
path = parent.name + '/' + path
parent = parent.parent
return f'<file: {self.remote.name}/{self.project.name}/{path}>'
def __repr__(self):
path = self.name
parent = self.folder
while parent is not None:
path = parent.name + '/' + path
parent = parent.parent
return f'<file: {self.remote.name}/{self.project.name}/{path}>'
[docs] @staticmethod
def get_for_local_database(database: 'binaryninja.Database') -> Optional['File']:
"""
Look up the remote File for a local database, or None if there is no matching
remote File found.
See :func:`get_for_bv` to load from a BinaryView.
:param database: Local database
:return: Remote File object
:rtype: File or None
"""
remote = databasesync.get_remote_for_local_database(database)
if not remote.has_pulled_projects:
remote.pull_projects()
project = databasesync.get_remote_project_for_local_database(database)
if not project.has_pulled_files:
project.pull_files()
return databasesync.get_remote_file_for_local_database(database)
[docs] @staticmethod
def get_for_bv(bv: 'binaryninja.BinaryView') -> Optional['File']:
"""
Look up the remote File for a local BinaryView, or None if there is no matching
remote File found.
:param bv: Local BinaryView
:return: Remote File object
:rtype: File or None
"""
if not bv.file.has_database:
return None
return File.get_for_local_database(bv.file.database)
@property
def project(self) -> 'project.Project':
"""
Owning Project
:return: Project object
"""
value = core.BNCollaborationFileGetProject(self._handle)
if value is None:
raise RuntimeError(util._last_error())
result = project.Project(handle=value)
core.BNCollaborationProjectFree(value)
return result
@property
def remote(self) -> 'remote.Remote':
"""
Owning Remote
:return: Remote object
"""
value = core.BNCollaborationFileGetRemote(self._handle)
if value is None:
raise RuntimeError(util._last_error())
result = remote.Remote(handle=value)
core.BNCollaborationRemoteFree(value)
return result
@property
def folder(self) -> Optional['_folder.Folder']:
"""
Parent folder, if one exists. None if this is in the root of the project.
:return: Folder object or None
"""
if not self.project.has_pulled_folders:
self.project.pull_folders()
value = core.BNCollaborationFileGetFolder(self._handle)
if value is None:
return None
result = _folder.Folder(handle=value)
core.BNCollaborationFolderFree(value)
return result
@folder.setter
def folder(self, folder: Optional['_folder.Folder']):
"""
Set the parent folder of a file.
:param folder: New parent folder, or None to move file to the root of the project.
"""
folder_handle = folder._handle if folder is not None else None
if not core.BNCollaborationFileSetFolder(self._handle, folder_handle):
raise RuntimeError(util._last_error())
@property
def url(self) -> str:
"""
Web api endpoint URL
:return: URL string
"""
return core.BNCollaborationFileGetUrl(self._handle)
@property
def chat_log_url(self) -> str:
"""
Chat log api endpoint URL
:return: URL string
"""
return core.BNCollaborationFileGetChatLogUrl(self._handle)
@property
def id(self) -> str:
"""
Unique id
:return: Id string
"""
return core.BNCollaborationFileGetId(self._handle)
@property
def created(self) -> datetime.datetime:
"""
Created date of the file
:return: Date object
"""
return datetime.datetime.utcfromtimestamp(core.BNCollaborationFileGetCreated(self._handle))
@property
def last_modified(self) -> datetime.datetime:
"""
Last modified date of the file
:return: Date object
"""
return datetime.datetime.utcfromtimestamp(core.BNCollaborationFileGetLastModified(self._handle))
@property
def last_snapshot(self) -> datetime.datetime:
"""
Date of last snapshot in the file
:return: Date object
"""
return datetime.datetime.utcfromtimestamp(core.BNCollaborationFileGetLastSnapshot(self._handle))
@property
def last_snapshot_by(self) -> str:
"""
Username of user who pushed the last snapshot in the file
:return: Username string
"""
return core.BNCollaborationFileGetLastSnapshotBy(self._handle)
@property
def hash(self) -> str:
"""
Hash of file contents (no algorithm guaranteed)
:return: Hash string
"""
return core.BNCollaborationFileGetHash(self._handle)
@property
def name(self) -> str:
"""
Displayed name of file
:return: Name string
"""
return core.BNCollaborationFileGetName(self._handle)
@name.setter
def name(self, value: str):
"""
Set the display name of the file. You will need to push the file to update the remote version.
:param value: New name
"""
if not core.BNCollaborationFileSetName(self._handle, value):
raise RuntimeError(util._last_error())
@property
def description(self) -> str:
"""
Description of the file
:return: Description string
"""
return core.BNCollaborationFileGetDescription(self._handle)
@description.setter
def description(self, value: str):
"""
Set the description of the file. You will need to push the file to update the remote version.
:param description: New description
"""
if not core.BNCollaborationFileSetDescription(self._handle, value):
raise RuntimeError(util._last_error())
@property
def size(self) -> int:
"""
Size of raw content of file, in bytes
:return: Size in bytes
"""
return core.BNCollaborationFileGetSize(self._handle)
@property
def default_bndb_path(self) -> str:
"""
Get the default filepath for a remote File. This is based off the Setting for
collaboration.directory, the file's id, the file's project's id, and the file's
remote's id.
:return: Default file path
:rtype: str
"""
return databasesync.default_file_path(self)
@property
def has_pulled_snapshots(self) -> bool:
"""
If the file has pulled the snapshots yet
:return: True if they have been pulled
"""
return core.BNCollaborationFileHasPulledSnapshots(self._handle)
@property
def snapshots(self) -> List['snapshot.Snapshot']:
"""
Get the list of snapshots in this file.
.. note:: If snapshots have not been pulled, they will be pulled upon calling this.
:return: List of Snapshot objects
:raises: RuntimeError if there was an error pulling snapshots
"""
if not self.has_pulled_snapshots:
self.pull_snapshots()
count = ctypes.c_size_t()
value = core.BNCollaborationFileGetSnapshots(self._handle, count)
if value is None:
raise RuntimeError(util._last_error())
result = []
for i in range(count.value):
result.append(snapshot.Snapshot(value[i]))
core.BNCollaborationSnapshotFreeList(value, count.value)
return result
[docs] def get_snapshot_by_id(self, id: str) -> Optional['snapshot.Snapshot']:
"""
Get a specific Snapshot in the File by its id
.. note:: If snapshots have not been pulled, they will be pulled upon calling this.
:param id: Id of Snapshot
:return: Snapshot object, if one with that id exists. Else, None
:raises: RuntimeError if there was an error pulling snapshots
"""
if not self.has_pulled_snapshots:
self.pull_snapshots()
value = core.BNCollaborationFileGetSnapshotById(self._handle, id)
if value is None:
return None
result = snapshot.Snapshot(value)
core.BNCollaborationSnapshotFree(value)
return result
[docs] def pull_snapshots(self, progress: 'util.ProgressFuncType' = nop):
"""
Pull the list of Snapshots from the Remote.
:param progress: Function to call for progress updates
:raises: RuntimeError if there was an error pulling snapshots
"""
if not core.BNCollaborationFilePullSnapshots(self._handle, util.wrap_progress(progress), None):
raise RuntimeError(util._last_error())
[docs] def create_snapshot(self, name: str, contents: bytes, analysis_cache_contents: bytes, file: bytes, parent_ids: List[str], progress: 'util.ProgressFuncType' = nop) -> 'snapshot.Snapshot':
"""
Create a new snapshot on the remote (and pull it)
:param name: Snapshot name
:param contents: Snapshot contents
:param analysis_cache_contents: Contents of analysis cache of snapshot
:param file: New file contents (if contents changed)
:param parent_ids: List of ids of parent snapshots (or empty if this is a root snapshot)
:param progress: Function to call on progress updates
:return: Reference to the created snapshot
:raises: RuntimeError if there was an error
"""
array = (ctypes.c_char_p * len(parent_ids))()
for i in range(len(parent_ids)):
array[i] = parent_ids[i]
value = core.BNCollaborationFileCreateSnapshot(self._handle, name, contents, len(contents), analysis_cache_contents, len(analysis_cache_contents), file, len(file), array, len(parent_ids), util.wrap_progress(progress), None)
if value is None:
raise RuntimeError(util._last_error())
result = snapshot.Snapshot(value)
core.BNCollaborationSnapshotFree(value)
return result
[docs] def delete_snapshot(self, snapshot: 'snapshot.Snapshot'):
"""
Delete a snapshot from the remote
:param snapshot: Snapshot to delete
:raises: RuntimeError if there was an error
"""
if not core.BNCollaborationFileDeleteSnapshot(self._handle, snapshot._handle):
raise RuntimeError(util._last_error())
[docs] def download(self, progress: 'util.ProgressFuncType' = nop) -> bytes:
"""
Download the contents of a remote file
:param progress: Function to call on progress updates
:return: Contents of the file
:raises: RuntimeError if there was an error
"""
data = (ctypes.POINTER(ctypes.c_ubyte))()
size = ctypes.c_size_t()
value = core.BNCollaborationFileDownload(self._handle, util.wrap_progress(progress), None, data, size)
if not value:
raise RuntimeError(util._last_error())
return bytes(ctypes.cast(data, ctypes.POINTER(ctypes.c_uint8 * size.value)).contents)
[docs] def download_to_bndb(self, path: Optional[str] = None, progress: 'util.ProgressFuncType' = nop) -> binaryninja.FileMetadata:
"""
Download a remote file and save it to a bndb at the given path.
This calls databasesync.download_file and self.sync to fully prepare the bndb.
:param path: Path to new bndb to create
:param progress: Function to call on progress updates
:return: Constructed FileMetadata object
:raises: RuntimeError if there was an error
"""
if path is None:
path = self.default_bndb_path
file = databasesync.download_file(self, path, util.split_progress(progress, 0, [0.5, 0.5]))
self.sync(
file.database, lambda conflicts: False, util.split_progress(progress, 1, [0.5, 0.5]))
return file
[docs] def sync(self, bv_or_db: Union['binaryninja.BinaryView', 'binaryninja.Database'], conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop):
"""
Completely sync a file, pushing/pulling/merging/applying changes
:param bv_or_db: Binary view or database to sync with
:param conflict_handler: Function to call to resolve snapshot conflicts
:param name_changeset: Function to call for naming a pushed changeset, if necessary
:param progress: Function to call for progress updates
:raises RuntimeError: If there was an error (or the operation was cancelled)
"""
if type(bv_or_db) == binaryninja.BinaryView:
if not bv_or_db.file.has_database:
raise RuntimeError("Cannot sync non-database view")
db = bv_or_db.file.database
else:
db = bv_or_db
databasesync.sync_database(db, self, conflict_handler, progress, name_changeset)
[docs] def pull(self, bv_or_db: Union['binaryninja.BinaryView', 'binaryninja.Database'], conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop):
"""
Pull updated snapshots from the remote. Merge local changes with remote changes and
potentially create a new snapshot for unsaved changes, named via name_changeset.
:param bv_or_db: Binary view or database to sync with
:param conflict_handler: Function to call to resolve snapshot conflicts
:param name_changeset: Function to call for naming a pushed changeset, if necessary
:param progress: Function to call for progress updates
:raises RuntimeError: If there was an error (or the operation was cancelled)
"""
if type(bv_or_db) == binaryninja.BinaryView:
if not bv_or_db.file.has_database:
raise RuntimeError("Cannot pull non-database view")
db = bv_or_db.file.database
else:
db = bv_or_db
databasesync.pull_database(db, self, conflict_handler, progress, name_changeset)
[docs] def push(self, bv_or_db: Union['binaryninja.BinaryView', 'binaryninja.Database'], progress: 'util.ProgressFuncType' = nop):
"""
Push locally added snapshots to the remote
:param bv_or_db: Binary view or database to sync with
:param progress: Function to call for progress updates
:raises RuntimeError: If there was an error (or the operation was cancelled)
"""
if type(bv_or_db) == binaryninja.BinaryView:
if not bv_or_db.file.has_database:
raise RuntimeError("Cannot pull non-database view")
db = bv_or_db.file.database
else:
db = bv_or_db
databasesync.push_database(db, self, progress)