import ctypes
import dataclasses
import uuid
from typing import List, Optional, Union
import binaryninja
from binaryninja import BinaryView, Function, BasicBlock, Architecture, Platform, Type, Symbol, LowLevelILInstruction, LowLevelILFunction, DataBuffer, Project, ProjectFile
from binaryninja._binaryninjacore import BNFreeString, BNAllocString, BNType
from . import _warpcore as warpcore
from .warp_enums import WARPContainerSearchItemKind, WARPProcessorIncludedData, WARPProcessorIncludedFunctions
[docs]
class WarpUUID:
    """Holds a private ``warpcore.BNWARPUUID`` built from one of several UUID representations."""

    def __init__(self, _uuid: Union[warpcore.BNWARPUUID, str, uuid.UUID]):
        """
        Store ``_uuid`` as a ``warpcore.BNWARPUUID``.

        :param _uuid: a UUID string, a :class:`uuid.UUID`, or an existing
            ``BNWARPUUID``. Strings are parsed with :class:`uuid.UUID`; an
            existing ``BNWARPUUID`` is copied rather than shared. Any other
            value is stored unchanged.
        """
        value = uuid.UUID(_uuid) if isinstance(_uuid, str) else _uuid
        if isinstance(value, uuid.UUID):
            value = self._from_raw(value.bytes)
        elif isinstance(value, warpcore.BNWARPUUID):
            # We must create a copy!
            value = self._from_raw(value.uuid)
        self._uuid = value

    @staticmethod
    def _from_raw(raw):
        """Build a fresh ``BNWARPUUID`` whose 16-byte payload is copied from ``raw``."""
        fresh = warpcore.BNWARPUUID()
        fresh.uuid = (ctypes.c_ubyte * 16).from_buffer_copy(raw)
        return fresh

    def to_string(self) -> str:
        """Return the result of ``BNWARPUUIDGetString`` for this UUID."""
        return warpcore.BNWARPUUIDGetString(self._uuid)

    def __str__(self):
        return self.to_string()

    def __repr__(self):
        return f"<WarpUUID '{self!s}'>"

    def __hash__(self):
        # Hash the raw UUID bytes.
        raw = bytes(self._uuid.uuid)
        return hash(raw)

    def __eq__(self, other):
        # Anything that is not a WarpUUID compares unequal; otherwise defer to the core.
        return isinstance(other, WarpUUID) and warpcore.BNWARPUUIDEqual(self._uuid, other._uuid)

    @property
    def uuid(self):
        """The underlying ``BNWARPUUID`` object held by this wrapper."""
        return self._uuid
[docs]
class Source(WarpUUID):
    """:class:`WarpUUID` subclass whose repr is labelled ``Source``."""

    def __repr__(self):
        return f"<Source '{self!s}'>"
[docs]
class BasicBlockGUID(WarpUUID):
    """:class:`WarpUUID` subclass whose repr is labelled ``BasicBlockGUID``."""

    def __repr__(self):
        return f"<BasicBlockGUID '{self!s}'>"
[docs]
class FunctionGUID(WarpUUID):
    """:class:`WarpUUID` subclass whose repr is labelled ``FunctionGUID``."""

    def __repr__(self):
        return f"<FunctionGUID '{self!s}'>"
[docs]
class ConstraintGUID(WarpUUID):
    """:class:`WarpUUID` subclass whose repr is labelled ``ConstraintGUID``."""

    def __repr__(self):
        return f"<ConstraintGUID '{self!s}'>"
[docs]
class TypeGUID(WarpUUID):
    """:class:`WarpUUID` subclass whose repr is labelled ``TypeGUID``."""

    def __repr__(self):
        return f"<TypeGUID '{self!s}'>"
[docs]
class WarpType:
    """Wrapper around a ``warpcore.BNWARPType`` handle."""

    def __init__(self, handle: warpcore.BNWARPType):
        """Keep ``handle``; ``__del__`` later passes it to ``BNWARPFreeTypeReference``."""
        self.handle = handle

    def __del__(self):
        if self.handle is None:
            return
        warpcore.BNWARPFreeTypeReference(self.handle)

    def __repr__(self):
        return f"<WarpType name: '{self.name}' confidence: '{self.confidence}'>"

    @property
    def name(self) -> str:
        """Value returned by ``BNWARPTypeGetName`` for this handle."""
        return warpcore.BNWARPTypeGetName(self.handle)

    @property
    def confidence(self) -> int:
        """Value returned by ``BNWARPTypeGetConfidence`` for this handle."""
        return warpcore.BNWARPTypeGetConfidence(self.handle)

    def analysis_type(self, arch: Optional[Architecture] = None) -> Type:
        """
        Convert this WARP type into an analysis ``Type``.

        :param arch: optional architecture; its handle is forwarded to
            ``BNWARPTypeGetAnalysisType``, or ``None`` when omitted.
        :return: the ``Type`` created from the handle the core returns.
        """
        arch_handle = None if arch is None else arch.handle
        type_handle = warpcore.BNWARPTypeGetAnalysisType(arch_handle, self.handle)
        return Type.create(handle=type_handle)
[docs]
@dataclasses.dataclass
class WarpConstraint:
    """A constraint GUID paired with an optional offset."""

    # GUID identifying the constraint.
    guid: ConstraintGUID
    # Offset of the constraint; ``None`` when the core reports -1.
    offset: Optional[int]

    def __str__(self):
        return repr(self)

    def __repr__(self):
        if self.offset is not None:
            return f"<WarpConstraint '{self.guid}': {self.offset:#x}>"
        return f"<WarpConstraint '{self.guid}'>"

    @staticmethod
    def from_api(constraint: warpcore.BNWARPConstraint) -> 'WarpConstraint':
        """
        Build a :class:`WarpConstraint` from the core structure.

        An offset of ``-1`` in the core structure is mapped to ``None``.
        """
        guid = ConstraintGUID(constraint.guid)
        if constraint.offset != -1:
            return WarpConstraint(guid=guid, offset=constraint.offset)
        return WarpConstraint(guid=guid, offset=None)
[docs]
class WarpTarget:
    """Wrapper around a ``warpcore.BNWARPTarget`` handle."""

    def __init__(self, handle: Union[warpcore.BNWARPTarget, Platform]):
        """
        :param handle: an existing target handle, or a ``Platform`` that is
            converted with ``BNWARPGetTarget``.
        """
        from_platform = isinstance(handle, Platform)
        self.handle = warpcore.BNWARPGetTarget(handle.handle) if from_platform else handle

    def __del__(self):
        if self.handle is None:
            return
        warpcore.BNWARPFreeTargetReference(self.handle)
[docs]
class WarpFunction:
    """Wrapper around a ``warpcore.BNWARPFunction`` handle."""

    def __init__(self, handle: Union[warpcore.BNWARPFunction, Function]):
        """
        :param handle: an existing WARP function handle, or an analysis
            ``Function`` that is converted with ``BNWARPGetFunction``.
        """
        if isinstance(handle, Function):
            self.handle = warpcore.BNWARPGetFunction(handle.handle)
        else:
            self.handle = handle

    def __del__(self):
        if self.handle is not None:
            warpcore.BNWARPFreeFunctionReference(self.handle)

    def __repr__(self):
        return f"<WarpFunction '{self.name}': '{self.guid}'>"

    @property
    def guid(self) -> FunctionGUID:
        """GUID reported by ``BNWARPFunctionGetGUID``, wrapped as a :class:`FunctionGUID`."""
        return FunctionGUID(warpcore.BNWARPFunctionGetGUID(self.handle))

    @property
    def name(self) -> str:
        """Symbol name reported by ``BNWARPFunctionGetSymbolName``."""
        return warpcore.BNWARPFunctionGetSymbolName(self.handle)

    def get_symbol(self, function: Function) -> Symbol:
        """
        Return the ``Symbol`` the core produces for this WARP function.

        :param function: analysis function whose handle is passed to
            ``BNWARPFunctionGetSymbol`` alongside this handle.
        """
        symbol_handle = warpcore.BNWARPFunctionGetSymbol(self.handle, function.handle)
        return Symbol(symbol_handle)

    @property
    def type(self) -> Optional[WarpType]:
        """The function's :class:`WarpType`, or ``None`` when the core returns no type handle."""
        type_handle = warpcore.BNWARPFunctionGetType(self.handle)
        if not type_handle:
            return None
        return WarpType(type_handle)

    @property
    def constraints(self) -> List[WarpConstraint]:
        """Constraints of this function; empty when the core returns no list."""
        count = ctypes.c_size_t()
        constraints = warpcore.BNWARPFunctionGetConstraints(self.handle, count)
        if not constraints:
            return []
        try:
            return [WarpConstraint.from_api(constraints[i]) for i in range(count.value)]
        finally:
            # Free the core list even if converting an entry raises.
            warpcore.BNWARPFreeConstraintList(constraints, count.value)

    @property
    def comments(self) -> List['WarpFunctionComment']:
        """
        Comments of this function; empty when the core returns no list.

        The return annotation is a string forward reference because
        ``WarpFunctionComment`` is not defined before this class, so the
        name is only needed when the property is actually read.
        """
        count = ctypes.c_size_t()
        comments = warpcore.BNWARPFunctionGetComments(self.handle, count)
        if not comments:
            return []
        try:
            return [WarpFunctionComment.from_api(comments[i]) for i in range(count.value)]
        finally:
            # Free the core list even if converting an entry raises.
            warpcore.BNWARPFreeFunctionCommentList(comments, count.value)

    @staticmethod
    def get_matched(function: Function) -> Optional['WarpFunction']:
        """Return the WARP function matched to ``function``, or ``None`` when the core returns no handle."""
        handle = warpcore.BNWARPGetMatchedFunction(function.handle)
        if not handle:
            return None
        return WarpFunction(handle)

    def apply(self, function: Function):
        """Call ``BNWARPFunctionApply`` with this WARP function and ``function``."""
        warpcore.BNWARPFunctionApply(self.handle, function.handle)

    @staticmethod
    def remove_matched(function: Function):
        """Call ``BNWARPFunctionApply`` with no WARP function (``None``) for ``function``."""
        warpcore.BNWARPFunctionApply(None, function.handle)
[docs]
class WarpContainerSearchQuery:
    """Wrapper around a container search query handle created by ``BNWARPNewContainerSearchQuery``."""

    def __init__(self, query: str, offset: Optional[int] = None, limit: Optional[int] = None, source: Optional[Source] = None, source_tags: Optional[List[str]] = None):
        """
        Build the core query object.

        :param query: query string passed to the core.
        :param offset: optional offset; passed by reference when given.
        :param limit: optional limit; passed by reference when given.
        :param source: optional source whose UUID is passed by reference.
        :param source_tags: optional tags, UTF-8 encoded into a ``char*`` array.
        """
        self.query = query
        self.source = source
        self.offset = offset
        self.limit = limit

        # The ctypes values are stored on the instance only when the argument is given.
        if offset is None:
            offset_ref = None
        else:
            self._c_offset = ctypes.c_size_t(offset)
            offset_ref = ctypes.byref(self._c_offset)

        if limit is None:
            limit_ref = None
        else:
            self._c_limit = ctypes.c_size_t(limit)
            limit_ref = ctypes.byref(self._c_limit)

        if source is None:
            source_ref = None
        else:
            self._c_source = source.uuid
            source_ref = ctypes.byref(self._c_source)

        tag_count = 0
        tag_array = None
        if source_tags is not None:
            raw_tags = (ctypes.c_char_p * len(source_tags))()
            tag_count = len(source_tags)
            for idx in range(tag_count):
                raw_tags[idx] = source_tags[idx].encode('utf-8')
            tag_array = ctypes.cast(raw_tags, ctypes.POINTER(ctypes.c_char_p))

        self.handle = warpcore.BNWARPNewContainerSearchQuery(query, offset_ref, limit_ref, source_ref, tag_array, tag_count)

    def __del__(self):
        if self.handle is None:
            return
        warpcore.BNWARPFreeContainerSearchQueryReference(self.handle)

    def __repr__(self):
        # TODO: Display offset and limit in a pythonic way.
        if self.source is not None:
            return f"<WarpContainerSearchQuery '{self.query}': '{self.source}'>"
        return f"<WarpContainerSearchQuery '{self.query}'>"
[docs]
class WarpContainerSearchItem:
    """Wrapper around a ``warpcore.BNWARPContainerSearchItem`` handle."""

    def __init__(self, handle: warpcore.BNWARPContainerSearchItem):
        """Keep ``handle``; ``__del__`` later passes it to ``BNWARPFreeContainerSearchItemReference``."""
        self.handle = handle

    def __del__(self):
        if self.handle is None:
            return
        warpcore.BNWARPFreeContainerSearchItemReference(self.handle)

    @property
    def kind(self) -> WARPContainerSearchItemKind:
        """Item kind reported by the core, converted to :class:`WARPContainerSearchItemKind`."""
        raw_kind = warpcore.BNWARPContainerSearchItemGetKind(self.handle)
        return WARPContainerSearchItemKind(raw_kind)

    @property
    def source(self) -> Source:
        """Source UUID reported by the core, wrapped as :class:`Source`."""
        raw_source = warpcore.BNWARPContainerSearchItemGetSource(self.handle)
        return Source(raw_source)

    @property
    def name(self) -> str:
        """Name reported by ``BNWARPContainerSearchItemGetName``."""
        return warpcore.BNWARPContainerSearchItemGetName(self.handle)

    @property
    def type(self) -> Optional[WarpType]:
        """The item's :class:`WarpType`, or ``None`` when the core returns no type handle."""
        type_handle = warpcore.BNWARPContainerSearchItemGetType(self.handle)
        return WarpType(type_handle) if type_handle else None

    @property
    def function(self) -> Optional[WarpFunction]:
        """The item's :class:`WarpFunction`, or ``None`` when the core returns no function handle."""
        function_handle = warpcore.BNWARPContainerSearchItemGetFunction(self.handle)
        return WarpFunction(function_handle) if function_handle else None

    def __repr__(self):
        return f"<WarpContainerSearchItem '{self.name}': '{self.source}'>"
[docs]
class WarpContainerResponse:
    """Result of a container search: the returned items plus offset and total."""

    def __init__(self, items: List[WarpContainerSearchItem], offset: int, total: int):
        """
        :param items: search items contained in this response.
        :param offset: offset value reported with the response.
        :param total: total value reported with the response.
        """
        self.items = items
        self.offset = offset
        self.total = total

    def __iter__(self):
        return iter(self.items)

    def __len__(self):
        return len(self.items)

    def __repr__(self):
        return f"<WarpContainerResponse items={len(self.items)} offset={self.offset} total={self.total}>"

    @staticmethod
    def from_api(response: warpcore.BNWARPContainerSearchResponse) -> 'WarpContainerResponse':
        """
        Convert a core search response into a :class:`WarpContainerResponse`.

        Each item gets a new reference before being wrapped, and the core
        response is always passed to ``BNWARPFreeContainerSearchResponse``.
        """
        try:
            wrapped = [
                WarpContainerSearchItem(warpcore.BNWARPNewContainerSearchItemReference(response.items[idx]))
                for idx in range(response.count)
            ]
            return WarpContainerResponse(items=wrapped, offset=response.offset, total=response.total)
        finally:
            warpcore.BNWARPFreeContainerSearchResponse(response)
class _WarpContainerMetaclass(type):
    """Metaclass that makes the ``WarpContainer`` class itself iterable and indexable by name."""

    def __iter__(self):
        """Yield a ``WarpContainer`` for every container returned by ``BNWARPGetContainers``."""
        binaryninja._init_plugins()
        total = ctypes.c_ulonglong()
        raw_list = warpcore.BNWARPGetContainers(total)
        try:
            index = 0
            while index < total.value:
                # Each yielded wrapper holds its own new reference.
                yield WarpContainer(warpcore.BNWARPNewContainerReference(raw_list[index]))
                index += 1
        finally:
            warpcore.BNWARPFreeContainerList(raw_list, total.value)

    def __getitem__(self, value):
        """
        Return the container whose name equals ``str(value)``.

        :raises KeyError: when no container has that name.
        """
        binaryninja._init_plugins()
        total = ctypes.c_ulonglong()
        raw_list = warpcore.BNWARPGetContainers(total)
        try:
            for index in range(total.value):
                candidate = WarpContainer(warpcore.BNWARPNewContainerReference(raw_list[index]))
                if candidate.name != str(value):
                    continue
                return candidate
            raise KeyError(f"'{value}' is not a valid container name")
        finally:
            warpcore.BNWARPFreeContainerList(raw_list, total.value)
[docs]
class WarpContainer(metaclass=_WarpContainerMetaclass):
    """Wrapper around a ``warpcore.BNWARPContainer`` handle."""

    def __init__(self, handle: warpcore.BNWARPContainer):
        """Keep ``handle``; ``__del__`` later passes it to ``BNWARPFreeContainerReference``."""
        self.handle = handle

    def __del__(self):
        if self.handle is not None:
            warpcore.BNWARPFreeContainerReference(self.handle)

    def __repr__(self):
        return f"<WarpContainer '{self.name}'>"

    @staticmethod
    def _collect_sources(sources, count) -> List[Source]:
        """Wrap ``count`` entries of a core UUID list as :class:`Source` and free the list."""
        try:
            return [Source(sources[i]) for i in range(count)]
        finally:
            # Free the core list even if wrapping an entry raises.
            warpcore.BNWARPFreeUUIDList(sources, count)

    @staticmethod
    def all() -> List['WarpContainer']:
        """Return every container reported by ``BNWARPGetContainers``; empty when none are returned."""
        count = ctypes.c_size_t()
        containers = warpcore.BNWARPGetContainers(count)
        if not containers:
            return []
        try:
            return [WarpContainer(warpcore.BNWARPNewContainerReference(containers[i])) for i in range(count.value)]
        finally:
            warpcore.BNWARPFreeContainerList(containers, count.value)

    @staticmethod
    def add(name: str) -> 'WarpContainer':
        """
        Add a container with ``name``.

        :raises ValueError: when the core returns no container handle.
        """
        container = warpcore.BNWARPAddContainer(name)
        # Falsiness covers both ``None`` and a NULL ctypes pointer, matching
        # the null checks used by the other wrappers in this module.
        if not container:
            raise ValueError(f"Failed to add container: {name}")
        return WarpContainer(container)

    @staticmethod
    def by_name(name: str) -> Optional['WarpContainer']:
        """Return the first container whose name equals ``name``, or ``None``."""
        for container in WarpContainer:
            if container.name == name:
                return container
        return None

    @property
    def name(self) -> str:
        """Name reported by ``BNWARPContainerGetName``."""
        return warpcore.BNWARPContainerGetName(self.handle)

    @property
    def sources(self) -> List[Source]:
        """Sources reported by ``BNWARPContainerGetSources``; empty when the core returns no list."""
        count = ctypes.c_size_t()
        sources = warpcore.BNWARPContainerGetSources(self.handle, count)
        if not sources:
            return []
        return self._collect_sources(sources, count.value)

    def add_source(self, source_path: str) -> Optional[Source]:
        """
        Add a source for ``source_path``.

        :return: the new :class:`Source`, or ``None`` when the core call reports failure.
        """
        source = warpcore.BNWARPUUID()
        if not warpcore.BNWARPContainerAddSource(self.handle, source_path, source):
            return None
        return Source(source)

    def commit_source(self, source: Source) -> bool:
        """Return the result of ``BNWARPContainerCommitSource`` for ``source``."""
        return warpcore.BNWARPContainerCommitSource(self.handle, source.uuid)

    def is_source_uncommitted(self, source: Source) -> bool:
        """Return the result of ``BNWARPContainerIsSourceUncommitted`` for ``source``."""
        # Previously this queried the "writable" state, duplicating is_source_writable.
        return warpcore.BNWARPContainerIsSourceUncommitted(self.handle, source.uuid)

    def is_source_writable(self, source: Source) -> bool:
        """Return the result of ``BNWARPContainerIsSourceWritable`` for ``source``."""
        return warpcore.BNWARPContainerIsSourceWritable(self.handle, source.uuid)

    def get_source_path(self, source: Source) -> Optional[str]:
        """Return the result of ``BNWARPContainerGetSourcePath`` for ``source``."""
        return warpcore.BNWARPContainerGetSourcePath(self.handle, source.uuid)

    def add_functions(self, target: WarpTarget, source: Source, functions: List[WarpFunction]) -> bool:
        """
        Add ``functions`` to ``source`` for ``target``.

        :param functions: objects whose ``handle`` is a ``BNWARPFunction``
            pointer (the array built here only holds that pointer type).
        :return: the result of ``BNWARPContainerAddFunctions``.
        """
        count = len(functions)
        core_funcs = (ctypes.POINTER(warpcore.BNWARPFunction) * count)()
        for i, function in enumerate(functions):
            core_funcs[i] = function.handle
        return warpcore.BNWARPContainerAddFunctions(self.handle, target.handle, source.uuid, core_funcs, count)

    def add_types(self, source: Source, types: List[WarpType]) -> bool:
        """Add ``types`` to ``source``; returns the result of ``BNWARPContainerAddTypes``."""
        count = len(types)
        core_types = (ctypes.POINTER(warpcore.BNWARPType) * count)()
        for i, ty in enumerate(types):
            core_types[i] = ty.handle
        return warpcore.BNWARPContainerAddTypes(self.handle, source.uuid, core_types, count)

    def remove_functions(self, target: WarpTarget, source: Source, functions: List[WarpFunction]) -> bool:
        """
        Remove ``functions`` from ``source`` for ``target``.

        :param functions: objects whose ``handle`` is a ``BNWARPFunction`` pointer.
        :return: the result of ``BNWARPContainerRemoveFunctions``.
        """
        count = len(functions)
        core_funcs = (ctypes.POINTER(warpcore.BNWARPFunction) * count)()
        for i, function in enumerate(functions):
            core_funcs[i] = function.handle
        return warpcore.BNWARPContainerRemoveFunctions(self.handle, target.handle, source.uuid, core_funcs, count)

    def remove_types(self, source: Source, guids: List[TypeGUID]) -> bool:
        """Remove the types identified by ``guids`` from ``source``; returns the result of ``BNWARPContainerRemoveTypes``."""
        count = len(guids)
        # GUIDs are passed as a contiguous array of structs (as fetch_functions
        # does); a pointer array cannot be assigned struct values.
        core_guids = (warpcore.BNWARPTypeGUID * count)()
        for i, guid in enumerate(guids):
            core_guids[i] = guid.uuid
        return warpcore.BNWARPContainerRemoveTypes(self.handle, source.uuid, core_guids, count)

    def fetch_functions(self, target: WarpTarget, guids: List[FunctionGUID], source_tags: Optional[List[str]] = None, constraints: Optional[List[ConstraintGUID]] = None):
        """
        Call ``BNWARPContainerFetchFunctions`` for the given function GUIDs.

        :param target: target whose handle is passed to the core.
        :param guids: function GUIDs to fetch.
        :param source_tags: optional tags, UTF-8 encoded; treated as empty when omitted.
        :param constraints: optional constraint GUIDs; treated as empty when omitted.
        """
        count = len(guids)
        core_guids = (warpcore.BNWARPFunctionGUID * count)()
        for i, guid in enumerate(guids):
            core_guids[i] = guid.uuid
        if constraints is None:
            constraints = []
        constraints_count = len(constraints)
        core_constraints = (warpcore.BNWARPConstraintGUID * constraints_count)()
        for i, constraint in enumerate(constraints):
            core_constraints[i] = constraint.uuid
        if source_tags is None:
            source_tags = []
        source_tags_len = len(source_tags)
        source_tags_ptr = (ctypes.c_char_p * source_tags_len)()
        for i, tag in enumerate(source_tags):
            source_tags_ptr[i] = tag.encode('utf-8')
        source_tags_array_ptr = ctypes.cast(source_tags_ptr, ctypes.POINTER(ctypes.c_char_p))
        warpcore.BNWARPContainerFetchFunctions(self.handle, target.handle, source_tags_array_ptr, source_tags_len, core_guids, count, core_constraints, constraints_count)

    def get_sources_with_function_guid(self, target: WarpTarget, guid: FunctionGUID) -> List[Source]:
        """Sources the core reports for function ``guid`` under ``target``; empty when no list is returned."""
        count = ctypes.c_size_t()
        sources = warpcore.BNWARPContainerGetSourcesWithFunctionGUID(self.handle, target.handle, guid.uuid, count)
        if not sources:
            return []
        return self._collect_sources(sources, count.value)

    def get_sources_with_type_guid(self, guid: TypeGUID) -> List[Source]:
        """Sources the core reports for type ``guid``; empty when no list is returned."""
        count = ctypes.c_size_t()
        sources = warpcore.BNWARPContainerGetSourcesWithTypeGUID(self.handle, guid.uuid, count)
        if not sources:
            return []
        return self._collect_sources(sources, count.value)

    def get_functions_with_guid(self, target: WarpTarget, source: Source, guid: FunctionGUID) -> List[WarpFunction]:
        """WARP functions the core reports for ``guid`` in ``source`` under ``target``; empty when no list is returned."""
        count = ctypes.c_size_t()
        funcs = warpcore.BNWARPContainerGetFunctionsWithGUID(self.handle, target.handle, source.uuid, guid.uuid, count)
        if not funcs:
            return []
        try:
            return [WarpFunction(warpcore.BNWARPNewFunctionReference(funcs[i])) for i in range(count.value)]
        finally:
            warpcore.BNWARPFreeFunctionList(funcs, count.value)

    def get_type_with_guid(self, source: Source, guid: TypeGUID) -> Optional[WarpType]:
        """The :class:`WarpType` for ``guid`` in ``source``, or ``None`` when the core returns no handle."""
        ty = warpcore.BNWARPContainerGetTypeWithGUID(self.handle, source.uuid, guid.uuid)
        if not ty:
            return None
        return WarpType(ty)

    def get_type_guids_with_name(self, source: Source, name: str) -> List[TypeGUID]:
        """Type GUIDs the core reports for ``name`` in ``source``; empty when no list is returned."""
        count = ctypes.c_size_t()
        guids = warpcore.BNWARPContainerGetTypeGUIDsWithName(self.handle, source.uuid, name, count)
        if not guids:
            return []
        try:
            return [TypeGUID(guids[i]) for i in range(count.value)]
        finally:
            warpcore.BNWARPFreeUUIDList(guids, count.value)

    def search(self, query: WarpContainerSearchQuery) -> Optional[WarpContainerResponse]:
        """Run ``query`` against this container; ``None`` when the core returns no response."""
        response = warpcore.BNWARPContainerSearch(self.handle, query.handle)
        if not response:
            return None
        return WarpContainerResponse.from_api(response.contents)
[docs]
class WarpChunk:
    """Wrapper around a ``warpcore.BNWARPChunk`` handle."""

    def __init__(self, handle: warpcore.BNWARPChunk):
        """Keep ``handle``; ``__del__`` later passes it to ``BNWARPFreeChunkReference``."""
        self.handle = handle

    def __del__(self):
        if self.handle is None:
            return
        warpcore.BNWARPFreeChunkReference(self.handle)

    def __repr__(self):
        return f"<WarpChunk functions: '{len(self.functions)}' types: '{len(self.types)}'>"

    @property
    def functions(self) -> List[WarpFunction]:
        """Functions reported by ``BNWARPChunkGetFunctions``; empty when the core returns no list."""
        total = ctypes.c_size_t()
        raw_funcs = warpcore.BNWARPChunkGetFunctions(self.handle, total)
        if not raw_funcs:
            return []
        # Each wrapper takes its own new reference before the list is freed.
        wrapped = [WarpFunction(warpcore.BNWARPNewFunctionReference(raw_funcs[idx])) for idx in range(total.value)]
        warpcore.BNWARPFreeFunctionList(raw_funcs, total.value)
        return wrapped

    @property
    def types(self) -> List[WarpType]:
        """Types reported by ``BNWARPChunkGetTypes``; empty when the core returns no list."""
        total = ctypes.c_size_t()
        raw_types = warpcore.BNWARPChunkGetTypes(self.handle, total)
        if not raw_types:
            return []
        # Each wrapper takes its own new reference before the list is freed.
        wrapped = [WarpType(warpcore.BNWARPNewTypeReference(raw_types[idx])) for idx in range(total.value)]
        warpcore.BNWARPFreeTypeList(raw_types, total.value)
        return wrapped
[docs]
class WarpFile:
    """Wrapper around a WARP file handle."""

    def __init__(self, handle: Union[warpcore.BNWARPFileHandle, str]):
        """
        :param handle: an existing file handle, or a path string that is
            opened with ``BNWARPNewFileFromPath``.
        """
        given_path = isinstance(handle, str)
        self.handle = warpcore.BNWARPNewFileFromPath(handle) if given_path else handle

    def __del__(self):
        if self.handle is None:
            return
        warpcore.BNWARPFreeFileReference(self.handle)

    def __repr__(self):
        return f"<WarpFile chunks: '{len(self.chunks)}'>"

    @property
    def chunks(self) -> List[WarpChunk]:
        """Chunks reported by ``BNWARPFileGetChunks``; empty when the core returns no list."""
        total = ctypes.c_size_t()
        raw_chunks = warpcore.BNWARPFileGetChunks(self.handle, total)
        if not raw_chunks:
            return []
        # Each wrapper takes its own new reference before the list is freed.
        wrapped = [WarpChunk(warpcore.BNWARPNewChunkReference(raw_chunks[idx])) for idx in range(total.value)]
        warpcore.BNWARPFreeChunkList(raw_chunks, total.value)
        return wrapped

    def to_data_buffer(self) -> DataBuffer:
        """Return a ``DataBuffer`` wrapping the handle from ``BNWARPFileToDataBuffer``."""
        buffer_handle = warpcore.BNWARPFileToDataBuffer(self.handle)
        return DataBuffer(handle=buffer_handle)
[docs]
@dataclasses.dataclass
class WarpProcessorState:
    """Python-side copy of the fields of a ``warpcore.BNWARPProcessorState``."""

    cancelled: bool = False
    unprocessed_file_count: int = 0
    processed_file_count: int = 0
    analyzing_files: List[str] = dataclasses.field(default_factory=list)
    processing_files: List[str] = dataclasses.field(default_factory=list)

    @staticmethod
    def from_api(state: warpcore.BNWARPProcessorState) -> 'WarpProcessorState':
        """Copy the counters and both file lists out of the core ``state`` structure."""
        # Read both core arrays element by element into Python lists.
        analyzing = [state.analyzing_files[idx] for idx in range(state.analyzing_files_count)]
        processing = [state.processing_files[idx] for idx in range(state.processing_files_count)]
        return WarpProcessorState(
            cancelled=state.cancelled,
            unprocessed_file_count=state.unprocessed_file_count,
            processed_file_count=state.processed_file_count,
            analyzing_files=analyzing,
            processing_files=processing,
        )
[docs]
class WarpProcessor:
    """Wrapper around a processor handle created by ``BNWARPNewProcessor``."""

    def __init__(self, included_data: WARPProcessorIncludedData = WARPProcessorIncludedData.WARPProcessorIncludedDataAll,
                 included_functions: WARPProcessorIncludedFunctions = WARPProcessorIncludedFunctions.WARPProcessorIncludedFunctionsAnnotated,
                 worker_count: int = 1):
        """
        :param included_data: enum value passed to the core as a C ``int``.
        :param included_functions: enum value passed to the core as a C ``int``.
        :param worker_count: passed through to ``BNWARPNewProcessor``.
        """
        self.handle = warpcore.BNWARPNewProcessor(ctypes.c_int(included_data), ctypes.c_int(included_functions), worker_count)

    def __del__(self):
        if self.handle is not None:
            warpcore.BNWARPFreeProcessor(self.handle)

    def add_path(self, path: str):
        """Pass ``path`` to ``BNWARPProcessorAddPath``."""
        warpcore.BNWARPProcessorAddPath(self.handle, path)

    def add_project(self, project: Project):
        """Pass ``project`` to ``BNWARPProcessorAddProject``."""
        warpcore.BNWARPProcessorAddProject(self.handle, project.handle)

    def add_project_file(self, project_file: ProjectFile):
        """Pass ``project_file`` to ``BNWARPProcessorAddProjectFile``."""
        warpcore.BNWARPProcessorAddProjectFile(self.handle, project_file.handle)

    def add_binary_view(self, view: BinaryView):
        """Pass ``view`` to ``BNWARPProcessorAddBinaryView``."""
        warpcore.BNWARPProcessorAddBinaryView(self.handle, view.handle)

    def start(self) -> Optional[WarpFile]:
        """
        Call ``BNWARPProcessorStart``.

        :return: the resulting :class:`WarpFile`, or ``None`` when the core returns no file handle.
        """
        file = warpcore.BNWARPProcessorStart(self.handle)
        if not file:
            return None
        return WarpFile(file)

    def state(self) -> WarpProcessorState:
        """Return a Python-side snapshot of the processor state."""
        state_raw = warpcore.BNWARPProcessorGetState(self.handle)
        try:
            # Copy the fields out BEFORE freeing: reading the structure after
            # BNWARPFreeProcessorState would touch released memory.
            return WarpProcessorState.from_api(state_raw)
        finally:
            warpcore.BNWARPFreeProcessorState(state_raw)
[docs]
def run_matcher(view: BinaryView):
    """Call ``BNWARPRunMatcher`` on the given binary view."""
    view_handle = view.handle
    warpcore.BNWARPRunMatcher(view_handle)
[docs]
def is_instruction_variant(function: LowLevelILFunction, variant: LowLevelILInstruction) -> bool:
    """Return the result of ``BNWARPIsLiftedInstructionVariant`` for the instruction's index in ``function``."""
    instr_index = variant.instr_index
    return warpcore.BNWARPIsLiftedInstructionVariant(function.handle, instr_index)
[docs]
def is_instruction_blacklisted(function: LowLevelILFunction, variant: LowLevelILInstruction) -> bool:
    """Return the result of ``BNWARPIsLiftedInstructionBlacklisted`` for the instruction's index in ``function``."""
    instr_index = variant.instr_index
    return warpcore.BNWARPIsLiftedInstructionBlacklisted(function.handle, instr_index)
[docs]
def is_instruction_computed_variant(function: LowLevelILFunction, variant: LowLevelILInstruction) -> bool:
    """
    Report whether the instruction is variant because of a computed value. **Must use LLIL.**
    """
    instr_index = variant.instr_index
    return warpcore.BNWARPIsLowLevelInstructionComputedVariant(function.handle, instr_index)
[docs]
def get_function_guid(function: Function) -> Optional[FunctionGUID]:
    """
    Look up the GUID of an analysis function.

    :return: the :class:`FunctionGUID` written by
        ``BNWARPGetAnalysisFunctionGUID``, or ``None`` when that call reports failure.
    """
    out_guid = warpcore.BNWARPUUID()
    found = warpcore.BNWARPGetAnalysisFunctionGUID(function.handle, out_guid)
    if found:
        return FunctionGUID(out_guid)
    return None
[docs]
def get_basic_block_guid(basic_block: BasicBlock) -> Optional[BasicBlockGUID]:
    """
    Look up the GUID of a basic block.

    IL blocks are first replaced by their ``source_block``.

    :return: the :class:`BasicBlockGUID` written by ``BNWARPGetBasicBlockGUID``,
        or ``None`` when that call reports failure or an IL block has no
        source block.
    """
    # TODO: I believe this won't work for HLIL: https://github.com/Vector35/binaryninja-api/issues/6998
    if basic_block.is_il:
        basic_block = basic_block.source_block
        # An IL block without a source block cannot be resolved; report
        # "no GUID" instead of failing on ``None.handle``.
        if basic_block is None:
            return None
    guid = warpcore.BNWARPUUID()
    if not warpcore.BNWARPGetBasicBlockGUID(basic_block.handle, guid):
        return None
    return BasicBlockGUID(guid)
# TODO: Magic matched_function, possible_functions