Source code for pymisp.abstract

#!/usr/bin/env python3

from __future__ import annotations

import logging
from datetime import date, datetime
from deprecated import deprecated  # type: ignore
from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
from enum import Enum, IntEnum
from typing import Any, Mapping
from collections.abc import MutableMapping
from functools import lru_cache
from pathlib import Path

try:
    import orjson  # type: ignore
    from orjson import loads, dumps
    HAS_ORJSON = True
except ImportError:
    from json import loads, dumps
    HAS_ORJSON = False

from .exceptions import PyMISPInvalidFormat, PyMISPError

logger = logging.getLogger('pymisp')


resources_path = Path(__file__).parent / 'data'
misp_objects_path = resources_path / 'misp-objects' / 'objects'
with (resources_path / 'describeTypes.json').open('rb') as f:
    describe_types: dict[str, Any] = loads(f.read())['result']


class MISPFileCache:
    # cache up to 150 JSON structures in class attribute

    @staticmethod
    @lru_cache(maxsize=150)
    def _load_json(path: Path) -> dict[str, Any] | None:
        if not path.exists():
            return None
        with path.open('rb') as f:
            data = loads(f.read())
        return data


[docs] class Distribution(Enum): your_organisation_only = 0 this_community_only = 1 connected_communities = 2 all_communities = 3 sharing_group = 4 inherit = 5
[docs] class ThreatLevel(Enum): high = 1 medium = 2 low = 3 undefined = 4
[docs] class Analysis(Enum): initial = 0 ongoing = 1 completed = 2
def _int_to_str(d: dict[str, Any]) -> dict[str, Any]: # transform all integer back to string for k, v in d.items(): if isinstance(v, dict): d[k] = _int_to_str(v) elif isinstance(v, int) and not isinstance(v, bool): d[k] = str(v) return d
[docs] @deprecated(reason=" Use method default=pymisp_json_default instead of cls=MISPEncode", version='2.4.117', action='default') class MISPEncode(JSONEncoder):
[docs] def default(self, obj: Any) -> dict[str, Any] | str: if isinstance(obj, AbstractMISP): return obj.jsonable() elif isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, Enum): return obj.value elif isinstance(obj, UUID): return str(obj) return JSONEncoder.default(self, obj)
[docs] class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): # type: ignore[type-arg] __resources_path = resources_path __misp_objects_path = misp_objects_path __describe_types = describe_types def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def] """Abstract class for all the MISP objects. NOTE: Every method in every classes inheriting this one are doing changes in memory and do not modify data on a remote MISP instance. To do so, you need to call the respective add_* or update_* methods in PyMISP. """ super().__init__() self.__edited: bool = True # As we create a new object, we assume it is edited self.__not_jsonable: list[str] = [] self._fields_for_feed: set[str] self.__self_defined_describe_types: dict[str, Any] | None = None self.uuid: str if kwargs.get('force_timestamps') is not None: # Ignore the edited objects and keep the timestamps. self.__force_timestamps: bool = True else: self.__force_timestamps = False @property def describe_types(self) -> dict[str, Any]: if self.__self_defined_describe_types: return self.__self_defined_describe_types return self.__describe_types @describe_types.setter def describe_types(self, describe_types: dict[str, Any]) -> None: self.__self_defined_describe_types = describe_types @property def resources_path(self) -> Path: return self.__resources_path @property def misp_objects_path(self) -> Path: return self.__misp_objects_path @misp_objects_path.setter def misp_objects_path(self, misp_objects_path: str | Path) -> None: if isinstance(misp_objects_path, str): misp_objects_path = Path(misp_objects_path) self.__misp_objects_path = misp_objects_path
[docs] def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] """Loading all the parameters as class properties, if they aren't `None`. This method aims to be called when all the properties requiring a special treatment are processed. Note: This method is used when you initialize an object with existing data so by default, the class is flaged as not edited.""" for prop, value in kwargs.items(): if value is None: continue setattr(self, prop, value) # We load an existing dictionary, marking it an not-edited self.__edited = False
[docs] def update_not_jsonable(self, *args) -> None: # type: ignore[no-untyped-def] """Add entries to the __not_jsonable list""" self.__not_jsonable += args
[docs] def set_not_jsonable(self, args: list[str]) -> None: """Set __not_jsonable to a new list""" self.__not_jsonable = args
def _remove_from_not_jsonable(self, *args) -> None: # type: ignore[no-untyped-def] """Remove the entries that are in the __not_jsonable list""" for entry in args: try: self.__not_jsonable.remove(entry) except ValueError: pass
[docs] def from_json(self, json_string: str) -> None: """Load a JSON string""" self.from_dict(**loads(json_string))
[docs] def to_dict(self, json_format: bool = False) -> dict[str, Any]: """Dump the class to a dictionary. This method automatically removes the timestamp recursively in every object that has been edited is order to let MISP update the event accordingly.""" is_edited = self.edited to_return = {} for attribute, val in self.items(): if val is None: continue elif isinstance(val, list) and len(val) == 0: continue elif isinstance(val, str): val = val.strip() elif json_format: if isinstance(val, AbstractMISP): val = val.to_json(True) elif isinstance(val, (datetime, date)): val = val.isoformat() elif isinstance(val, Enum): val = val.value elif isinstance(val, UUID): val = str(val) if attribute == 'timestamp': if not self.__force_timestamps and is_edited: # In order to be accepted by MISP, the timestamp of an object # needs to be either newer, or None. # If the current object is marked as edited, the easiest is to # skip the timestamp and let MISP deal with it continue else: val = self._datetime_to_timestamp(val) if (attribute in ('first_seen', 'last_seen', 'datetime') and isinstance(val, datetime) and not val.tzinfo): # Need to make sure the timezone is set. Otherwise, it will be processed as UTC on the server val = val.astimezone() to_return[attribute] = val to_return = _int_to_str(to_return) return to_return
[docs] def jsonable(self) -> dict[str, Any]: """This method is used by the JSON encoder""" return self.to_dict()
def _to_feed(self) -> dict[str, Any]: if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed: raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.') if hasattr(self, '_set_default') and callable(self._set_default): self._set_default() to_return = {} for field in sorted(self._fields_for_feed): if getattr(self, field, None) is not None: if field in ['timestamp', 'publish_timestamp']: to_return[field] = self._datetime_to_timestamp(getattr(self, field)) elif isinstance(getattr(self, field), (datetime, date)): to_return[field] = getattr(self, field).isoformat() else: to_return[field] = getattr(self, field) else: if field in ['data', 'first_seen', 'last_seen', 'deleted']: # special fields continue raise PyMISPError(f'The field {field} is required in {self.__class__.__name__} when generating a feed.') to_return = _int_to_str(to_return) return to_return
[docs] def to_json(self, sort_keys: bool = False, indent: int | None = None) -> str: """Dump recursively any class of type MISPAbstract to a json string""" if HAS_ORJSON: option = 0 if sort_keys: option |= orjson.OPT_SORT_KEYS if indent: option |= orjson.OPT_INDENT_2 # orjson dumps method returns bytes instead of bytes, to keep compatibility with json # we have to convert output to str return dumps(self, default=pymisp_json_default, option=option).decode() return dumps(self, default=pymisp_json_default, sort_keys=sort_keys, indent=indent)
def __getitem__(self, key: str) -> Any: try: if key[0] != '_' and key not in self.__not_jsonable: return self.__dict__[key] raise KeyError except AttributeError: # Expected by pop and other dict-related methods raise KeyError def __setitem__(self, key: str, value: Any) -> None: setattr(self, key, value) def __delitem__(self, key: str) -> None: delattr(self, key) def __iter__(self) -> Any: '''When we call **self, skip keys: * starting with _ * in __not_jsonable * timestamp if the object is edited *unless* it is forced ''' return iter({k: v for k, v in self.__dict__.items() if not (k[0] == '_' or k in self.__not_jsonable or (not self.__force_timestamps and (k == 'timestamp' and self.__edited)))}) def __len__(self) -> int: return len([k for k in self.__dict__.keys() if not (k[0] == '_' or k in self.__not_jsonable)]) @property def force_timestamp(self) -> bool: return self.__force_timestamps @force_timestamp.setter def force_timestamp(self, force: bool) -> None: self.__force_timestamps = force @property def edited(self) -> bool: """Recursively check if an object has been edited and update the flag accordingly to the parent objects""" if self.__edited: return self.__edited for p, val in self.items(): if isinstance(val, AbstractMISP) and val.edited: self.__edited = True break elif isinstance(val, list) and all(isinstance(a, AbstractMISP) for a in val): if any(a.edited for a in val): self.__edited = True break return self.__edited @edited.setter def edited(self, val: bool) -> None: """Set the edit flag""" if isinstance(val, bool): self.__edited = val else: raise PyMISPError('edited can only be True or False') def __setattr__(self, name: str, value: Any) -> None: if name[0] != '_' and not self.__edited and name in self: # The private members don't matter # If we already have a key with that name, we're modifying it. self.__edited = True super().__setattr__(name, value) def _datetime_to_timestamp(self, d: int | float | str | datetime) -> int: """Convert a datetime object to a timestamp (int)""" if isinstance(d, (int, float, str)): # Assume we already have a timestamp return int(d) return int(d.timestamp()) def _add_tag(self, tag: str | MISPTag | Mapping[str, Any] | None = None, **kwargs): # type: ignore[no-untyped-def] """Add a tag to the attribute (by name or a MISPTag object)""" if isinstance(tag, str): misp_tag = MISPTag() misp_tag.from_dict(name=tag) elif isinstance(tag, MISPTag): misp_tag = tag elif isinstance(tag, dict): misp_tag = MISPTag() misp_tag.from_dict(**tag) elif kwargs: misp_tag = MISPTag() misp_tag.from_dict(**kwargs) else: raise PyMISPInvalidFormat(f"The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {tag}") if misp_tag not in self.tags: # type: ignore self.Tag.append(misp_tag) self.edited = True return misp_tag def _set_tags(self, tags: list[MISPTag]) -> None: """Set a list of prepared MISPTag.""" if all(isinstance(x, MISPTag) for x in tags): self.Tag = tags else: raise PyMISPInvalidFormat('All the attributes have to be of type MISPTag.') def __eq__(self, other: object) -> bool: if isinstance(other, AbstractMISP): return self.to_dict() == other.to_dict() elif isinstance(other, dict): return self.to_dict() == other else: return False def __repr__(self) -> str: return f'<{self.__class__.__name__} - please define me>'
[docs] class MISPTag(AbstractMISP): _fields_for_feed: set[str] = {'name', 'colour', 'relationship_type', 'local'} def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(**kwargs) self.name: str self.exportable: bool self.local: bool self.relationship_type: str | None
[docs] def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] if kwargs.get('Tag'): kwargs = kwargs.get('Tag') # type: ignore[assignment] super().from_dict(**kwargs)
def _set_default(self) -> None: if not hasattr(self, 'relationship_type'): self.relationship_type = '' if not hasattr(self, 'colour'): self.colour = '#ffffff' if not hasattr(self, 'local'): self.local = False def _to_feed(self, with_local: bool = True) -> dict[str, Any]: if hasattr(self, 'exportable') and not self.exportable: return {} if with_local is False and hasattr(self, 'local') and self.local: return {} return super()._to_feed() def delete(self) -> None: self.deleted = True self.edited = True def __repr__(self) -> str: if hasattr(self, 'name'): return '<{self.__class__.__name__}(name={self.name})>'.format(self=self) return f'<{self.__class__.__name__}(NotInitialized)>'
# UUID, datetime, date and Enum is serialized by ORJSON by default def pymisp_json_default(obj: AbstractMISP | datetime | date | Enum | UUID) -> dict[str, Any] | str: if isinstance(obj, AbstractMISP): return obj.jsonable() elif isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, Enum): return obj.value elif isinstance(obj, UUID): return str(obj)