Source code for binaryninja.collaboration.databasesync

import ctypes
from typing import Optional

import binaryninja

from . import _collaboration as core
from . import file, folder, merge, project, remote, snapshot, util

"""
Database syncing and choreography between BN api and remote
"""


def nop(*_args, **_kwargs):
    """
    Default callback that ignores every argument and returns True.

    Used in this module as the default value for the progress and
    changeset-naming callback parameters.

    :return: Always True
    """
    return True
def default_project_path(project_: 'project.RemoteProject') -> str:
    """
    Get the default directory path for a remote Project.

    The path is based on the Setting for collaboration.directory, the
    project's id, and the id of the project's remote.

    :param project_: Remote Project
    :return: Default project path
    :raises RuntimeError: If the core call returns no path
    """
    path = core.BNCollaborationDefaultProjectPath(project_._handle)
    if path is not None:
        return path
    # A None result signals failure; surface the core's last error message.
    raise RuntimeError(util._last_error())
def default_file_path(file_: 'file.RemoteFile') -> str:
    """
    Get the default filepath for a remote File.

    The path is based on the Setting for collaboration.directory, the
    file's id, the id of the file's project, and the id of the file's remote.

    :param file_: Remote File
    :return: Default file path
    :raises RuntimeError: If the core call returns no path
    """
    path = core.BNCollaborationDefaultFilePath(file_._handle)
    if path is not None:
        return path
    # A None result signals failure; surface the core's last error message.
    raise RuntimeError(util._last_error())
def download_file(file_: 'file.RemoteFile', db_path: str, progress: 'util.ProgressFuncType' = nop) -> 'binaryninja.FileMetadata':
    """
    Download a file from its remote, saving all snapshots to a database in the
    specified location. Returns a FileMetadata for opening the file later.

    :param file_: Remote File to download and open
    :param db_path: File path for saved database
    :param progress: Function to call for progress updates
    :return: FileMetadata for opening
    :raises RuntimeError: If there was an error
    """
    value = core.BNCollaborationDownloadFile(file_._handle, db_path, util.wrap_progress(progress), None)
    if value is None:
        raise RuntimeError(util._last_error())
    # The handle is wrapped in a FileMetadata, so it must be re-typed as the
    # API core's BNFileMetadata pointer (not BNKeyValueStore).
    return binaryninja.FileMetadata(handle=ctypes.cast(value, ctypes.POINTER(binaryninja.core.BNFileMetadata)))
def upload_database(metadata: 'binaryninja.FileMetadata', project: 'project.RemoteProject', parent_folder: Optional['folder.RemoteFolder'] = None, progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop) -> 'file.RemoteFile':
    """
    Upload a file, with database, to the remote under the given project.

    :param metadata: Local file with database
    :param project: Remote project under which to place the new file
    :param parent_folder: Optional parent folder in which to place this file
    :param progress: Function to call for progress updates
    :param name_changeset: Function to call for naming a pushed changeset, if necessary
    :return: Remote File created
    :raises RuntimeError: If there was an error
    """
    if parent_folder is not None:
        folder_handle = parent_folder._handle
    else:
        folder_handle = None

    metadata_handle = ctypes.cast(metadata.handle, ctypes.POINTER(core.BNFileMetadata))
    project_handle = project._handle
    progress_cb = util.wrap_progress(progress)
    name_cb = util.wrap_name_changeset(name_changeset)

    raw_file = core.BNCollaborationUploadDatabase(
        metadata_handle,
        project_handle,
        folder_handle,
        progress_cb,
        None,
        name_cb,
        None,
    )
    if raw_file is None:
        raise RuntimeError(util._last_error())

    # Build the wrapper first, then free the handle returned by the core
    # (presumably the wrapper holds its own reference -- TODO confirm).
    uploaded = file.RemoteFile(handle=raw_file)
    core.BNCollaborationFileFree(raw_file)
    return uploaded
def is_collaboration_database(database: binaryninja.Database) -> bool:
    """
    Test if a database is valid for use in collaboration.

    :param database: Database to test
    :return: True if valid
    """
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    return core.BNCollaborationIsCollaborationDatabase(db_handle)
def get_remote_for_local_database(database: binaryninja.Database) -> Optional['remote.Remote']:
    """
    Get the Remote for a Database.

    :param database: BN database, potentially with collaboration metadata
    :return: Remote from one of the connected remotes, or None if not found
    :raises RuntimeError: If there was an error
    """
    # Out-parameter: starts as a NULL pointer and is filled in by the core call.
    remote_ptr = ctypes.POINTER(core.BNCollaborationRemote)()
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    succeeded = core.BNCollaborationGetRemoteForLocalDatabase(db_handle, remote_ptr)
    if not succeeded:
        raise RuntimeError(util._last_error())

    if remote_ptr:
        # Build the wrapper first, then free the handle returned by the core
        # (presumably the wrapper holds its own reference -- TODO confirm).
        found = remote.Remote(handle=remote_ptr)
        core.BNCollaborationRemoteFree(remote_ptr)
        return found
    # The call succeeded but left the pointer NULL: nothing was found.
    return None
def get_remote_project_for_local_database(database: binaryninja.Database) -> Optional['project.RemoteProject']:
    """
    Get the Remote Project for a Database.

    :param database: BN database, potentially with collaboration metadata
    :return: Remote project from one of the connected remotes, or None if not found or if projects are not pulled
    :raises RuntimeError: If there was an error
    """
    # Out-parameter: starts as a NULL pointer and is filled in by the core call.
    project_ptr = ctypes.POINTER(core.BNCollaborationProject)()
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    succeeded = core.BNCollaborationGetRemoteProjectForLocalDatabase(db_handle, project_ptr)
    if not succeeded:
        raise RuntimeError(util._last_error())

    if project_ptr:
        # Build the wrapper first, then free the handle returned by the core
        # (presumably the wrapper holds its own reference -- TODO confirm).
        found = project.RemoteProject(handle=project_ptr)
        core.BNCollaborationProjectFree(project_ptr)
        return found
    # The call succeeded but left the pointer NULL: nothing was found.
    return None
def get_remote_file_for_local_database(database: binaryninja.Database) -> Optional['file.RemoteFile']:
    """
    Get the Remote File for a Database.

    :param database: BN database, potentially with collaboration metadata
    :return: Remote file from one of the connected remotes, or None if not found or if files are not pulled
    :raises RuntimeError: If there was an error
    """
    # Out-parameter: starts as a NULL pointer and is filled in by the core call.
    file_ptr = ctypes.POINTER(core.BNCollaborationFile)()
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    succeeded = core.BNCollaborationGetRemoteFileForLocalDatabase(db_handle, file_ptr)
    if not succeeded:
        raise RuntimeError(util._last_error())

    if file_ptr:
        # Build the wrapper first, then free the handle returned by the core
        # (presumably the wrapper holds its own reference -- TODO confirm).
        found = file.RemoteFile(handle=file_ptr)
        core.BNCollaborationFileFree(file_ptr)
        return found
    # The call succeeded but left the pointer NULL: nothing was found.
    return None
def assign_snapshot_map(local_snapshot: binaryninja.Snapshot, remote_snapshot: snapshot.Snapshot):
    """
    Add a snapshot to the id map in a database.

    :param local_snapshot: Local snapshot, will use this snapshot's database
    :param remote_snapshot: Remote snapshot
    :raises RuntimeError: If there was an error
    """
    local_handle = ctypes.cast(local_snapshot.handle, ctypes.POINTER(core.BNSnapshot))
    remote_handle = remote_snapshot._handle
    if core.BNCollaborationAssignSnapshotMap(local_handle, remote_handle):
        return
    raise RuntimeError(util._last_error())
def get_remote_snapshot_for_local(snap: binaryninja.Snapshot) -> Optional['snapshot.Snapshot']:
    """
    Get the remote snapshot associated with a local snapshot (if it exists).

    :param snap: Local snapshot
    :return: Remote snapshot if it exists, or None if not
    :raises RuntimeError: If there was an error
    """
    # Out-parameter: starts as a NULL pointer and is filled in by the core call.
    remote_snap_ptr = ctypes.POINTER(core.BNCollaborationSnapshot)()
    local_handle = ctypes.cast(snap.handle, ctypes.POINTER(core.BNSnapshot))
    succeeded = core.BNCollaborationGetRemoteSnapshotFromLocal(local_handle, remote_snap_ptr)
    if not succeeded:
        raise RuntimeError(util._last_error())

    if remote_snap_ptr:
        # Build the wrapper first, then free the handle returned by the core
        # (presumably the wrapper holds its own reference -- TODO confirm).
        found = snapshot.Snapshot(handle=remote_snap_ptr)
        core.BNCollaborationSnapshotFree(remote_snap_ptr)
        return found
    # The call succeeded but left the pointer NULL: no remote snapshot exists.
    return None
def get_local_snapshot_for_remote(snapshot: snapshot.Snapshot, database: binaryninja.Database) -> Optional['binaryninja.Snapshot']:
    """
    Get the local snapshot associated with a remote snapshot (if it exists).

    :param snapshot: Remote snapshot
    :param database: Local database to search
    :return: Snapshot reference if it exists, or None if not
    :raises RuntimeError: If there was an error
    """
    # Out-parameter: starts as a NULL pointer and is filled in by the core call.
    value = ctypes.POINTER(core.BNSnapshot)()
    # The database handle must be re-typed as a BNDatabase pointer (it was
    # previously cast to BNSnapshot, unlike every other database argument
    # in this module).
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    if not core.BNCollaborationGetLocalSnapshotFromRemote(snapshot._handle, db_handle, value):
        raise RuntimeError(util._last_error())
    if not value:
        # The call succeeded but left the pointer NULL: no local snapshot exists.
        return None
    # Re-type the collaboration-core pointer as the API core's BNSnapshot
    # before handing it to the public Snapshot wrapper.
    return binaryninja.Snapshot(handle=ctypes.cast(value, ctypes.POINTER(binaryninja.core.BNSnapshot)))
def sync_database(database: binaryninja.Database, file_: 'file.RemoteFile', conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop):
    """
    Completely sync a database, pushing/pulling/merging/applying changes.

    :param database: Database to sync
    :param file_: File to sync with
    :param conflict_handler: Function to call to resolve snapshot conflicts, \
    or a :py:class:`merge.ConflictHandler` instance (including subclasses), which is used directly
    :param progress: Function to call for progress updates
    :param name_changeset: Function to call for naming a pushed changeset, if necessary
    :raises RuntimeError: If there was an error (or the operation was cancelled)
    """
    # isinstance (rather than an exact type comparison) so that instances of
    # ConflictHandler subclasses are passed through instead of being wrapped
    # as if they were plain callables.
    if isinstance(conflict_handler, merge.ConflictHandler):
        handler = conflict_handler
    else:
        handler = util.wrap_conflict_handler(conflict_handler)

    if not core.BNCollaborationSyncDatabase(
        ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)),
        file_._handle,
        handler._handle,
        util.wrap_progress(progress),
        None,
        util.wrap_name_changeset(name_changeset),
        None,
    ):
        raise RuntimeError(util._last_error())
def pull_database(database: binaryninja.Database, file_: 'file.RemoteFile', conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop):
    """
    Pull updated snapshots from the remote. Merge local changes with remote changes and
    potentially create a new snapshot for unsaved changes, named via name_changeset.

    :param database: Database to pull
    :param file_: Remote File to pull to
    :param conflict_handler: Function to call to resolve snapshot conflicts, \
    or a :py:class:`merge.ConflictHandler` instance (including subclasses), which is used directly
    :param progress: Function to call for progress updates
    :param name_changeset: Function to call for naming a pushed changeset, if necessary
    :raises RuntimeError: If there was an error (or the operation was cancelled)
    """
    # isinstance (rather than an exact type comparison) so that instances of
    # ConflictHandler subclasses are passed through instead of being wrapped
    # as if they were plain callables.
    if isinstance(conflict_handler, merge.ConflictHandler):
        handler = conflict_handler
    else:
        handler = util.wrap_conflict_handler(conflict_handler)

    if not core.BNCollaborationPullDatabase(
        ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)),
        file_._handle,
        handler._handle,
        util.wrap_progress(progress),
        None,
        util.wrap_name_changeset(name_changeset),
        None,
    ):
        raise RuntimeError(util._last_error())
def merge_database(database: binaryninja.Database, conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop):
    """
    Merge all leaf snapshots in a database down to a single leaf snapshot.

    :param database: Database to merge
    :param conflict_handler: Function to call to resolve snapshot conflicts, \
    or a :py:class:`merge.ConflictHandler` instance (including subclasses), which is used directly
    :param progress: Function to call for progress updates
    :raises RuntimeError: If there was an error (or the operation was cancelled)
    """
    # isinstance (rather than an exact type comparison) so that instances of
    # ConflictHandler subclasses are passed through instead of being wrapped
    # as if they were plain callables.
    if isinstance(conflict_handler, merge.ConflictHandler):
        handler = conflict_handler
    else:
        handler = util.wrap_conflict_handler(conflict_handler)

    if not core.BNCollaborationMergeDatabase(
        ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)),
        handler._handle,
        util.wrap_progress(progress),
        None,
    ):
        raise RuntimeError(util._last_error())
def push_database(database: binaryninja.Database, file_: 'file.RemoteFile', progress: 'util.ProgressFuncType' = nop):
    """
    Push locally added snapshots to the remote.

    :param database: Database to push
    :param file_: Remote File to push to
    :param progress: Function to call for progress updates
    :raises RuntimeError: If there was an error (or the operation was cancelled)
    """
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    file_handle = file_._handle
    progress_cb = util.wrap_progress(progress)
    pushed = core.BNCollaborationPushDatabase(db_handle, file_handle, progress_cb, None)
    if pushed:
        return
    raise RuntimeError(util._last_error())
def dump_database(database: binaryninja.Database):
    """
    Print debug information about a database to stdout.

    :param database: Database to dump
    :raises RuntimeError: If there was an error
    """
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    if core.BNCollaborationDumpDatabase(db_handle):
        return
    raise RuntimeError(util._last_error())
def ignore_snapshot(database: binaryninja.Database, snapshot: binaryninja.Snapshot):
    """
    Ignore a snapshot from database syncing operations.

    :param database: Parent database
    :param snapshot: Snapshot to ignore
    :raises RuntimeError: If there was an error
    """
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    snap_handle = ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot))
    if core.BNCollaborationIgnoreSnapshot(db_handle, snap_handle):
        return
    raise RuntimeError(util._last_error())
def is_snapshot_ignored(database: binaryninja.Database, snapshot: binaryninja.Snapshot) -> bool:
    """
    Test if a snapshot is ignored from the database.

    The result of the core call is returned as-is; this function does not
    itself raise on failure.

    :param database: Parent database
    :param snapshot: Snapshot to test
    :return: True if snapshot should be ignored
    """
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    snap_handle = ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot))
    return core.BNCollaborationIsSnapshotIgnored(db_handle, snap_handle)
def get_snapshot_author(database: binaryninja.Database, snapshot: binaryninja.Snapshot) -> Optional[str]:
    """
    Get the remote author of a local snapshot.

    :param database: Parent database
    :param snapshot: Snapshot to query
    :return: Remote author, or None if one could not be determined
    :raises RuntimeError: If there was an error
    """
    # Out-parameter handed to the core call.
    # NOTE(review): confirm this matches the binding's declared argument type
    # for the author out-parameter.
    value = ctypes.POINTER(ctypes.c_char_p)()
    if not core.BNCollaborationGetSnapshotAuthor(
        ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)),
        ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot)),
        value,
    ):
        raise RuntimeError(util._last_error())
    # A ctypes pointer object is never ``None`` (so an ``is None`` test can
    # never succeed); a NULL pointer is detected through its falsiness.
    if not value:
        return None
    return core.pyNativeStr(value)
def set_snapshot_author(database: binaryninja.Database, snapshot: binaryninja.Snapshot, author: str):
    """
    Set the remote author of a local snapshot (does not upload).

    :param database: Parent database
    :param snapshot: Snapshot to edit
    :param author: Target author
    :raises RuntimeError: If there was an error
    """
    db_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
    snap_handle = ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot))
    if core.BNCollaborationSetSnapshotAuthor(db_handle, snap_handle, author):
        return
    raise RuntimeError(util._last_error())