Source code for pymisp.tools.attributevalidationtool

#!/usr/bin/env python3

import ipaddress
import json
import logging
import re
from base64 import b64decode
from datetime import datetime
from dateutil.parser import parse
from pymisp import MISPAttribute, MISPEvent, MISPObject
from pymisp.exceptions import PyMISPError
from typing import Generator
from urllib.parse import urlparse

# Hexadecimal string lengths shared by several attribute types
# (one hex character encodes 4 bits).
_HEX_LEN_128_BIT = 32
_HEX_LEN_160_BIT = 40
_HEX_LEN_224_BIT = 56
_HEX_LEN_256_BIT = 64
_HEX_LEN_384_BIT = 96
_HEX_LEN_512_BIT = 128

# Expected number of hexadecimal characters for each fixed-length hash
# attribute type; used to check the length of a hash value.
HASH_HEX_LENGTH = {
    'authentihash': _HEX_LEN_256_BIT,
    'md5': _HEX_LEN_128_BIT,
    'imphash': _HEX_LEN_128_BIT,
    'telfhash': 70,
    'sha1': _HEX_LEN_160_BIT,
    'git-commit-id': _HEX_LEN_160_BIT,
    'x509-fingerprint-md5': _HEX_LEN_128_BIT,
    'x509-fingerprint-sha1': _HEX_LEN_160_BIT,
    'x509-fingerprint-sha256': _HEX_LEN_256_BIT,
    'ja3-fingerprint-md5': _HEX_LEN_128_BIT,
    'jarm-fingerprint': 62,
    'hassh-md5': _HEX_LEN_128_BIT,
    'hasshserver-md5': _HEX_LEN_128_BIT,
    'pehash': _HEX_LEN_160_BIT,
    'sha224': _HEX_LEN_224_BIT,
    'sha256': _HEX_LEN_256_BIT,
    'sha384': _HEX_LEN_384_BIT,
    'sha512': _HEX_LEN_512_BIT,
    'sha512/224': _HEX_LEN_224_BIT,
    'sha512/256': _HEX_LEN_256_BIT,
    'sha3-224': _HEX_LEN_224_BIT,
    'sha3-256': _HEX_LEN_256_BIT,
    'sha3-384': _HEX_LEN_384_BIT,
    'sha3-512': _HEX_LEN_512_BIT,
    'dom-hash': _HEX_LEN_128_BIT,
}
# HTTP request methods accepted for the 'http-method' attribute type.
# The value is compared against this tuple in upper case.
HTTP_METHODS = (
    # Base request methods
    ('OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT')
    # Resource / property / locking extension methods
    + ('PROPFIND', 'PROPPATCH', 'MKCOL', 'COPY', 'MOVE', 'LOCK', 'UNLOCK')
    # Versioning and workspace extension methods
    + (
        'VERSION-CONTROL', 'REPORT', 'CHECKOUT', 'CHECKIN', 'UNCHECKOUT',
        'MKWORKSPACE', 'UPDATE', 'LABEL', 'MERGE', 'BASELINE-CONTROL',
        'MKACTIVITY',
    )
    # Remaining extension methods
    + ('ORDERPATCH', 'ACL', 'PATCH', 'SEARCH')
)
# Ordered rewrite rules used to "refang" defanged indicators
# (e.g. ``hxxp://example[.]com`` -> ``http://example.com``).
# Each rule is applied, in order, only to the attribute types listed in
# its 'types' entry: every match of 'from' is replaced by 'to'.
REFANG_REGEX_TABLE = (
    {
        # Defanged scheme at the very start of the value.
        'from': re.compile(r'^(hxxp|hxtp|htxp|meow|h\[tt\]p)', re.IGNORECASE),
        'to': 'http',
        'types': ('link', 'url')
    },
    {
        # Defanged dots anywhere in the value.
        'from': re.compile(r'(\[\.\]|\[dot\]|\(dot\))', re.IGNORECASE),
        'to': '.',
        'types': (
            'link', 'url', 'ip-dst', 'ip-src', 'domain|ip', 'domain',
            'hostname', 'email', 'email-src', 'email-dst'
        )
    },
    {
        # Bracketed scheme token. The pattern consumes the '://' separator
        # as well, so the replacement has to put it back; replacing with a
        # bare 'http' would glue the scheme to the host ('httpexample.com').
        'from': re.compile(r'\[hxxp:\/\/\]', re.IGNORECASE),
        'to': 'http://',
        'types': ('link', 'url')
    },
    {
        # Defanged '@' in e-mail addresses.
        'from': re.compile(r'\[\@\]|\[at\]', re.IGNORECASE),
        'to': '@',
        'types': ('email', 'email-src', 'email-dst')
    },
    {
        # Defanged ':' (e.g. 'http[:]//').
        'from': re.compile(r'\[:\]'),
        'to': ':',
        'types': ('link', 'url')
    }
)
# Accepted vulnerability identifier formats. They are OR-ed together and
# anchored in VULNERABILITY_RE below, which matches case-insensitively.
VULNERABILITY_REGEXES = (
    r'CVE-\d{4}-\d{4,}',
    r'GCVE-\d+-\d{4}-\d+',
    r'fkie_cve-\d{4}-\d{4,}',
    r'ghsa-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}',
    r'pysec-\d{4}-\d{2,5}',
    r'gsd-\d{4}-\d{4,5}',
    r'mal-\d{4}-\d+',
    r'wid-sec-w-\d{4}-\d{4}',
    r'ncsc-\d{4}-\d{4}',
    r'ssa-\d{6}',
    r'rh(ba|ea|sa)-\d{4}:\d{4,}',
    r'ics(ma|a)-\d{2}-\d{3}-\d{2}',
    r'va-\d{2}-\d{3}-\d{2}',
    r'cisco-sa(-[a-zA-Z0-9_]+)+',
    r'sca-\d{4}-\d{4,}',
    r'nn-\d{4}[:_]\d-\d{2}',
    r'oxas-adv-\d{4}-\d{4}',
    r'msrc_cve-\d{4}-\d{4,}',
    r'var-\d{6}-\d{4}',
    r'jvndb-\d{4}-\d{6}',
    r'ts-\d{4}-\d{4}',
    r'(open)?suse-su-\d{4}:\d{4,}-\d',
    r'cnvd-\d{4}-\d{5}',
    r'certfr-\d{4}-avi-\d{4}',
    r'certfr-\d{4}-ale-\d{3}'
)

# Pre-compiled patterns shared by the normalisation and validation code.
# Lower-case-only character classes (CDHASH_RE, MAC_*_RE, ONION_RE) expect
# the value to have been lower-cased before matching.
CDHASH_RE = re.compile(r'^[0-9a-f]{40,}$')
EMAIL_RE = re.compile(r'^.[^\s]*\@.*\..*$', flags=re.IGNORECASE)
DOMAIN_RE = re.compile(r'^[A-Z0-9.\-_]+\.[A-Z0-9\-]{2,}$', flags=re.IGNORECASE)
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
MAC_ADDRESS_RE = re.compile(r'^([a-f0-9]{2}:){5}[a-f0-9]{2}$')
# Eight colon-separated octets with 'ff:fe' in the middle. The colon that
# follows 'fe' belongs to the repeated ':xx' group; the previous pattern had
# an extra literal colon and could only match 'xx:xx:xx:ff:fe::xx:xx:xx'.
MAC_EUI_64_RE = re.compile(r'^([a-f0-9]{2}:){3}ff:fe(:[a-f0-9]{2}){3}$')
ONION_RE = re.compile(r'^([a-z2-7]{16}|[a-z2-7]{56})\.onion$')
REMOVE_NON_ALPHANUM_CAP_RE = re.compile(r'[^0-9A-Z]+')
# NOTE: despite its name this strips everything that is not a hex digit.
REMOVE_NON_ALPHANUM_RE = re.compile(r'[^0-9A-Fa-f]')
REMOVE_NON_NUM_RE = re.compile(r'[^0-9]+')
REMOVE_PHONE_PARENTHESIS_RE = re.compile(r'\(0\)')
SANITISE_PHONE_NUMBER_RE = re.compile(r'[^\+0-9]+')
SSDEEP_RE = re.compile(r'^([0-9]+):([0-9a-zA-Z/+]*):([0-9a-zA-Z/+]*)$')
UUID_RE = re.compile(r'[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$')
VULNERABILITY_RE = re.compile(
    r'^(?:' + '|'.join(VULNERABILITY_REGEXES) + r')$', flags=re.IGNORECASE
)
WEAKNESS_RE = re.compile(r"^CWE-[0-9]+$", flags=re.IGNORECASE)

logger = logging.getLogger('pymisp')


class ValidationError(PyMISPError):
    """Raised when an Attribute value does not pass validation."""
class AttributeValidationTool: @classmethod def modifyBeforeValidation(cls, attribute_type, value): if isinstance(value, str): value = cls._refang_value(attribute_type, value.strip()) match attribute_type: case ('ip-src' | 'ip-dst'): return cls._normalise_ip(value) case ('md5' | 'sha1' | 'sha224' | 'sha256' | 'sha384' | 'sha512' | 'sha512/224' | 'sha512/256' | 'sha3-224' | 'sha3-256' | 'sha3-384' | 'sha3-512' | 'ja3-fingerprint-md5' | 'jarm-fingerprint' | 'hassh-md5' | 'hasshserver-md5' | 'hostname' | 'pehash' | 'authentihash' | 'vhash' | 'imphash' | 'telfhash' | 'tlsh' | 'anonymised' | 'cdhash' | 'email' | 'email-src' | 'email-dst' | 'target-email' | 'whois-registrant-email' | 'dom-hash' | 'onion-address'): return value.lower() case 'domain': value = value.lower().strip('.') # Domain is not valid, try to convert to punycode if not cls._is_domain_valid(value): return value.encode('idna').decode('ascii') return value case 'domain|ip': parts = value.lower().split('|') if len(parts) != 2: return value # not a composite domain, ip = parts domain = domain.strip('.') # Domain is not valid, try to convert to punycode if not cls._is_domain_valid(domain): domain = domain.encode('idna').decode('ascii') return f'{domain}|{cls._normalise_ip(ip)}' case ('filename|md5' | 'filename|sha1' | 'filename|imphash' | 'filename|sha224' | 'filename|sha256' | 'filename|sha384' | 'filename|sha512' | 'filename|sha512/224' | 'filename|sha512/256' | 'filename|sha3-224' | 'filename|sha3-256' | 'filename|sha3-384' | 'filename|sha3-512' | 'filename|authentihash' | 'filename|vhash' | 'filename|pehash' | 'filename|tlsh'): # Convert hash to lowercase composite = value.split('|') if len(composite) != 2: return value # not a composite filename, _hash = composite return f'{filename}|{_hash.lower()}' case 'http-method' | 'hex': return value.upper() case 'vulnerability': value = value.replace('–', '-') source = value.split('-')[0] if source in ('cve', 'gcve'): return value.upper() return value case 
'weakness': return value.replace('–', '-').upper() case 'cc-number' | 'bin': return re.sub(REMOVE_NON_NUM_RE, '', value) case 'iban' | 'bic': return re.sub(REMOVE_NON_ALPHANUM_CAP_RE, '', value.upper()) case 'prtn' | 'whois-registrant-phone' | 'phone-number': if value.startswith('00'): value = f'+{value[2:]}' value = re.sub(REMOVE_PHONE_PARENTHESIS_RE, '', value) return re.sub(SANITISE_PHONE_NUMBER_RE, '', value) case 'x509-fingerprint-md5' | 'x509-fingerprint-sha256' | 'x509-fingerprint-sha1': return value.replace(':', '').lower() case 'ip-dst|port' | 'ip-src|port': if value.count(':') >= 2: # (ipv6|port) - tokenize ip and port if '|' in value: # 2001:db8::1|80 ip, port = value.split('|', 1) return f'{cls._normalise_ip(ip)}|{port}' if value.startswith('[') and ']' in value: # [2001:db8::1]:80 ip, port = value[1:].split(']', 1) return f'{cls._normalise_ip(ip)}|{port.lstrip(":")}' for separator in ('.', ' port ', 'p', '#'): if separator in value: ip, port = value.split(separator, 1) return f'{cls._normalise_ip(ip)}|{port}' # 2001:db8::1:80 this one is ambiguous *parts, port = value.split(':') return f'{cls._normalise_ip(":".join(parts))}|{port}' for separator in (':', '|'): if separator in value: # ipv4:port or ipv4|port ip, port = value.split(separator, 1) return f'{cls._normalise_ip(ip)}|{port}' return value case 'mac-address' | 'mac-eui-64': value = re.sub(REMOVE_NON_ALPHANUM_RE, '', value).lower() return ':'.join(value[i:i+2] for i in range(0, 12, 2)) case 'hostname|port': return value.replace(':', '|').lower() case 'boolean': if isinstance(value, int): return bool(value) if isinstance(value, str): value = value.lower() if value in ('true', '1'): return True if value in ('false', '0'): return False return value case 'datetime': if isinstance(value, str): try: return datetime.fromisoformat(value) except ValueError: try: return parse(value) except Exception: return value return value case 'AS': if value.upper().startswith('AS'): value = value[2:] # remove 'AS' if 
'.' in value: # maybe value is in asdot notation multiplier, remainder = value.split('.', 1) if cls._is_positive_integer(multiplier) and cls._is_positive_integer(remainder): return int(multiplier) * 65536 + int(remainder) return value case _: return value @classmethod def validate(cls, attribute_type, value): match attribute_type: case ('md5' | 'imphash' | 'sha1' | 'sha224' | 'sha256' | 'sha384' | 'sha512' | 'sha512/224' | 'sha512/256' | 'sha3-224' | 'sha3-256' | 'sha3-384' | 'sha3-512' | 'authentihash' | 'ja3-fingerprint-md5' | 'jarm-fingerprint' | 'hassh-md5' | 'hasshserver-md5' | 'x509-fingerprint-md5' | 'x509-fingerprint-sha256' | 'x509-fingerprint-sha1' | 'git-commit-id' | 'dom-hash'): if cls._is_hash_valid(attribute_type, value): return True length = HASH_HEX_LENGTH[attribute_type] return ( 'Checksum has an invalid length or format (expected: ' f'{length} hexadecimal characters). Please double check ' 'the value or select type "other".' ) case 'tlsh': if cls._is_tlsh_valid(value): return True return ( 'Checksum has an invalid length or format (expected: at ' 'least 35 hexadecimal characters, optionally starting ' 'with t1 instead of hexadecimal characters). Please ' 'double check the value or select type "other".' ) case 'telfhash': if cls._is_telfhash_valid(value): return True return ( 'Checksum has an invalid length or format (expected: ' '70 or 72 hexadecimal characters). Please double check ' 'the value or select type "other".' ) case 'pehash': if cls._is_hash_valid('pehash', value): return True return ( "The input doesn't match the expected sha1 format " '(expected: 40 hexadecimal characters). Keep in mind that ' 'MISP currently only supports SHA1 for PEhashes, if you ' 'would like to get the support extended to other hash ' 'types, make sure to create a github ticket about it at ' 'https://github.com/MISP/MISP!' ) case 'ssdeep': if cls._is_ssdeep(value): return True return 'Invalid SSDeep hash. 
The format has to be blocksize:hash:hash' case 'impfuzzy': if value.count(':') == 2: imports, *_ = value.split(':') if cls._is_positive_integer(imports): return True return 'Invalid impfuzzy format. The format has to be imports:hash:hash' case 'cdhash': if CDHASH_RE.fullmatch(value): return True return ( "The input doesn't match the expected format " '(expected: 40 or more hexadecimal characters)' ) case 'http-method': if value in HTTP_METHODS: return True return 'Unknown HTTP method.' case 'filename|pehash': if re.fullmatch(r'^.+\|[0-9a-f]{40}$', value): return True return ( "The input doesn't match the expected filename|sha1 format " '(expected: filename|40 hexadecimal characters). Keep in ' 'mind that MISP currently only supports SHA1 for PEhashes, ' 'if you would like to get the support extended to other ' 'hash types, make sure to create a github ticket about it ' 'at https://github.com/MISP/MISP!' ) case ('filename|md5' | 'filename|sha1' | 'filename|imphash' | 'filename|sha224' | 'filename|sha256' | 'filename|sha384' | 'filename|sha512' | 'filename|sha512/224' | 'filename|sha512/256' | 'filename|sha3-224' | 'filename|sha3-256' | 'filename|sha3-384' | 'filename|sha3-512' | 'filename|authentihash'): length = HASH_HEX_LENGTH[attribute_type[9:]] # strip `filename|`] if re.fullmatch(r'^.+\|[0-9a-f]{' + str(length) + r'}$', value): return True return ( 'Checksum has an invalid length or format (expected:' f'filename|{length} hexadecimal characters). Please' 'double check the value or select type "other".' ) case 'filename|ssdeep': composite = value.split('|') if len(composite) == 2: filename, ssdeep = composite if '\n' in filename: return 'Filename must not contain new line character.' if cls._is_ssdeep(ssdeep): return True return 'Invalid ssdeep hash (expected: blocksize:hash:hash).' case 'filename|tlsh': composite = value.split('|') if len(composite) == 2: filename, tlsh = composite if '\n' in filename: return 'Filename must not contain new line character.' 
if cls._is_tlsh_valid(tlsh): return True return ( 'TLSH hash has an invalid length or format (expected: ' 'filename|at least 35 hexadecimal characters, optionally ' 'starting with t1 instead of hexadecimal characters). ' 'Please double check the value or select type "other".' ) case 'filename|vhash': if re.fullmatch(r'^.+\|.+$', value): return True return ( 'Checksum has an invalid length or format (expected: ' 'filename|string characters). Please double check the ' 'value or select type "other".' ) case 'ip-src' | 'ip-dst': return cls._validate_ip(value) case 'port': if cls._is_port_valid(value): return True return 'Port numbers have to be integers between 1 and 65535.' case 'ip-dst|port' | 'ip-src|port': composite = value.split('|') if len(composite) != 2: return 'Invalid ip-dst|port format.' ip, port = composite if not cls._is_port_valid(port): return 'Port numbers have to be integers between 1 and 65535.' return cls._validate_ip(ip) case 'onion-address': if ONION_RE.fullmatch(value): return True return 'Onion address has an invalid format.' case 'mac-address': if MAC_ADDRESS_RE.fullmatch(value): return True return 'MAC address has an invalid format.' case 'mac-eui-64': if MAC_EUI_64_RE.fullmatch(value): return True return 'MAC EUI-64 address has an invalid format.' case 'hostname' | 'domain': if cls._is_domain_valid(value): return True return ( f'{attribute_type.capitalize()} has an invalid format. ' 'Please double check the value or select type "other".' ) case 'hostname|port': composite = value.split('|') if len(composite) != 2: return 'Invalid hostname|port format.' hostname, port = composite if not cls._is_domain_valid(hostname): return 'Hostname has an invalid format.' if not cls._is_port_valid(port): return 'Port numbers have to be integers between 1 and 65535.' return True case 'domain|ip': composite = value.split('|') if len(composite) != 2: return 'Invalid domain|ip format.' 
domain, ip = composite if not cls._is_domain_valid(domain): return 'Domain has an invalid format.' return cls._validate_ip(ip) case ('email' | 'email-src' | 'eppn' | 'email-dst' | 'target-email' | 'whois-registrant-email' | 'dns-soa-email' | 'jabber-id'): # we don't use the native function to prevent issues with partial email addresses if EMAIL_RE.fullmatch(value): return True return ( 'Email address has an invalid format. Please double ' 'check the value or select type "other".' ) case 'vulnerability': if VULNERABILITY_RE.fullmatch(value): return True return 'Invalid vulnerability ID format.' case 'weakness': if WEAKNESS_RE.fullmatch(value): return True return 'Invalid format. Expected: CWE-x...' case 'windows-service-name' | 'windows-service-displayname': if len(value) > 256 or re.search(r'[\\/]', value): return ( 'Invalid format. Only values shorter than 256 characters ' "that don't include any forward or backward slashes are allowed." ) return True case ('mutex' | 'process-state' | 'snort' | 'suricata' | 'bro' | 'zeek' | 'community-id' | 'anonymised' | 'pattern-in-file' | 'pattern-in-traffic' | 'pattern-in-memory' | 'filename-pattern' | 'pgp-public-key' | 'pgp-private-key' | 'yara' | 'stix2-pattern' | 'sigma' | 'gene' | 'kusto-query' | 'mime-type' | 'identity-card-number' | 'cookie' | 'attachment' | 'malware-sample' | 'comment' | 'text' | 'other' | 'cpe' | 'email-attachment' | 'email-body' | 'email-header' | 'first-name' | 'middle-name' | 'last-name' | 'full-name'): return True case 'link': parsed = urlparse(value) if all([parsed.scheme, parsed.netloc]): return True return 'Link has to be a valid URL.' case 'hex': if HEX_RE.fullmatch(value): return True return 'Value has to be a hexadecimal string.' 
case ('target-user' | 'campaign-name' | 'campaign-id' | 'threat-actor' | 'target-machine' | 'target-org' | 'target-location' | 'target-external' | 'email-subject' | 'malware-type' | 'url' | 'uri' | 'user-agent' | 'regkey' | 'regkey|value' | 'filename' | 'pdb' | 'windows-scheduled-task' | 'whois-registrant-name' | 'whois-registrant-org' | 'whois-registrar' | 'whois-creation-date' | 'date-of-birth' | 'place-of-birth' | 'gender' | 'passport-number' | 'passport-country' | 'passport-expiration' | 'redress-number' | 'nationality' | 'visa-number' | 'issue-date-of-the-visa' | 'primary-residence' | 'country-of-residence' | 'special-service-request' | 'frequent-flyer-number' | 'travel-details' | 'payment-details' | 'place-port-of-original-embarkation' | 'place-port-of-clearance' | 'place-port-of-onward-foreign-destination' | 'passenger-name-record-locator-number' | 'email-dst-display-name' | 'email-src-display-name' | 'email-reply-to' | 'email-x-mailer' | 'email-mime-boundary' | 'email-thread-index' | 'email-message-id' | 'github-username' | 'github-repository' | 'github-organisation' | 'twitter-id' | 'dkim' | 'dkim-signature' | 'favicon-mmh3' | 'chrome-extension-id' | 'mobile-application-id' | 'azure-application-id' | 'named pipe'): if '\n' in value: return 'Value must not contain new line character.' return True case 'ssh-fingerprint': if cls._is_ssh_fingerprint(value): return True return 'SSH fingerprint must be in MD5 or SHA256 format.' case 'datetime': if isinstance(value, datetime): return True try: parse(value) return True except Exception: return 'Datetime has to be in the ISO 8601 format.' case 'size-in-bytes' | 'counter': if cls._is_positive_integer(value): return True return 'The value has to be a whole number greater or equal 0.' 
# case 'targeted-threat-index': # if (!is_numeric($value) || $value < 0 || $value > 10) { # return __('The value has to be a number between 0 and 10.'); # } # return True case 'integer': try: int(value) return True except ValueError: return 'The value has to be an integer value.' case 'iban' | 'bic' | 'btc' | 'dash' | 'xmr': if value.isalnum(): return True return f'{attribute_type.upper()} has to be alphanumeric.' case 'vhash': if len(value) > 0: return True return 'Vhash must not be an empty string.' case ('bin' | 'cc-number' | 'bank-account-nr' | 'aba-rtn' | 'prtn' | 'phone-number' | 'whois-registrant-phone' | 'float'): try: float(value) return True except ValueError: return f'The value has to be a valid {attribute_type}' case 'cortex': try: json.loads(value) return True except json.JSONDecodeError: return 'The Cortex analysis result has to be a valid JSON string.' case 'boolean': if isinstance(value, bool): return True return 'The value has to be either true or false.' case 'AS': if cls._is_positive_integer(value) and int(value) <= 4294967295: return True return 'AS number have to be integer between 1 and 4294967295' case 'uuid': if UUID_RE.fullmatch(value): return True return 'The value has to be a valid UUID format.' case _: return value @staticmethod def _handle_4byte_unicode(value): # Replace 4-byte UTF-8 characters with '?' return ''.join(ch if ord(ch) <= 0xFFFF else '?' 
for ch in value) @staticmethod def _is_domain_valid(value): return DOMAIN_RE.fullmatch(value) @staticmethod def _is_hash_valid(attribute_type, value): return len(value) == HASH_HEX_LENGTH[attribute_type] and HEX_RE.fullmatch(value) @classmethod def _is_port_valid(cls, value): return cls._is_positive_integer(value) and int(value) in range(1, 65536) @staticmethod def _is_positive_integer(value: int | str) -> bool: if isinstance(value, int): return value >= 0 return value.isdigit() and int(value) >= 0 @staticmethod def _is_ssdeep(value): return SSDEEP_RE.fullmatch(value) @classmethod def _is_ssh_fingerprint(cls, value): if value.startswith('SHA256:'): try: decoded = b64decode(value[7:]) except Exception: return False return decoded is not None and len(decoded) == 32 if value.startswith('MD5:'): return cls._is_hash_valid('md5', value[3:].replace(':', '')) return cls._is_hash_valid('md5', value.replace(':', '')) @staticmethod def _is_tlsh_valid(value): if value.startswith('t'): value = value.lstrip('t') return len(value) > 35 and HEX_RE.fullmatch(value) @staticmethod def _is_telfhash_valid(value): return len(value) in (70, 72) and HEX_RE.fullmatch(value) @staticmethod def _normalise_ip(value): # If IP is a CIDR if '/' in value: address, length = value.split('/', 2) if ':' in address: try: address = str(ipaddress.IPv6Address(address)) except ipaddress.AddressValueError: return value if length == '128': return address else: try: address = str(ipaddress.IPv4Address(address)) except ipaddress.AddressValueError: return value if length == '32': return address return f'{address}/{length}' try: return ( str(ipaddress.IPv6Address(value)) if ':' in value else str(ipaddress.IPv4Address(value)) ) except ipaddress.AddressValueError: return value @classmethod def _refang_value(cls, attribute_type, value): for rule in REFANG_REGEX_TABLE: if attribute_type in rule['types']: # type: ignore value = rule['from'].sub(rule['to'], value) # type: ignore return cls._handle_4byte_unicode(value) 
@classmethod def _validate_ip(cls, value): if '/' in value: composite = value.split('/') if len(composite) != 2 or not cls._is_positive_integer(composite[1]): return ('Invalid CIDR notation value found.') address, length = composite try: ip_obj = ipaddress.ip_address(address) if isinstance(ip_obj, ipaddress.IPv4Address): if int(length) > 32: return ( 'Invalid CIDR notation value found, for ' 'IPv4 must be lower or equal 32.' ) return True if isinstance(ip_obj, ipaddress.IPv6Address): if int(length) > 128: return ( 'Invalid CIDR notation value found, for ' 'IPv6 must be lower or equal 128.' ) return True except ValueError: return 'IP address has an invalid format.' try: ipaddress.ip_address(value) except ValueError: return 'IP address has an invalid format.' return True
def validate_attribute(attribute: dict | MISPAttribute) -> MISPAttribute:  # type: ignore
    """
    Validates a MISP Attribute and returns a MISPAttribute if valid.
    Replicates MISP server-side validation behavior on Attributes.

    The value is normalised first; when the normalised value differs from the
    stored one it is written back and the ``edited`` flag is restored to what
    it was before, so normalisation alone does not mark the Attribute as
    edited.

    :param attribute: dict or MISPAttribute to validate
    :return: Validated MISPAttribute object
    :raises PyMISPError: If the attribute cannot be loaded or a validation error occurs
    :raises ValidationError: If the attribute is invalid
    """
    if not isinstance(attribute, MISPAttribute):
        try:
            attribute = _load_misp_attribute(attribute)
        except Exception as e:
            message = f'Error loading Attribute: {e}'
            logger.error(message)
            # Chain explicitly so the original failure is kept as the cause.
            raise PyMISPError(message) from e
    is_edited = attribute.edited
    try:
        value = AttributeValidationTool.modifyBeforeValidation(attribute.type, attribute.value)
        validated = AttributeValidationTool.validate(attribute.type, value)
    except Exception as e:
        message = f'Error validating Attribute <{attribute.uuid}>: {e}'
        logger.error(message)
        raise PyMISPError(message) from e
    if validated is not True:
        # Anything but True is the message explaining why validation failed.
        message = _message_logging(validated, attribute)
        logger.warning(message)
        raise ValidationError(message)
    if attribute.value != value:
        attribute.value = value
        attribute.edited = is_edited
    return attribute
def validate_attributes(attributes: list, errors: dict) -> Generator:  # type: ignore
    """
    Lazily validates MISP attributes, dropping the ones that fail.

    Validation failures are recorded under ``errors['warnings']`` and any
    other PyMISPError under ``errors['errors']``.

    :param attributes: List of MISPAttribute objects
    :param errors: Dictionary to populate with any validation error messages
    :return: Generator yielding only valid MISPAttribute objects
    """
    for candidate in attributes:
        try:
            checked = validate_attribute(candidate)
        except ValidationError as invalid:
            _populate_error_message(errors, 'warnings', str(invalid))
        except PyMISPError as failure:
            _populate_error_message(errors, 'errors', str(failure))
        else:
            yield checked
def validate_event(event: dict | MISPEvent, errors: dict) -> MISPEvent:  # type: ignore
    """
    Validates an event and skips Attributes or Object Attributes that don't validate.

    The event's ``attributes`` and ``objects`` are replaced by the validated
    ones; messages for everything that was skipped are collected in ``errors``.

    :param event: MISPEvent object or dict representing an event
    :param errors: Dictionary to populate with any validation error messages
    :return: MISPEvent with only valid attributes
    :raises PyMISPError: If the event cannot be loaded
    """
    if not isinstance(event, MISPEvent):
        try:
            event = _load_misp_event(event)
        except Exception as e:
            message = f'Error loading Event: {e}'
            logger.error(message)
            # Chain explicitly so the original failure is kept as the cause.
            raise PyMISPError(message) from e

    # Validation of Attributes
    event.attributes = list(validate_attributes(event.attributes, errors))

    # Validation of Objects
    event.objects = list(validate_objects(event.objects, errors))

    return event
def validate_object(misp_object: dict | MISPObject, errors: dict) -> MISPObject:  # type: ignore
    """
    Validates an object and skips any Object Attribute that doesn't validate.

    The ``edited`` flag of the object is captured before its attributes are
    replaced and restored afterwards.

    :param misp_object: MISPObject object or dict representing an object
    :param errors: Dictionary to populate with any validation error messages
    :return: MISPObject with only valid attributes
    :raises PyMISPError: If the object cannot be loaded
    """
    if not isinstance(misp_object, MISPObject):
        try:
            misp_object = _load_misp_object(misp_object)
        except Exception as e:
            message = f'Error loading Object: {e}'
            logger.error(message)
            # Chain explicitly so the original failure is kept as the cause.
            raise PyMISPError(message) from e
    is_edited = misp_object.edited

    # Validation of Object Attributes
    misp_object.attributes = list(_validate_object_attributes(misp_object, errors))
    misp_object.edited = is_edited

    return misp_object
def validate_objects(misp_objects: list, errors: dict) -> Generator:  # type: ignore
    """
    Lazily validates MISP objects, dropping Object Attributes that fail.

    An object that raises PyMISPError is skipped and the message is recorded
    under ``errors['errors']``.

    :param misp_objects: List of MISPObject objects
    :param errors: Dictionary to populate with any validation error messages
    :return: Generator yielding only valid MISPObject objects
    """
    for candidate in misp_objects:
        try:
            checked = validate_object(candidate, errors)
        except PyMISPError as failure:
            _populate_error_message(errors, 'errors', str(failure))
        else:
            yield checked
def _load_misp_attribute(attribute: dict) -> MISPAttribute:  # type: ignore
    """Build a MISPAttribute from its dict representation."""
    loaded = MISPAttribute()
    loaded.from_dict(**attribute)
    return loaded


def _load_misp_event(event: dict) -> MISPEvent:  # type: ignore
    """Build a MISPEvent from its dict representation."""
    loaded = MISPEvent()
    loaded.from_dict(**event)
    return loaded


def _load_misp_object(mispObject: dict) -> MISPObject:  # type: ignore
    """Build a MISPObject from its dict representation (needs a 'name' key)."""
    loaded = MISPObject(mispObject['name'])
    loaded.from_dict(**mispObject)
    return loaded


def _message_logging(validated: str, attribute: MISPAttribute,
                     misp_object: MISPObject | None = None) -> str:
    """Format the message reported for an Attribute that failed validation.

    :param validated: Explanation returned by the validator
    :param attribute: The Attribute that failed
    :param misp_object: Object holding the Attribute, if any
    """
    header = f'Failed validation for {attribute.type} Attribute <{attribute.uuid}>'
    origin = (
        '' if misp_object is None
        else f' in {misp_object.name} Object <{misp_object.uuid}>'
    )
    return f'{header}{origin}:\n{attribute.value} - {validated}'


def _populate_error_message(errors: dict[str, list[str]], key: str, message: str) -> None:
    """Append ``message`` to ``errors[key]``, creating the list if needed."""
    errors.setdefault(key, []).append(message)


def _validate_object_attributes(misp_object: MISPObject, errors: dict) -> Generator:  # type: ignore
    """Yield the Attributes of ``misp_object`` that pass validation.

    Failures are logged and recorded in ``errors`` ('errors' for unexpected
    exceptions, 'warnings' for invalid values) and the Attribute is skipped.
    """
    tool = AttributeValidationTool
    for item in misp_object.attributes:
        was_edited = item.edited
        try:
            normalised = tool.modifyBeforeValidation(item.type, item.value)
            outcome = tool.validate(item.type, normalised)
        except Exception as e:
            message = f'Error validating Object Attribute <{item.uuid}> in Object <{misp_object.uuid}>: {e}'
            logger.error(message)
            _populate_error_message(errors, 'errors', message)
            continue
        if outcome is not True:
            # Anything but True is the explanation of the failure.
            message = _message_logging(outcome, item, misp_object)
            logger.warning(message)
            _populate_error_message(errors, 'warnings', message)
            continue
        if item.value != normalised:
            # Store the normalised value without flagging the item as edited.
            item.value = normalised
            item.edited = was_edited
        yield item