# Source code for binaryninja.collaboration.merge

import abc
import traceback
try:
	from collections import MutableSequence
except:
	from collections.abc import MutableSequence

import sys

from . import _collaboration as core
from .enums import MergeConflictDataType, FormattedConflictFormatType
from . import util
from typing import List, Optional, Dict, Callable
import binaryninja
import ctypes
import json


# Alias used in this module for values that are either None or a dictionary
# with string keys (e.g. the result of json.loads on conflict data).
OptionalStringDict = Optional[Dict[str, object]]


class MergeConflict:
	"""
	Structure representing an individual merge conflict
	"""

	# ctypes does not keep callback objects alive on behalf of native code.
	# Objects handed to the core by make_child() are parked here so they are
	# not garbage collected while the core may still reference them.
	_child_keepalive: List[object] = []

	def __init__(self, handle):
		"""
		FFI constructor

		:param handle: FFI handle for internal use
		"""
		self._handle = core.BNCollaborationMergeConflictCopy(handle)

	def __del__(self):
		# __init__ may have raised before _handle was assigned
		handle = getattr(self, "_handle", None)
		if handle is not None:
			core.BNCollaborationMergeConflictFree(handle)

	@staticmethod
	def _wrap_snapshot(result) -> Optional[binaryninja.Snapshot]:
		"""Wrap a core snapshot pointer in a Snapshot object, passing None through."""
		if result is None:
			return None
		return binaryninja.Snapshot(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNSnapshot)))

	@staticmethod
	def _wrap_lazy_file(result) -> Optional[binaryninja.FileMetadata]:
		"""Resolve a core LazyT handle into a FileMetadata object and always free the LazyT."""
		if result is None:
			return None
		try:
			lazy = util.LazyT(handle=result)
			raw = lazy.get(ctypes.POINTER(core.BNFileMetadata))
			return binaryninja.FileMetadata(handle=ctypes.cast(raw, ctypes.POINTER(binaryninja.core.BNFileMetadata)))
		finally:
			# Freed even if resolving/wrapping raised, so the handle is not leaked
			core.BNCollaborationFreeLazyT(result)

	@staticmethod
	def _parse_json(result) -> OptionalStringDict:
		"""Parse a json string returned by the core, passing None through."""
		if result is None:
			return None
		return json.loads(result)

	@property
	def database(self) -> binaryninja.Database:
		"""
		Database backing all snapshots in the merge conflict

		:return: Database object
		"""
		result = core.BNCollaborationMergeConflictGetDatabase(self._handle)
		return binaryninja.Database(handle=ctypes.cast(result, ctypes.POINTER(binaryninja.core.BNDatabase)))

	@property
	def base_snapshot(self) -> Optional[binaryninja.Snapshot]:
		"""
		Snapshot which is the parent of the two being merged

		:return: Snapshot object
		"""
		return self._wrap_snapshot(core.BNCollaborationMergeConflictGetBaseSnapshot(self._handle))

	@property
	def first_snapshot(self) -> Optional[binaryninja.Snapshot]:
		"""
		First snapshot being merged

		:return: Snapshot object
		"""
		return self._wrap_snapshot(core.BNCollaborationMergeConflictGetFirstSnapshot(self._handle))

	@property
	def second_snapshot(self) -> Optional[binaryninja.Snapshot]:
		"""
		Second snapshot being merged

		:return: Snapshot object
		"""
		return self._wrap_snapshot(core.BNCollaborationMergeConflictGetSecondSnapshot(self._handle))

	@property
	def base_file(self) -> Optional[binaryninja.FileMetadata]:
		"""
		FileMetadata with contents of file for base snapshot
		This function is slow! Only use it if you really need it.

		:return: FileMetadata object
		"""
		return self._wrap_lazy_file(core.BNCollaborationMergeConflictGetBaseFile(self._handle))

	@property
	def first_file(self) -> Optional[binaryninja.FileMetadata]:
		"""
		FileMetadata with contents of file for first snapshot
		This function is slow! Only use it if you really need it.

		:return: FileMetadata object
		"""
		return self._wrap_lazy_file(core.BNCollaborationMergeConflictGetFirstFile(self._handle))

	@property
	def second_file(self) -> Optional[binaryninja.FileMetadata]:
		"""
		FileMetadata with contents of file for second snapshot
		This function is slow! Only use it if you really need it.

		:return: FileMetadata object
		"""
		return self._wrap_lazy_file(core.BNCollaborationMergeConflictGetSecondFile(self._handle))

	@property
	def base(self) -> OptionalStringDict:
		"""
		Json object for conflicting data in the base snapshot

		:return: Python dictionary from parsed json
		"""
		return self._parse_json(core.BNCollaborationMergeConflictGetBase(self._handle))

	@property
	def first(self) -> OptionalStringDict:
		"""
		Json object for conflicting data in the first snapshot

		:return: Python dictionary from parsed json
		"""
		return self._parse_json(core.BNCollaborationMergeConflictGetFirst(self._handle))

	@property
	def second(self) -> OptionalStringDict:
		"""
		Json object for conflicting data in the second snapshot

		:return: Python dictionary from parsed json
		"""
		return self._parse_json(core.BNCollaborationMergeConflictGetSecond(self._handle))

	@property
	def data_type(self) -> 'MergeConflictDataType':
		"""
		Type of data in the conflict, Text/Json/Binary

		:return: Conflict data type
		"""
		return MergeConflictDataType(core.BNCollaborationMergeConflictGetDataType(self._handle))

	@property
	def type(self) -> str:
		"""
		String representing the type name of the data, not the same as data_type.
		This is like "typeName" or "tag" depending on what object the conflict represents.

		:return: Type name
		"""
		return core.BNCollaborationMergeConflictGetType(self._handle)

	@property
	def key(self) -> str:
		"""
		Lookup key for the merge conflict, ideally a tree path that contains the name of the conflict
		and all the recursive children leading up to this conflict.

		:return: Key name
		"""
		return core.BNCollaborationMergeConflictGetKey(self._handle)

	def success(self, value: OptionalStringDict) -> bool:
		"""
		Call this when you've resolved the conflict to save the result

		:param value: Resolved value
		:return: True if successful
		"""
		printed = None if value is None else json.dumps(value)
		return core.BNCollaborationMergeConflictSuccess(self._handle, printed)

	def make_child(self, new_data_type, new_type: str, new_key: str, new_path_key: str, new_path_value: object, new_base: OptionalStringDict, new_first: OptionalStringDict, new_second: OptionalStringDict, new_success: Callable[[OptionalStringDict], bool]):
		"""
		Convenience function for making a new merge conflict using the snapshots/storage from this one

		:param new_data_type: Desired dataType of new conflict
		:param new_type: Desired type of new conflict
		:param new_key: Desired key of new conflict
		:param new_path_key: Key to add to path for new conflict
		:param new_path_value: Value for key added to path
		:param new_base: Desired base of new conflict
		:param new_first: Desired first of new conflict
		:param new_second: Desired second of new conflict
		:param new_success: Success function for new conflict
		:return: Created child object
		"""
		callback_type = ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_char_p)
		callback = callback_type(lambda ctxt, value: new_success(value))
		path_value = ctypes.py_object(new_path_value)

		# Both objects are handed to native code as raw pointers. Keep references
		# so neither is collected once this call returns.
		# NOTE(review): entries are never released because no matching "free"
		# notification from the core is visible here; confirm the intended lifetime.
		keepalive = (callback, path_value)
		MergeConflict._child_keepalive.append(keepalive)

		value = core.BNCollaborationMergeConflictMakeChild(self._handle, new_data_type, new_type, new_key, new_path_key, ctypes.cast(path_value, ctypes.c_void_p), new_base, new_first, new_second, callback, None)
		if value is None:
			# No child was created, so nothing references the callback
			MergeConflict._child_keepalive.remove(keepalive)
			return None
		result = MergeConflict(handle=value)
		# The wrapper took its own copy of the handle in __init__
		core.BNCollaborationMergeConflictFree(value)
		return result

	def get_path_item(self, path_key: str) -> Optional[object]:
		"""
		Get item in the merge conflict's path for a given key.

		:param path_key: Key for path item
		:return: Path item, or an None if not found
		"""
		value = core.BNCollaborationMergeConflictGetPathItem(self._handle, path_key)
		if value is None:
			return None
		# NOTE(review): this wraps the raw value returned by the core in a
		# ctypes.py_object rather than recovering the object passed to make_child();
		# confirm what callers expect before changing it.
		return ctypes.py_object(value)
class FormattedConflict:
	"""
	Class taking a merge conflict and producing a formatted text output.
	"""

	def __init__(self, conflict: Optional[MergeConflict] = None, type: Optional[FormattedConflictFormatType] = None, name: Optional[str] = None, base_text: Optional[str] = None, first_text: Optional[str] = None, second_text: Optional[str] = None, base: OptionalStringDict = None, first: OptionalStringDict = None, second: OptionalStringDict = None, handle = None):
		"""
		Structure with information for turning a json conflict into a formatted text representation.
		Populate either base_text/first_text/second_text (type = Text) or base/first/second (type = Code)

		:param conflict: Unformatted merge conflict structure
		:param type: Type of formatted content (Text/Code)
		:param name: Formatted name of conflict
		:param base_text: Text of base version of conflict
		:param first_text: Text of first version of conflict
		:param second_text: Text of second version of conflict
		:param base: Json of base version of conflict
		:param first: Json of first version of conflict
		:param second: Json of second version of conflict
		:param handle: FFI handle for internal use
		"""
		if handle is not None:
			# Wrapping an existing core object: take our own copy of the handle
			self._handle = core.BNCollaborationFormattedConflictCopy(handle)
			return

		# The json values cross the FFI boundary as serialized strings
		base_json = json.dumps(base) if base is not None else None
		first_json = json.dumps(first) if first is not None else None
		second_json = json.dumps(second) if second is not None else None
		self._handle = core.BNCollaborationFormattedConflictCreate(
			conflict._handle, type, name,
			base_text, first_text, second_text,
			base_json, first_json, second_json
		)

	def __del__(self):
		core.BNCollaborationFormattedConflictFree(self._handle)

	@property
	def conflict(self) -> MergeConflict:
		"""
		Unformatted merge conflict structure

		:return: Merge Conflict structure
		"""
		raw = core.BNCollaborationFormattedConflictGetConflict(self._handle)
		assert raw is not None
		wrapped = MergeConflict(handle=raw)
		# MergeConflict copied the handle, so release the one returned by the core
		core.BNCollaborationMergeConflictFree(raw)
		return wrapped

	@property
	def type(self) -> FormattedConflictFormatType:
		"""
		Type of formatted content (Text/Code)

		:return: Type
		"""
		raw_type = core.BNCollaborationFormattedConflictGetType(self._handle)
		return FormattedConflictFormatType(raw_type)

	@property
	def name(self) -> str:
		"""
		Formatted name of conflict

		:return: Name string
		"""
		return core.BNCollaborationFormattedConflictGetName(self._handle)

	@property
	def base_text(self) -> Optional[str]:
		"""
		Text of base version of conflict

		:return: Formatted text, if type = Text, else None
		"""
		return core.BNCollaborationFormattedConflictGetBaseText(self._handle)

	@property
	def first_text(self) -> Optional[str]:
		"""
		Text of first version of conflict

		:return: Formatted text, if type = Text, else None
		"""
		return core.BNCollaborationFormattedConflictGetFirstText(self._handle)

	@property
	def second_text(self) -> Optional[str]:
		"""
		Text of second version of conflict

		:return: Formatted text, if type = Text, else None
		"""
		return core.BNCollaborationFormattedConflictGetSecondText(self._handle)

	@property
	def base(self) -> OptionalStringDict:
		"""
		Json of base version of conflict

		:return: Formatted Json value, if type = Code, else None
		"""
		serialized = core.BNCollaborationFormattedConflictGetBase(self._handle)
		return json.loads(serialized) if serialized is not None else None

	@property
	def first(self) -> OptionalStringDict:
		"""
		Json of first version of conflict

		:return: Formatted Json value, if type = Code, else None
		"""
		serialized = core.BNCollaborationFormattedConflictGetFirst(self._handle)
		return json.loads(serialized) if serialized is not None else None

	@property
	def second(self) -> OptionalStringDict:
		"""
		Json of second version of conflict

		:return: Formatted Json value, if type = Code, else None
		"""
		serialized = core.BNCollaborationFormattedConflictGetSecond(self._handle)
		return json.loads(serialized) if serialized is not None else None
class ConflictSplitter:
	"""
	Helper class that takes one merge conflict and splits it into multiple conflicts
	Eg takes conflicts for View/symbols and splits to one conflict per symbol
	"""

	def __init__(self, handle=None):
		"""
		Wrap an existing core splitter, or register this Python object as a new splitter.

		:param handle: FFI handle for internal use; if None a new splitter backed by this object's callbacks is created
		"""
		# Storage that keeps the results of the most recent _split() alive until the
		# core calls the matching free callbacks. Set first so callbacks can rely on it.
		self._split_keys = None
		self._split_conflicts = None
		if handle is not None:
			self._handle = core.BNCollaborationConflictSplitterCopy(handle)
		else:
			# The callback struct is stored on self so the ctypes function objects
			# are not garbage collected while the core holds pointers to them.
			self._cb = core.BNCollaborationConflictSplitterCallbacks()
			self._cb.context = 0
			self._cb.getName = self._cb.getName.__class__(self._get_name)
			self._cb.canSplit = self._cb.canSplit.__class__(self._can_split)
			self._cb.split = self._cb.split.__class__(self._split)
			self._cb.freeName = self._cb.freeName.__class__(self._free_name)
			self._cb.freeKeyList = self._cb.freeKeyList.__class__(self._free_key_list)
			self._cb.freeConflictList = self._cb.freeConflictList.__class__(self._free_conflict_list)
			self._handle = core.BNCollaborationConflictSplitterCreate(self._cb)

	def _get_name(self, ctxt: ctypes.c_void_p) -> ctypes.c_char_p:
		try:
			return core.BNCollaborationAllocString(core.cstr(self.name))
		except:
			# Not sure why your get_name() would throw but let's handle it anyway
			traceback.print_exc(file=sys.stderr)
			return core.BNCollaborationAllocString(core.cstr(type(self).__name__))

	def _can_split(self, ctxt: ctypes.c_void_p, key: ctypes.c_char_p, conflict: ctypes.POINTER(core.BNCollaborationMergeConflict)) -> bool:
		try:
			py_conflict = MergeConflict(handle=conflict)
			return self.can_split(core.pyNativeStr(key), py_conflict)
		except:
			# Exceptions must not propagate into native code
			traceback.print_exc(file=sys.stderr)
			return False

	def _split(self, ctxt: ctypes.c_void_p, original_key: ctypes.c_char_p, original_conflict: ctypes.POINTER(core.BNCollaborationMergeConflict), result_kvs: ctypes.POINTER(core.BNKeyValueStore), new_keys: ctypes.POINTER(ctypes.POINTER(ctypes.c_char_p)), new_conflicts: ctypes.POINTER(ctypes.POINTER(core.BNCollaborationMergeConflict)), new_count: ctypes.POINTER(ctypes.c_size_t)) -> bool:
		try:
			py_original_conflict = MergeConflict(handle=original_conflict)
			py_result_kvs = binaryninja.KeyValueStore(handle=ctypes.cast(result_kvs, ctypes.POINTER(binaryninja.core.BNKeyValueStore)))
			result = self.split(core.pyNativeStr(original_key), py_original_conflict, py_result_kvs)
			if result is None:
				return False

			count = len(result)
			key_array = (ctypes.c_char_p * count)()
			conflict_array = (ctypes.POINTER(core.BNCollaborationMergeConflict) * count)()
			py_conflicts = []
			# split() returns a dict, so iterate its items to get (key, conflict) pairs
			for i, (key, conflict) in enumerate(result.items()):
				key_array[i] = core.cstr(key)
				conflict_array[i] = conflict._handle
				py_conflicts.append(conflict)

			# Hold the arrays (and the conflict wrappers owning the handles) until
			# the free callbacks run.
			self._split_keys = key_array
			self._split_conflicts = (conflict_array, py_conflicts)

			# Write through the out-pointers with [0]. Assigning `.contents` would only
			# re-point the local ctypes pointer object and leave the caller's value unset.
			# The casts convert each array to the exact pointer type the out-parameter holds.
			new_count[0] = count
			new_keys[0] = ctypes.cast(key_array, type(new_keys)._type_)
			new_conflicts[0] = ctypes.cast(conflict_array, type(new_conflicts)._type_)
			return True
		except:
			traceback.print_exc(file=sys.stderr)
			return False

	def _free_name(self, ctxt: ctypes.c_void_p, name: ctypes.c_char_p):
		core.BNCollaborationFreeString(name)

	def _free_key_list(self, ctxt: ctypes.c_void_p, key_list: ctypes.POINTER(ctypes.c_char_p), count: ctypes.c_size_t):
		# Drop our reference; resetting (rather than `del`) keeps repeated calls harmless
		self._split_keys = None

	def _free_conflict_list(self, ctxt: ctypes.c_void_p, conflict_list: ctypes.POINTER(core.BNCollaborationMergeConflict), count: ctypes.c_size_t):
		self._split_conflicts = None

	@property
	def name(self):
		"""
		Get a friendly name for the splitter

		:return: Name of the splitter
		"""
		return self.get_name()

	def get_name(self) -> str:
		"""
		Get a friendly name for the splitter

		:return: Name of the splitter
		"""
		return type(self).__name__

	@abc.abstractmethod
	def can_split(self, key: str, conflict: MergeConflict) -> bool:
		"""
		Test if the splitter applies to a given conflict (by key).

		:param key: Key of the conflicting field
		:param conflict: Conflict data
		:return: True if this splitter should be used on the conflict
		"""
		raise NotImplementedError("Not implemented")

	@abc.abstractmethod
	def split(self, key: str, conflict: MergeConflict, result: binaryninja.KeyValueStore) -> Optional[Dict[str, MergeConflict]]:
		"""
		Split a field conflict into any number of alternate conflicts.
		Note: Returned conflicts will also be checked for splitting, beware infinite loops!
		If this function raises, it will be treated as returning None

		:param key: Original conflicting field's key
		:param conflict: Original conflict data
		:param result: Kvs structure containing the result of all splits. You should use the original conflict's
		               success() function in most cases unless you specifically want to write a new key to this.
		:return: A collection of conflicts into which the original conflict was split, or None if
		         this splitter cannot handle the conflict
		"""
		raise NotImplementedError("Not implemented")
class CoreConflictSplitter(ConflictSplitter):
	"""
	FFI wrapper for core-constructed splitters
	"""

	def get_name(self) -> str:
		"""
		Ask the core for this splitter's name

		:return: Name of the splitter
		"""
		splitter_name = core.BNCollaborationConflictSplitterGetName(self._handle)
		return splitter_name

	def can_split(self, key: str, conflict: MergeConflict) -> bool:
		"""
		Ask the core whether this splitter applies to the given conflict

		:param key: Key of the conflicting field
		:param conflict: Conflict data
		:return: Value reported by the core
		"""
		applies = core.BNCollaborationConflictSplitterCanSplit(self._handle, key, conflict._handle)
		return applies

	def split(self, key: str, conflict: MergeConflict, result: binaryninja.KeyValueStore) -> Optional[Dict[str, MergeConflict]]:
		"""
		Not available for core-constructed splitters; always raises NotImplementedError
		"""
		raise NotImplementedError("Not implemented")
class SplitterList(MutableSequence):
	"""
	Mutable list of splitters for FFI purposes
	"""

	def __init__(self):
		self._update_rep()

	def _update_rep(self):
		"""Refresh the cached wrappers from the core's current splitter list."""
		count = ctypes.c_size_t()
		value = core.BNCollaborationGetConflictSplitterList(count)
		try:
			self.rep = [CoreConflictSplitter(handle=value[i]) for i in range(count.value)]
		finally:
			# Release the core-allocated list even if wrapping an element raised
			core.BNCollaborationConflictSplitterFreeList(value, count.value)

	def __len__(self):
		# Not super efficient but probably not used in a hotpath
		self._update_rep()
		return len(self.rep)

	def __getitem__(self, item):
		# Not super efficient but probably not used in a hotpath
		self._update_rep()
		return self.rep[item]

	def __delitem__(self, index):
		core.BNCollaborationConflictSplitterListDelete(index)

	def __setitem__(self, index, value: ConflictSplitter):
		"""
		Replace the splitter at ``index``.

		:param index: Integer position of the element to replace (negative values count from the end)
		:param value: Splitter to store at that position
		:raises TypeError: if index is not an integer
		:raises IndexError: if index is out of range
		"""
		if not isinstance(index, int):
			raise TypeError("SplitterList indices must be integers")
		count = len(self)
		if index < 0:
			index += count
		if not 0 <= index < count:
			raise IndexError("SplitterList assignment index out of range")
		# Insert the new element first, then remove the one it displaced. If the
		# insert fails, the existing element is still in place.
		core.BNCollaborationConflictSplitterListInsert(index, value._handle)
		core.BNCollaborationConflictSplitterListDelete(index + 1)

	def insert(self, index: int, value: ConflictSplitter) -> None:
		"""
		Insert a splitter before ``index``.

		:param index: Position passed to the core insert function
		:param value: Splitter to insert
		"""
		core.BNCollaborationConflictSplitterListInsert(index, value._handle)
class ConflictFormatter:
	"""
	Helper class that turns json conflicts into pretty text for the ui to render
	"""

	def __init__(self, handle=None):
		"""
		Wrap an existing core formatter, or register this Python object as a new formatter.

		:param handle: FFI handle for internal use; if None a new formatter backed by this object's callbacks is created
		"""
		# Keeps the result of the most recent _format() alive until the core
		# calls the matching free callback.
		self._format_conflict = None
		if handle is not None:
			self._handle = core.BNCollaborationConflictFormatterCopy(handle)
		else:
			# The callback struct is stored on self so the ctypes function objects
			# are not garbage collected while the core holds pointers to them.
			self._cb = core.BNCollaborationConflictFormatterCallbacks()
			self._cb.context = 0
			self._cb.getName = self._cb.getName.__class__(self._get_name)
			self._cb.canFormat = self._cb.canFormat.__class__(self._can_format)
			self._cb.format = self._cb.format.__class__(self._format)
			self._cb.freeName = self._cb.freeName.__class__(self._free_name)
			self._cb.freeFormattedConflict = self._cb.freeFormattedConflict.__class__(self._free_formatted_conflict)
			self._handle = core.BNCollaborationConflictFormatterCreate(self._cb)

	def _get_name(self, ctxt: ctypes.c_void_p) -> ctypes.c_char_p:
		try:
			return core.BNCollaborationAllocString(core.cstr(self.name))
		except:
			# Fall back to the class name if the user's get_name() raised
			traceback.print_exc(file=sys.stderr)
			return core.BNCollaborationAllocString(core.cstr(type(self).__name__))

	def _can_format(self, ctxt: ctypes.c_void_p, key: ctypes.c_void_p, conflict: ctypes.POINTER(core.BNCollaborationMergeConflict)) -> bool:
		try:
			py_conflict = MergeConflict(handle=conflict)
			return self.can_format(core.pyNativeStr(key), py_conflict)
		except:
			# Exceptions must not propagate into native code
			traceback.print_exc(file=sys.stderr)
			return False

	def _format(self, ctxt: ctypes.c_void_p, key: ctypes.c_char_p, conflict: ctypes.POINTER(core.BNCollaborationMergeConflict), formattedConflict: ctypes.POINTER(ctypes.POINTER(core.BNCollaborationFormattedConflict))) -> bool:
		try:
			py_conflict = MergeConflict(handle=conflict)
			result = self.format(core.pyNativeStr(key), py_conflict)
			if result is None:
				return False
			self._format_conflict = result
			# Write through the out-pointer with [0]. Assigning `.contents` would only
			# re-point the local ctypes pointer object and leave the caller's value unset.
			# The cast converts the handle to the exact pointer type the out-parameter holds.
			formattedConflict[0] = ctypes.cast(result._handle, type(formattedConflict)._type_)
			return True
		except:
			traceback.print_exc(file=sys.stderr)
			return False

	def _free_name(self, ctxt: ctypes.c_void_p, name: ctypes.c_void_p):
		core.BNCollaborationFreeString(name)

	def _free_formatted_conflict(self, ctxt: ctypes.c_void_p, formatted_conflict: ctypes.POINTER(core.BNCollaborationFormattedConflict)):
		# Drop our reference; resetting (rather than `del`) keeps repeated calls harmless
		self._format_conflict = None

	@property
	def name(self):
		"""
		Get a friendly name for the formatter

		:return: Name of the formatter
		"""
		return self.get_name()

	def get_name(self) -> str:
		"""
		Get a friendly name for the formatter

		:return: Name of the formatter
		"""
		return type(self).__name__

	@abc.abstractmethod
	def can_format(self, key: str, conflict: MergeConflict) -> bool:
		"""
		Test if the formatter can format a specific conflict

		:param key: Unique key for the conflict
		:param conflict: Conflict structure
		:return: True if the formatter should try to format this conflict
		"""
		raise NotImplementedError("Not implemented")

	@abc.abstractmethod
	def format(self, key: str, conflict: MergeConflict) -> Optional[FormattedConflict]:
		"""
		Format a conflict into a human-readable string

		:param key: Unique key for the conflict
		:param conflict: Conflict structure
		:return: Formatted conflict structure, if it could be formatted. None otherwise.
		"""
		raise NotImplementedError("Not implemented")
class CoreConflictFormatter(ConflictFormatter):
	"""
	FFI wrapper for conflict formatters
	"""

	def get_name(self) -> str:
		"""
		Ask the core for this formatter's name

		:return: Name of the formatter
		"""
		formatter_name = core.BNCollaborationConflictFormatterGetName(self._handle)
		return formatter_name

	def can_format(self, key: str, conflict: MergeConflict) -> bool:
		"""
		Ask the core whether this formatter can format the given conflict

		:param key: Unique key for the conflict
		:param conflict: Conflict structure
		:return: Value reported by the core
		"""
		supported = core.BNCollaborationConflictFormatterCanFormat(self._handle, key, conflict._handle)
		return supported

	def format(self, key: str, conflict: MergeConflict) -> Optional[FormattedConflict]:
		"""
		Not available for core-constructed formatters; always raises NotImplementedError
		"""
		raise NotImplementedError("Not implemented")
class FormatterList(MutableSequence):
	"""
	Mutable list of formatters for FFI purposes
	"""

	def __init__(self):
		self._update_rep()

	def _update_rep(self):
		"""Refresh the cached wrappers from the core's current formatter list."""
		count = ctypes.c_size_t()
		value = core.BNCollaborationGetConflictFormatterList(count)
		try:
			self.rep = [CoreConflictFormatter(handle=value[i]) for i in range(count.value)]
		finally:
			# Release the core-allocated list even if wrapping an element raised
			core.BNCollaborationConflictFormatterFreeList(value, count.value)

	def __len__(self):
		# Not super efficient but probably not used in a hotpath
		self._update_rep()
		return len(self.rep)

	def __getitem__(self, item):
		# Not super efficient but probably not used in a hotpath
		self._update_rep()
		return self.rep[item]

	def __delitem__(self, index):
		core.BNCollaborationConflictFormatterListDelete(index)

	def __setitem__(self, index, value: ConflictFormatter):
		"""
		Replace the formatter at ``index``.

		:param index: Integer position of the element to replace (negative values count from the end)
		:param value: Formatter to store at that position
		:raises TypeError: if index is not an integer
		:raises IndexError: if index is out of range
		"""
		if not isinstance(index, int):
			raise TypeError("FormatterList indices must be integers")
		count = len(self)
		if index < 0:
			index += count
		if not 0 <= index < count:
			raise IndexError("FormatterList assignment index out of range")
		# Insert the new element first, then remove the one it displaced. If the
		# insert fails, the existing element is still in place.
		core.BNCollaborationConflictFormatterListInsert(index, value._handle)
		core.BNCollaborationConflictFormatterListDelete(index + 1)

	def insert(self, index: int, value: ConflictFormatter) -> None:
		"""
		Insert a formatter before ``index``.

		:param index: Position passed to the core insert function
		:param value: Formatter to insert
		"""
		core.BNCollaborationConflictFormatterListInsert(index, value._handle)
class ConflictHandler:
	"""
	Helper class that resolves conflicts
	"""

	def __init__(self, handle=None):
		"""
		Wrap an existing core handler, or register this Python object as a new handler.

		:param handle: FFI handle for internal use; if None a new handler backed by this object's callback is created
		"""
		if handle is not None:
			self._handle = core.BNCollaborationConflictHandlerCopy(handle)
		else:
			# The callback struct is stored on self so the ctypes function object
			# is not garbage collected while the core holds a pointer to it.
			self._cb = core.BNCollaborationConflictHandlerCallbacks()
			self._cb.context = 0
			self._cb.handle = self._cb.handle.__class__(self._handle_conflicts)
			self._handle = core.BNCollaborationConflictHandlerCreate(self._cb)

	# Named distinctly from the `_handle` attribute, which stores the FFI handle
	# and would otherwise shadow a method of the same name on every instance.
	def _handle_conflicts(self, ctxt: ctypes.c_void_p, keys: ctypes.POINTER(ctypes.c_char_p), conflicts: ctypes.POINTER(ctypes.POINTER(core.BNCollaborationMergeConflict)), count: int) -> bool:
		try:
			py_conflicts = {
				core.pyNativeStr(keys[i]): MergeConflict(handle=conflicts[i])
				for i in range(count)
			}
			return self.handle(py_conflicts)
		except:
			# Exceptions must not propagate into native code
			traceback.print_exc(file=sys.stderr)
			return False

	@abc.abstractmethod
	def handle(self, conflicts: Dict[str, MergeConflict]) -> bool:
		"""
		Handle any merge conflicts by calling their success() function with a merged value

		:param conflicts: Map of conflict id to conflict structure
		:return: True if all conflicts were successfully merged
		"""
		raise NotImplementedError("Not implemented")
def least_common_ancestor(first: binaryninja.Snapshot, second: binaryninja.Snapshot) -> Optional[binaryninja.Snapshot]:
	"""
	Get the least common ancestor snapshot between two snapshots.
	The two snapshots must be in the same database.

	:param first: First snapshot for ancestor searching
	:param second: Second snapshot for ancestor searching
	:return: Ancestor snapshot, or None if the snapshots are on disjoint trees
	"""
	# Re-type the API handles as the collaboration core's snapshot pointer type
	first_handle = ctypes.cast(first.handle, ctypes.POINTER(core.BNSnapshot))
	second_handle = ctypes.cast(second.handle, ctypes.POINTER(core.BNSnapshot))
	ancestor = core.BNCollaborationLeastCommonAncestor(first_handle, second_handle)
	if ancestor is not None:
		return binaryninja.Snapshot(handle=ctypes.cast(ancestor, ctypes.POINTER(binaryninja.core.BNSnapshot)))
	return None
def merge_snapshots(first: binaryninja.Snapshot, second: binaryninja.Snapshot, conflict_handler: ConflictHandler, progress: Callable[[int, int], bool]) -> Optional[binaryninja.Snapshot]:
	"""
	Merge a pair of snapshots and create a new snapshot with the result.

	:param first: First snapshot to merge
	:param second: Second snapshot to merge
	:param conflict_handler: Function to call when merge conflicts are encountered
	:param progress: Function to call for progress updates and cancelling
	:return: Result snapshot, or None if there was an error
	"""
	# Re-type the API handles as the collaboration core's snapshot pointer type
	first_handle = ctypes.cast(first.handle, ctypes.POINTER(core.BNSnapshot))
	second_handle = ctypes.cast(second.handle, ctypes.POINTER(core.BNSnapshot))
	handler_handle = conflict_handler._handle
	progress_callback = util.wrap_progress(progress)
	merged = core.BNCollaborationMergeSnapshots(first_handle, second_handle, handler_handle, progress_callback, None)
	if merged is not None:
		return binaryninja.Snapshot(handle=ctypes.cast(merged, ctypes.POINTER(binaryninja.core.BNSnapshot)))
	return None
def merge_kvs(database: binaryninja.Database, base: binaryninja.Snapshot, first: binaryninja.Snapshot, second: binaryninja.Snapshot, conflict_handler: ConflictHandler, progress: Callable[[int, int], bool]) -> Optional[binaryninja.KeyValueStore]:
	"""
	Merge the underlying KVS data structure in two snapshots.

	:param database: Owning database
	:param base: Ancestor snapshot's data
	:param first: First snapshot's data
	:param second: Second snapshot's data
	:param conflict_handler: Function to call when merge conflicts are encountered
	:param progress: Function to call for progress updates and cancelling
	:return: Merged KVS structure, or None if there was an error
	"""
	# Re-type the API handles as the collaboration core's pointer types
	snapshot_ptr = ctypes.POINTER(core.BNSnapshot)
	database_handle = ctypes.cast(database.handle, ctypes.POINTER(core.BNDatabase))
	base_handle = ctypes.cast(base.handle, snapshot_ptr)
	first_handle = ctypes.cast(first.handle, snapshot_ptr)
	second_handle = ctypes.cast(second.handle, snapshot_ptr)
	handler_handle = conflict_handler._handle
	progress_callback = util.wrap_progress(progress)
	merged = core.BNCollaborationMergeKvs(database_handle, base_handle, first_handle, second_handle, handler_handle, progress_callback, None)
	if merged is not None:
		return binaryninja.KeyValueStore(handle=ctypes.cast(merged, ctypes.POINTER(binaryninja.core.BNKeyValueStore)))
	return None
def diff_kvs(base: binaryninja.KeyValueStore, target: binaryninja.KeyValueStore) -> Dict[str, List[str]]:
	"""
	Calculate the difference between two KVS structures

	:param base: First KVS structure
	:param target: Second KVS structure
	:return: Map: {"added": [added fields], "modified": [modified fields], "removed": [removed fields]}
	:raises RuntimeError: if the core reports that the diff failed
	"""
	# Out-parameters filled in by the core
	diff_keys = ctypes.POINTER(ctypes.c_char_p)()
	diff_values = ctypes.POINTER(ctypes.POINTER(ctypes.c_char_p))()
	diff_field_counts = ctypes.POINTER(ctypes.c_size_t)()
	diff_count = ctypes.c_size_t()
	value = core.BNCollaborationDiffKvs(ctypes.cast(base.handle, ctypes.POINTER(core.BNKeyValueStore)), ctypes.cast(target.handle, ctypes.POINTER(core.BNKeyValueStore)), diff_keys, diff_values, diff_field_counts, diff_count)
	if not value:
		raise RuntimeError(util._last_error())

	try:
		result = {}
		for i in range(diff_count.value):
			key = core.pyNativeStr(diff_keys[i])
			result[key] = [core.pyNativeStr(diff_values[i][j]) for j in range(diff_field_counts[i])]
		return result
	finally:
		# Release the core-allocated lists even if decoding raised. NULL pointers
		# are skipped because `.contents` on a NULL pointer raises ValueError.
		if diff_keys:
			core.BNCollaborationFreeStringList(diff_keys.contents, diff_count.value)
		if diff_values and diff_field_counts:
			core.BNCollaborationFreeStringListList(diff_values.contents, diff_field_counts.contents, diff_count.value)