feature_flags.py
| 4.9 KB | Satir:
0
| py
Geri
""" Shared reader for the local feature flags file. The file is written by: - Go resident-agent FeatureFlags plugin (IM360 mode) - Python FeatureFlagsSync plugin (AV mode) Other subsystems (e.g. message_status_publisher) use this module to check individual flag values at runtime. Supported JSON shapes on disk (readers / ``is_enabled``): - Legacy object ``{"mqtt_tracking": true, ...}`` (preferred on disk after sync). - JSON array of enabled names ``["mqtt_tracking"]`` (still accepted). - Legacy wrapper ``{"flags": ["mqtt_tracking", ...]}``. The sync API checksum is always over a **canonical JSON array** of enabled names (same as correlation sync). ``sync_checksum_hex_from_flags_file`` derives that MD5 from whatever shape is on disk. The sync plugin also writes ``FLAGS_PLAIN_PATH`` (``/var/imunify360/feature_flags``): plain text, one enabled flag name per line (sorted), for scripts. """ from __future__ import annotations import hashlib import json import os from typing import Any FLAGS_PATH = "/var/imunify360/feature_flags.json" # Plain list of enabled flag names (one per line), same order as sorted JSON array. FLAGS_PLAIN_PATH = "/var/imunify360/feature_flags" _cached_flags: dict[str, Any] = {} _cached_mtime: float = 0.0 def _normalize_flags_from_file(raw: Any) -> dict[str, Any]: """Map file JSON to a flat name->value dict for :func:`is_enabled`.""" if raw is None: return {} if isinstance(raw, list): out: dict[str, Any] = {} for item in raw: if isinstance(item, str): out[item] = True return out if isinstance(raw, dict): inner = raw.get("flags") if isinstance(inner, list): return _normalize_flags_from_file(inner) return raw return {} def _read_flags() -> dict[str, Any]: global _cached_flags, _cached_mtime try: mtime = os.path.getmtime(FLAGS_PATH) except OSError: _cached_flags = {} _cached_mtime = 0.0 return _cached_flags if mtime == _cached_mtime: return _cached_flags try: with open(FLAGS_PATH) as f: raw = json.load(f) _cached_flags = _normalize_flags_from_file(raw) except (OSError, json.JSONDecodeError): _cached_flags = {} _cached_mtime = mtime return _cached_flags def enabled_flag_names_sorted(flags: Any) -> list[str]: """Return sorted enabled flag names for JSON and plain-text sidecar. Accepts the same shapes as :func:`_normalize_flags_from_file` (array, flat map, ``{"flags": [...]}``) so checksums and sidecars match Go ``enabledNamesSortedForChecksum`` / :func:`is_enabled`. """ if not isinstance(flags, (list, dict)): raise TypeError( f"flags must be list or dict, not {type(flags).__name__}" ) normalized = _normalize_flags_from_file(flags) return sorted(k for k, v in normalized.items() if v) def canonical_sync_flag_list_bytes(names: list[str]) -> bytes: """JSON array bytes used for sync MD5 (matches correlation_api checksum_for_sync_flag_list).""" ordered = sorted(names) return json.dumps(ordered, sort_keys=True, indent=2).encode() def sync_checksum_hex_from_flags_file(path: str) -> str: """MD5 hex of canonical enabled-name array for ``path``, or "" if missing/invalid.""" try: with open(path, encoding="utf-8") as f: raw = json.load(f) except (OSError, UnicodeDecodeError, json.JSONDecodeError): return "" names = enabled_flag_names_sorted(raw) payload = canonical_sync_flag_list_bytes(names) return hashlib.md5(payload, usedforsecurity=False).hexdigest() def legacy_feature_flags_map_bytes(names: list[str]) -> bytes: """On-disk legacy JSON: ``{flag: true, ...}`` with sorted keys.""" d = {n: True for n in sorted({x for x in names if isinstance(x, str)})} return json.dumps(d, sort_keys=True, indent=2).encode() def plain_text_payload_for_enabled_flags(flags: Any) -> bytes: """Body for ``FLAGS_PLAIN_PATH``: one name per line, trailing newline if non-empty.""" names = enabled_flag_names_sorted(flags) if not names: return b"" return ("\n".join(names) + "\n").encode() def serialize_feature_flags_file_payload(flags: Any) -> bytes: """Serialize dict flags for writing ``FLAGS_PATH`` (legacy map only).""" if isinstance(flags, dict): return json.dumps(flags, sort_keys=True, indent=2).encode() raise TypeError(f"flags must be dict, not {type(flags).__name__}") def is_enabled(flag_name: str, default: bool = False) -> bool: """Return whether *flag_name* is enabled. If the file is missing, unreadable, or the flag is absent, *default* is returned. Defaults to False so unknown flags are treated as disabled unless the caller explicitly opts in. """ flags = _read_flags() value = flags.get(flag_name) if value is None: return default return bool(value)
Kaydet
Ctrl+S ile kaydet