Source code for binaryninja.collaboration.merge

import abc
import ctypes
import json
import sys
import traceback
from typing import Dict, Optional

from .. import _binaryninjacore as core
from ..enums import MergeConflictDataType
from . import util

from ..database import Database, Snapshot
from ..filemetadata import FileMetadata

OptionalStringDict = Optional[Dict[str, object]]


[docs] class MergeConflict: """ Structure representing an individual merge conflict """ def __init__(self, handle: core.BNAnalysisMergeConflictHandle): """ FFI constructor :param handle: FFI handle for internal use """ self._handle = ctypes.cast(handle, core.BNAnalysisMergeConflictHandle) def __del__(self): core.BNFreeAnalysisMergeConflict(self._handle) @property def database(self) -> Database: """ Database backing all snapshots in the merge conflict :return: Database object """ result = core.BNAnalysisMergeConflictGetDatabase(self._handle) return Database(handle=ctypes.cast(result, ctypes.POINTER(core.BNDatabase))) @property def base_snapshot(self) -> Optional[Snapshot]: """ Snapshot which is the parent of the two being merged :return: Snapshot object """ result = core.BNAnalysisMergeConflictGetBaseSnapshot(self._handle) if result is None: return None return Snapshot(handle=ctypes.cast(result, ctypes.POINTER(core.BNSnapshot))) @property def first_snapshot(self) -> Optional[Snapshot]: """ First snapshot being merged :return: Snapshot object """ result = core.BNAnalysisMergeConflictGetFirstSnapshot(self._handle) if result is None: return None return Snapshot(handle=ctypes.cast(result, ctypes.POINTER(core.BNSnapshot))) @property def second_snapshot(self) -> Optional[Snapshot]: """ Second snapshot being merged :return: Snapshot object """ result = core.BNAnalysisMergeConflictGetSecondSnapshot(self._handle) if result is None: return None return Snapshot(handle=ctypes.cast(result, ctypes.POINTER(core.BNSnapshot))) @property def base_file(self) -> Optional[FileMetadata]: """ FileMetadata with contents of file for base snapshot This function is slow! Only use it if you really need it. :return: FileMetadata object """ result = core.BNAnalysisMergeConflictGetBaseFile(self._handle) if result is None: return None lazy = util.LazyT(handle=result) file = FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(core.BNFileMetadata))) core.BNCollaborationFreeLazyT(result) return file @property def first_file(self) -> Optional[FileMetadata]: """ FileMetadata with contents of file for first snapshot This function is slow! Only use it if you really need it. :return: FileMetadata object """ result = core.BNAnalysisMergeConflictGetFirstFile(self._handle) if result is None: return None lazy = util.LazyT(handle=result) file = FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(core.BNFileMetadata))) core.BNCollaborationFreeLazyT(result) return file @property def second_file(self) -> Optional[FileMetadata]: """ FileMetadata with contents of file for second snapshot This function is slow! Only use it if you really need it. :return: FileMetadata object """ result = core.BNAnalysisMergeConflictGetSecondFile(self._handle) if result is None: return None lazy = util.LazyT(handle=result) file = FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(core.BNFileMetadata))) core.BNCollaborationFreeLazyT(result) return file @property def base(self) -> OptionalStringDict: """ Json object for conflicting data in the base snapshot :return: Python dictionary from parsed json """ result = core.BNAnalysisMergeConflictGetBase(self._handle) if result is None: return None return json.loads(result) @property def first(self) -> OptionalStringDict: """ Json object for conflicting data in the first snapshot :return: Python dictionary from parsed json """ result = core.BNAnalysisMergeConflictGetFirst(self._handle) if result is None: return None return json.loads(result) @property def second(self) -> OptionalStringDict: """ Json object for conflicting data in the second snapshot :return: Python dictionary from parsed json """ result = core.BNAnalysisMergeConflictGetSecond(self._handle) if result is None: return None return json.loads(result) @property def data_type(self) -> 'MergeConflictDataType': """ Type of data in the conflict, Text/Json/Binary :return: Conflict data type """ return MergeConflictDataType(core.BNAnalysisMergeConflictGetDataType(self._handle)) @property def type(self) -> str: """ String representing the type name of the data, not the same as data_type. This is like "typeName" or "tag" depending on what object the conflict represents. :return: Type name """ return core.BNAnalysisMergeConflictGetType(self._handle) @property def key(self) -> str: """ Lookup key for the merge conflict, ideally a tree path that contains the name of the conflict and all the recursive children leading up to this conflict. :return: Key name """ return core.BNAnalysisMergeConflictGetKey(self._handle)
[docs] def success(self, value: OptionalStringDict) -> bool: """ Call this when you've resolved the conflict to save the result :param value: Resolved value :return: True if successful """ if value is None: printed = None else: printed = json.dumps(value) return core.BNAnalysisMergeConflictSuccess(self._handle, printed)
[docs] def get_path_item(self, path_key: str) -> Optional[object]: """ Get item in the merge conflict's path for a given key. :param path_key: Key for path item :return: Path item, or an None if not found """ value = core.BNAnalysisMergeConflictGetPathItem(self._handle, path_key) if value is None: return None return ctypes.py_object(value)
[docs] class ConflictHandler: """ Helper class that resolves conflicts """ def _handle(self, ctxt: ctypes.c_void_p, keys: ctypes.POINTER(ctypes.c_char_p), conflicts: ctypes.POINTER(core.BNAnalysisMergeConflictHandle), count: ctypes.c_ulonglong) -> bool: try: py_conflicts = {} for i in range(count.value): py_conflicts[core.pyNativeStr(keys[i])] = MergeConflict(handle=conflicts[i]) return self.handle(py_conflicts) except: traceback.print_exc(file=sys.stderr) return False
[docs] @abc.abstractmethod def handle(self, conflicts: Dict[str, MergeConflict]) -> bool: """ Handle any merge conflicts by calling their success() function with a merged value :param conflicts: Map of conflict id to conflict structure :return: True if all conflicts were successfully merged """ raise NotImplementedError("Not implemented")