import abc
import traceback
try:
from collections import MutableSequence
except:
from collections.abc import MutableSequence
import sys
from . import _collaboration as core
from .enums import MergeConflictDataType, FormattedConflictFormatType
from . import util
from typing import List, Tuple, Optional, Dict, Callable
import binaryninja
import ctypes
import json
OptionalStringDict = Optional[Dict[str, object]]
[docs]class MergeConflict:
"""
Structure representing an individual merge conflict
"""
def __init__(self, handle):
"""
FFI constructor
:param handle: FFI handle for internal use
"""
self._handle = core.BNCollaborationMergeConflictCopy(handle)
def __del__(self):
core.BNCollaborationMergeConflictFree(self._handle)
@property
def database(self) -> binaryninja.Database:
"""
Database backing all snapshots in the merge conflict
:return: Database object
"""
result = core.BNCollaborationMergeConflictGetDatabase(self._handle)
return binaryninja.Database(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNDatabase)))
@property
def base_snapshot(self) -> Optional[binaryninja.Snapshot]:
"""
Snapshot which is the parent of the two being merged
:return: Snapshot object
"""
result = core.BNCollaborationMergeConflictGetBaseSnapshot(self._handle)
if result is None:
return None
return binaryninja.Snapshot(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNSnapshot)))
@property
def first_snapshot(self) -> Optional[binaryninja.Snapshot]:
"""
First snapshot being merged
:return: Snapshot object
"""
result = core.BNCollaborationMergeConflictGetFirstSnapshot(self._handle)
if result is None:
return None
return binaryninja.Snapshot(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNSnapshot)))
@property
def second_snapshot(self) -> Optional[binaryninja.Snapshot]:
"""
Second snapshot being merged
:return: Snapshot object
"""
result = core.BNCollaborationMergeConflictGetSecondSnapshot(self._handle)
if result is None:
return None
return binaryninja.Snapshot(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNSnapshot)))
@property
def base_file(self) -> Optional[binaryninja.FileMetadata]:
"""
FileMetadata with contents of file for base snapshot
This function is slow! Only use it if you really need it.
:return: FileMetadata object
"""
result = core.BNCollaborationMergeConflictGetBaseFile(self._handle)
if result is None:
return None
lazy = util.LazyT(handle=result)
file = binaryninja.FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(binaryninja.core.BNFileMetadata)))
core.BNCollaborationFreeLazyT(result)
return file
@property
def first_file(self) -> Optional[binaryninja.FileMetadata]:
"""
FileMetadata with contents of file for first snapshot
This function is slow! Only use it if you really need it.
:return: FileMetadata object
"""
result = core.BNCollaborationMergeConflictGetFirstFile(self._handle)
if result is None:
return None
lazy = util.LazyT(handle=result)
file = binaryninja.FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(binaryninja.core.BNFileMetadata)))
core.BNCollaborationFreeLazyT(result)
return file
@property
def second_file(self) -> Optional[binaryninja.FileMetadata]:
"""
FileMetadata with contents of file for second snapshot
This function is slow! Only use it if you really need it.
:return: FileMetadata object
"""
result = core.BNCollaborationMergeConflictGetSecondFile(self._handle)
if result is None:
return None
lazy = util.LazyT(handle=result)
file = binaryninja.FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(binaryninja.core.BNFileMetadata)))
core.BNCollaborationFreeLazyT(result)
return file
@property
def base(self) -> OptionalStringDict:
"""
Json object for conflicting data in the base snapshot
:return: Python dictionary from parsed json
"""
result = core.BNCollaborationMergeConflictGetBase(self._handle)
if result is None:
return None
return json.loads(result)
@property
def first(self) -> OptionalStringDict:
"""
Json object for conflicting data in the first snapshot
:return: Python dictionary from parsed json
"""
result = core.BNCollaborationMergeConflictGetFirst(self._handle)
if result is None:
return None
return json.loads(result)
@property
def second(self) -> OptionalStringDict:
"""
Json object for conflicting data in the second snapshot
:return: Python dictionary from parsed json
"""
result = core.BNCollaborationMergeConflictGetSecond(self._handle)
if result is None:
return None
return json.loads(result)
@property
def data_type(self) -> 'MergeConflictDataType':
"""
Type of data in the conflict, Text/Json/Binary
:return: Conflict data type
"""
return MergeConflictDataType(core.BNCollaborationMergeConflictGetDataType(self._handle))
@property
def type(self) -> str:
"""
String representing the type name of the data, not the same as data_type.
This is like "typeName" or "tag" depending on what object the conflict represents.
:return: Type name
"""
return core.BNCollaborationMergeConflictGetType(self._handle)
@property
def key(self) -> str:
"""
Lookup key for the merge conflict, ideally a tree path that contains the name of the conflict
and all the recursive children leading up to this conflict.
:return: Key name
"""
return core.BNCollaborationMergeConflictGetKey(self._handle)
[docs] def success(self, value: OptionalStringDict) -> bool:
"""
Call this when you've resolved the conflict to save the result
:param value: Resolved value
:return: True if successful
"""
if value is None:
printed = None
else:
printed = json.dumps(value)
return core.BNCollaborationMergeConflictSuccess(self._handle, printed)
[docs] def make_child(self, new_data_type, new_type: str, new_key: str, new_path_key: str, new_path_value: object, new_base: OptionalStringDict, new_first: OptionalStringDict, new_second: OptionalStringDict, new_success: Callable[[OptionalStringDict], bool]):
"""
Convenience function for making a new merge conflict using the snapshots/storage from this one
:param new_data_type: Desired dataType of new conflict
:param new_type: Desired type of new conflict
:param new_key: Desired key of new conflict
:param new_path_key: Key to add to path for new conflict
:param new_path_value: Value for key added to path
:param new_base: Desired base of new conflict
:param new_first: Desired first of new conflict
:param new_second: Desired second of new conflict
:param new_success: Success function for new conflict
:return: Created child object
"""
value = core.BNCollaborationMergeConflictMakeChild(self._handle, new_data_type, new_type, new_key, new_path_key, ctypes.cast(ctypes.py_object(new_path_value), ctypes.c_void_p), new_base, new_first, new_second, ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_char_p)(lambda ctxt, value: new_success(value)), None)
if value is None:
return None
result = MergeConflict(handle=value)
core.BNCollaborationMergeConflictFree(value)
return result
[docs] def get_path_item(self, path_key: str) -> Optional[object]:
"""
Get item in the merge conflict's path for a given key.
:param path_key: Key for path item
:return: Path item, or an None if not found
"""
value = core.BNCollaborationMergeConflictGetPathItem(self._handle, path_key)
if value is None:
return None
return ctypes.py_object(value)
[docs]class ConflictSplitter:
"""
Helper class that takes one merge conflict and splits it into multiple conflicts
Eg takes conflicts for View/symbols and splits to one conflict per symbol
"""
def __init__(self, handle=None):
if handle is not None:
self._handle = core.BNCollaborationConflictSplitterCopy(handle)
else:
self._cb = core.BNCollaborationConflictSplitterCallbacks()
self._cb.context = 0
self._cb.getName = self._cb.getName.__class__(self._get_name)
self._cb.canSplit = self._cb.canSplit.__class__(self._can_split)
self._cb.split = self._cb.split.__class__(self._split)
self._cb.freeName = self._cb.freeName.__class__(self._free_name)
self._cb.freeKeyList = self._cb.freeKeyList.__class__(self._free_key_list)
self._cb.freeConflictList = self._cb.freeConflictList.__class__(self._free_conflict_list)
self._handle = core.BNCollaborationConflictSplitterCreate(self._cb)
self._split_keys = None
self._split_conflicts = None
def _get_name(self, ctxt: ctypes.c_void_p) -> ctypes.c_char_p:
try:
return core.BNCollaborationAllocString(core.cstr(self.name))
except:
# Not sure why your get_name() would throw but let's handle it anyway
traceback.print_exc(file=sys.stderr)
return core.BNCollaborationAllocString(core.cstr(type(self).__name__))
def _can_split(self, ctxt: ctypes.c_void_p, key: ctypes.c_char_p, conflict: ctypes.POINTER(core.BNCollaborationMergeConflict)) -> bool:
try:
py_conflict = MergeConflict(handle=conflict)
return self.can_split(core.pyNativeStr(key), py_conflict)
except:
traceback.print_exc(file=sys.stderr)
return False
def _split(self, ctxt: ctypes.c_void_p, original_key: ctypes.c_char_p, original_conflict: ctypes.POINTER(core.BNCollaborationMergeConflict), result_kvs: ctypes.POINTER(core.BNKeyValueStore), new_keys: ctypes.POINTER(ctypes.POINTER(ctypes.c_char_p)), new_conflicts: ctypes.POINTER(ctypes.POINTER(core.BNCollaborationMergeConflict)), new_count: ctypes.POINTER(ctypes.c_size_t)) -> bool:
try:
py_original_conflict = MergeConflict(handle=original_conflict)
py_result_kvs = binaryninja.KeyValueStore(handle=ctypes.cast(result_kvs, ctypes.POINTER(binaryninja.core.BNKeyValueStore)))
result = self.split(core.pyNativeStr(original_key), py_original_conflict, py_result_kvs)
if result is None:
return False
new_count.contents = len(result)
new_keys.contents = (ctypes.c_char_p * len(result))()
new_conflicts.contents = (ctypes.POINTER(core.BNCollaborationMergeConflict) * len(result))()
self._split_keys = []
self._split_conflicts = []
for (i, (key, conflict)) in enumerate(result):
self._split_keys.append(key)
self._split_conflicts.append(conflict)
new_keys.contents[i] = core.cstr(self._split_keys[-1])
new_conflicts.contents[i] = self._split_conflicts[-1]._handle
return True
except:
traceback.print_exc(file=sys.stderr)
return False
def _free_name(self, ctxt: ctypes.c_void_p, name: ctypes.c_char_p):
core.BNCollaborationFreeString(name)
def _free_key_list(self, ctxt: ctypes.c_void_p, key_list: ctypes.POINTER(ctypes.c_char_p), count: ctypes.c_size_t):
del self._split_keys
def _free_conflict_list(self, ctxt: ctypes.c_void_p, conflict_list: ctypes.POINTER(core.BNCollaborationMergeConflict), count: ctypes.c_size_t):
del self._split_conflicts
@property
def name(self):
"""
Get a friendly name for the splitter
:return: Name of the splitter
"""
return self.get_name()
[docs] def get_name(self) -> str:
"""
Get a friendly name for the splitter
:return: Name of the splitter
"""
return type(self).__name__
[docs] @abc.abstractmethod
def can_split(self, key: str, conflict: MergeConflict) -> bool:
"""
Test if the splitter applies to a given conflict (by key).
:param key: Key of the conflicting field
:param conflict: Conflict data
:return: True if this splitter should be used on the conflict
"""
raise NotImplementedError("Not implemented")
[docs] @abc.abstractmethod
def split(self, key: str, conflict: MergeConflict, result: binaryninja.KeyValueStore) -> Optional[Dict[str, MergeConflict]]:
"""
Split a field conflict into any number of alternate conflicts.
Note: Returned conflicts will also be checked for splitting, beware infinite loops!
If this function raises, it will be treated as returning None
:param key: Original conflicting field's key
:param conflict: Original conflict data
:param result: Kvs structure containing the result of all splits. You should use the original conflict's
success() function in most cases unless you specifically want to write a new key to this.
:return: A collection of conflicts into which the original conflict was split, or None if
this splitter cannot handle the conflict
"""
raise NotImplementedError("Not implemented")
[docs]class CoreConflictSplitter(ConflictSplitter):
"""
FFI wrapper for core-constructed splitters
"""
[docs] def get_name(self) -> str:
return core.BNCollaborationConflictSplitterGetName(self._handle)
[docs] def can_split(self, key: str, conflict: MergeConflict) -> bool:
return core.BNCollaborationConflictSplitterCanSplit(self._handle, key, conflict._handle)
[docs] def split(self, key: str, conflict: MergeConflict, result: binaryninja.KeyValueStore) -> Optional[Dict[str, MergeConflict]]:
raise NotImplementedError("Not implemented")
[docs]class SplitterList(MutableSequence):
"""
Mutable list of splitters for FFI purposes
"""
def __init__(self):
self._update_rep()
def _update_rep(self):
count = ctypes.c_size_t()
value = core.BNCollaborationGetConflictSplitterList(count)
self.rep = []
for i in range(count.value):
self.rep.append(CoreConflictSplitter(handle=value[i]))
core.BNCollaborationConflictSplitterFreeList(value, count.value)
def __len__(self):
# Not super efficient but probably not used in a hotpath
self._update_rep()
return len(self.rep)
def __getitem__(self, item):
# Not super efficient but probably not used in a hotpath
self._update_rep()
return self.rep[item]
def __delitem__(self, index):
core.BNCollaborationConflictSplitterListDelete(index)
def __setitem__(self, index, value: ConflictSplitter):
core.BNCollaborationConflictSplitterListInsert(index, value._handle)
[docs] def insert(self, index: int, value: ConflictSplitter) -> None:
self.__setitem__(index, value)
[docs]class ConflictHandler:
"""
Helper class that resolves conflicts
"""
def __init__(self, handle=None):
if handle is not None:
self._handle = core.BNCollaborationConflictHandlerCopy(handle)
else:
self._cb = core.BNCollaborationConflictHandlerCallbacks()
self._cb.context = 0
self._cb.handle = self._cb.handle.__class__(self._handle)
self._handle = core.BNCollaborationConflictHandlerCreate(self._cb)
def _handle(self, ctxt: ctypes.c_void_p, keys: ctypes.POINTER(ctypes.c_char_p), conflicts: ctypes.POINTER(ctypes.POINTER(core.BNCollaborationMergeConflict)), count: int) -> bool:
try:
py_conflicts = {}
for i in range(count):
py_conflicts[core.pyNativeStr(keys[i])] = MergeConflict(handle=conflicts[i])
return self.handle(py_conflicts)
except:
traceback.print_exc(file=sys.stderr)
return False
[docs] @abc.abstractmethod
def handle(self, conflicts: Dict[str, MergeConflict]) -> bool:
"""
Handle any merge conflicts by calling their success() function with a merged value
:param conflicts: Map of conflict id to conflict structure
:return: True if all conflicts were successfully merged
"""
raise NotImplementedError("Not implemented")
[docs]def least_common_ancestor(first: binaryninja.Snapshot, second: binaryninja.Snapshot) -> Optional[binaryninja.Snapshot]:
"""
Get the least common ancestor snapshot between two snapshots. The two snapshots must be in the same database.
:param first: First snapshot for ancestor searching
:param second: First snapshot for ancestor searching
:return: Ancestor snapshot, or None if the snapshots are on disjoint trees
"""
result = core.BNCollaborationLeastCommonAncestor(ctypes.cast(first.handle, ctypes.POINTER(core.BNSnapshot)), ctypes.cast(second.handle, ctypes.POINTER(core.BNSnapshot)))
if result is None:
return None
return binaryninja.Snapshot(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNSnapshot)))
[docs]def merge_snapshots(first: binaryninja.Snapshot, second: binaryninja.Snapshot, conflict_handler: ConflictHandler, progress: Callable[[int, int], bool]) -> Optional[binaryninja.Snapshot]:
"""
Merge a pair of snapshots and create a new snapshot with the result.
:param first: First snapshot to merge
:param second: Second snapshot to merge
:param conflict_handler: Function to call when merge conflicts are encountered
:param progress: Function to call for progress updates and cancelling
:return: Result snapshot, or None if there was an error
"""
result = core.BNCollaborationMergeSnapshots(ctypes.cast(first.handle, ctypes.POINTER(core.BNSnapshot)), ctypes.cast(second.handle, ctypes.POINTER(core.BNSnapshot)), conflict_handler._handle, util.wrap_progress(progress), None)
if result is None:
return None
return binaryninja.Snapshot(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNSnapshot)))
[docs]def merge_kvs(database: binaryninja.Database, base: binaryninja.Snapshot, first: binaryninja.Snapshot, second: binaryninja.Snapshot, conflict_handler: ConflictHandler, progress: Callable[[int, int], bool]) -> Optional[binaryninja.KeyValueStore]:
"""
Merge the underlying KVS data structure in two snapshots.
:param database: Owning database
:param base: ancestor snapshot's data
:param first: First snapshot's data
:param second: Second snapshot's data
:param conflict_handler: Function to call when merge conflicts are encountered
:param progress: Function to call for progress updates and cancelling
:return: Merged KVS structure, or None if there was an error
"""
result = core.BNCollaborationMergeKvs(ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase)), ctypes.cast(base.handle, ctypes.POINTER(core.BNSnapshot)), ctypes.cast(first.handle, ctypes.POINTER(core.BNSnapshot)), ctypes.cast(second.handle, ctypes.POINTER(core.BNSnapshot)), conflict_handler._handle, util.wrap_progress(progress), None)
if result is None:
return None
return binaryninja.KeyValueStore(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNKeyValueStore)))
[docs]def diff_kvs(base: binaryninja.KeyValueStore, target: binaryninja.KeyValueStore) -> Dict[str, List[str]]:
"""
Calculate the difference between two KVS structures
:param base: First KVS structure
:param target: Second KVS structure
:return: Map: {"added": [added fields], "modified": [modified fields], "removed": [removed fields]}
"""
diff_keys = ctypes.POINTER(ctypes.c_char_p)()
diff_values = ctypes.POINTER(ctypes.POINTER(ctypes.c_char_p))()
diff_field_counts = ctypes.POINTER(ctypes.c_size_t)()
diff_count = ctypes.c_size_t()
value = core.BNCollaborationDiffKvs(ctypes.cast(base.handle, ctypes.POINTER(core.BNKeyValueStore)), ctypes.cast(target.handle, ctypes.POINTER(core.BNKeyValueStore)), diff_keys, diff_values, diff_field_counts, diff_count)
if not value:
raise RuntimeError(util._last_error())
result = {}
for i in range(diff_count.value):
key = core.pyNativeStr(diff_keys[i])
values = []
for j in range(diff_field_counts[i]):
values.append(core.pyNativeStr(diff_values[i][j]))
result[key] = values
core.BNCollaborationFreeStringList(diff_keys.contents, diff_count.value)
core.BNCollaborationFreeStringListList(diff_values.contents, diff_field_counts.contents, diff_count.value)
return result