import ctypes
from typing import Optional, Union
import binaryninja
from . import _collaboration as core
from . import util
from . import remote
from . import project
from . import file
from . import folder
from . import snapshot
from . import merge
"""
Database syncing and choreography between BN api and remote
"""
[docs]def nop(*args, **kwargs):
return True
[docs]def default_project_path(project_: 'project.Project') -> str:
"""
Get the default directory path for a remote Project. This is based off the Setting for
collaboration.directory, the project's id, and the project's remote's id.
:param project_: Remote Project
:return: Default project path
:raises RuntimeError: If there was an error
"""
value = core.BNCollaborationDefaultProjectPath(project_._handle)
if value is None:
raise RuntimeError(util._last_error())
return value
[docs]def default_file_path(file_: 'file.File') -> str:
"""
Get the default filepath for a remote File. This is based off the Setting for
collaboration.directory, the file's id, the file's project's id, and the file's
remote's id.
:param file_: Remote File
:return: Default file path
:raises RuntimeError: If there was an error
"""
value = core.BNCollaborationDefaultFilePath(file_._handle)
if value is None:
raise RuntimeError(util._last_error())
return value
[docs]def download_file(file_: 'file.File', db_path: str, progress: 'util.ProgressFuncType' = nop) -> 'binaryninja.FileMetadata':
"""
Download a file from its remote, saving all snapshots to a database in the
specified location. Returns a FileContext for opening the file later.
:param file_: Remote File to download and open
:param db_path: File path for saved database
:param progress: Function to call for progress updates
:return: FileContext for opening
:raises RuntimeError: If there was an error
"""
value = core.BNCollaborationDownloadFile(file_._handle, db_path, util.wrap_progress(progress), None)
if value is None:
raise RuntimeError(util._last_error())
return binaryninja.FileMetadata(handle=ctypes.cast(value, ctypes.POINTER(binaryninja.core.BNKeyValueStore)))
[docs]def upload_database(metadata: 'binaryninja.FileMetadata', project: 'project.Project', parent_folder: Optional['folder.Folder'] = None, progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop) -> 'file.File':
"""
Upload a file, with database, to the remote under the given project
:param metadata: Local file with database
:param project: Remote project under which to place the new file
:param progress: Function to call for progress updates
:param name_changeset: Function to call for naming a pushed changeset, if necessary
:param parent_folder: Optional parent folder in which to place this file
:return: Remote File created
:raises RuntimeError: If there was an error
"""
folder_handle = parent_folder._handle if parent_folder is not None else None
value = core.BNCollaborationUploadDatabase(ctypes.cast(metadata.handle, ctypes.POINTER(core.BNFileMetadata)), project._handle, folder_handle, util.wrap_progress(progress), None, util.wrap_name_changeset(name_changeset), None)
if value is None:
raise RuntimeError(util._last_error())
result = file.File(handle=value)
core.BNCollaborationFileFree(value)
return result
[docs]def is_collaboration_database(database: binaryninja.Database) -> bool:
"""
Test if a database is valid for use in collaboration
:param database: Database to test
:return: True if valid
"""
return core.BNCollaborationIsCollaborationDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)))
[docs]def get_remote_for_local_database(database: binaryninja.Database) -> Optional['remote.Remote']:
"""
Get the Remote for a Database
:param database: BN database, potentially with collaboration metadata
:return: Remote from one of the connected remotes, or None if not found
:raises RuntimeError: If there was an error
"""
value = ctypes.POINTER(core.BNCollaborationRemote)()
if not core.BNCollaborationGetRemoteForLocalDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), value):
raise RuntimeError(util._last_error())
if not value:
return None
result = remote.Remote(handle=value)
core.BNCollaborationRemoteFree(value)
return result
[docs]def get_remote_project_for_local_database(database: binaryninja.Database) -> Optional['project.Project']:
"""
Get the Remote Project for a Database
:param database: BN database, potentially with collaboration metadata
:return: Remote project from one of the connected remotes, or None if not found
or if projects are not pulled
:raises RuntimeError: If there was an error
"""
value = ctypes.POINTER(core.BNCollaborationProject)()
if not core.BNCollaborationGetRemoteProjectForLocalDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), value):
raise RuntimeError(util._last_error())
if not value:
return None
result = project.Project(handle=value)
core.BNCollaborationProjectFree(value)
return result
[docs]def get_remote_file_for_local_database(database: binaryninja.Database) -> Optional['file.File']:
"""
Get the Remote File for a Database
:param database: BN database, potentially with collaboration metadata
:return: Remote file from one of the connected remotes, or None if not found
or if files are not pulled
:raises RuntimeError: If there was an error
"""
value = ctypes.POINTER(core.BNCollaborationFile)()
if not core.BNCollaborationGetRemoteFileForLocalDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), value):
raise RuntimeError(util._last_error())
if not value:
return None
result = file.File(handle=value)
core.BNCollaborationFileFree(value)
return result
[docs]def assign_snapshot_map(local_snapshot: binaryninja.Snapshot, remote_snapshot: snapshot.Snapshot):
"""
Add a snapshot to the id map in a database
:param local_snapshot: Local snapshot, will use this snapshot's database
:param remote_snapshot: Remote snapshot
:raises RuntimeError: If there was an error
"""
if not core.BNCollaborationAssignSnapshotMap(ctypes.cast(local_snapshot.handle, ctypes.POINTER(core.BNSnapshot)), remote_snapshot._handle):
raise RuntimeError(util._last_error())
[docs]def get_remote_snapshot_for_local(snap: binaryninja.Snapshot) -> Optional['snapshot.Snapshot']:
"""
Get the remote snapshot associated with a local snapshot (if it exists)
:param snap: Local snapshot
:return: Remote snapshot if it exists, or None if not
:raises RuntimeError: If there was an error
"""
value = ctypes.POINTER(core.BNCollaborationSnapshot)()
if not core.BNCollaborationGetRemoteSnapshotFromLocal(ctypes.cast(snap.handle, ctypes.POINTER(core.BNSnapshot)), value):
raise RuntimeError(util._last_error())
if not value:
return None
result = snapshot.Snapshot(handle=value)
core.BNCollaborationSnapshotFree(value)
return result
[docs]def get_local_snapshot_for_remote(snapshot: snapshot.Snapshot, database: binaryninja.Database) -> Optional['binaryninja.Snapshot']:
"""
Get the local snapshot associated with a remote snapshot (if it exists)
:param snapshot: Remote snapshot
:param database: Local database to search
:return: Snapshot reference if it exists, or None reference if not
:raises RuntimeError: If there was an error
"""
value = ctypes.POINTER(core.BNSnapshot)()
if not core.BNCollaborationGetLocalSnapshotFromRemote(snapshot._handle, ctypes.cast(database.handle, ctypes.POINTER(core.BNSnapshot)), value):
raise RuntimeError(util._last_error())
if not value:
return None
return binaryninja.Snapshot(handle=ctypes.cast(value, ctypes.POINTER(binaryninja.core.BNSnapshot)))
[docs]def sync_database(database: binaryninja.Database, file_: 'file.File', conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop):
"""
Completely sync a database, pushing/pulling/merging/applying changes
:param database: Database to sync
:param file_: File to sync with
:param conflict_handler: Function to call to resolve snapshot conflicts
:param progress: Function to call for progress updates
:param name_changeset: Function to call for naming a pushed changeset, if necessary
:raises RuntimeError: If there was an error (or the operation was cancelled)
"""
if type(conflict_handler) == merge.ConflictHandler:
handler = conflict_handler
else:
handler = util.wrap_conflict_handler(conflict_handler)
if not core.BNCollaborationSyncDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), file_._handle, handler._handle, util.wrap_progress(progress), None, util.wrap_name_changeset(name_changeset), None):
raise RuntimeError(util._last_error())
[docs]def pull_database(database: binaryninja.Database, file_: 'file.File', conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop, name_changeset: 'util.NameChangesetFuncType' = nop):
"""
Pull updated snapshots from the remote. Merge local changes with remote changes and
potentially create a new snapshot for unsaved changes, named via name_changeset.
:param database: Database to pull
:param file_: Remote File to pull to
:param conflict_handler: Function to call to resolve snapshot conflicts
:param progress: Function to call for progress updates
:param name_changeset: Function to call for naming a pushed changeset, if necessary
:raises RuntimeError: If there was an error (or the operation was cancelled)
"""
if type(conflict_handler) == merge.ConflictHandler:
handler = conflict_handler
else:
handler = util.wrap_conflict_handler(conflict_handler)
if not core.BNCollaborationPullDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), file_._handle, handler._handle, util.wrap_progress(progress), None, util.wrap_name_changeset(name_changeset), None):
raise RuntimeError(util._last_error())
[docs]def merge_database(database: binaryninja.Database, conflict_handler: 'util.ConflictHandlerType', progress: 'util.ProgressFuncType' = nop):
"""
Merge all leaf snapshots in a database down to a single leaf snapshot.
:param database: Database to merge
:param conflict_handler: Function to call for progress updates
:param progress: Function to call to resolve snapshot conflicts
:raises RuntimeError: If there was an error (or the operation was cancelled)
"""
if type(conflict_handler) == merge.ConflictHandler:
handler = conflict_handler
else:
handler = util.wrap_conflict_handler(conflict_handler)
if not core.BNCollaborationMergeDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), handler._handle, util.wrap_progress(progress), None):
raise RuntimeError(util._last_error())
[docs]def push_database(database: binaryninja.Database, file_: 'file.File', progress: 'util.ProgressFuncType' = nop):
"""
Push locally added snapshots to the remote
:param database: Database to push
:param file_: Remote File to push to
:param progress: Function to call for progress updates
:raises RuntimeError: If there was an error (or the operation was cancelled)
"""
if not core.BNCollaborationPushDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), file_._handle, util.wrap_progress(progress), None):
raise RuntimeError(util._last_error())
[docs]def dump_database(database: binaryninja.Database):
"""
Print debug information about a database to stdout
:param database: Database to dump
:raises RuntimeError: If there was an error
"""
if not core.BNCollaborationDumpDatabase(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))):
raise RuntimeError(util._last_error())
[docs]def ignore_snapshot(database: binaryninja.Database, snapshot: binaryninja.Snapshot):
"""
Ignore a snapshot from database syncing operations
:param database: Parent database
:param snapshot: Snapshot to ignore
:raises RuntimeError: If there was an error
"""
if not core.BNCollaborationIgnoreSnapshot(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot))):
raise RuntimeError(util._last_error())
[docs]def is_snapshot_ignored(database: binaryninja.Database, snapshot: binaryninja.Snapshot) -> bool:
"""
Test if a snapshot is ignored from the database
:param database: Parent database
:param snapshot: Snapshot to test
:return: True if snapshot should be ignored
:raises RuntimeError: If there was an error
"""
return core.BNCollaborationIsSnapshotIgnored(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot)))
[docs]def get_snapshot_author(database: binaryninja.Database, snapshot: binaryninja.Snapshot) -> Optional[str]:
"""
Get the remote author of a local snapshot
:param database: Parent database
:param snapshot: Snapshot to query
:return: Remote author, or None if one could not be determined
:raises RuntimeError: If there was an error
"""
value = ctypes.POINTER(ctypes.c_char_p)()
if not core.BNCollaborationGetSnapshotAuthor(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot)), value):
raise RuntimeError(util._last_error())
if value is None:
return None
return core.pyNativeStr(value)
[docs]def set_snapshot_author(database: binaryninja.Database, snapshot: binaryninja.Snapshot, author: str):
"""
Set the remote author of a local snapshot (does not upload)
:param database: Parent database
:param snapshot: Snapshot to edit
:param author: Target author
:raises RuntimeError: If there was an error
"""
if not core.BNCollaborationSetSnapshotAuthor(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), ctypes.cast(snapshot.handle, ctypes.POINTER(core.BNSnapshot)), author):
raise RuntimeError(util._last_error())