rules_checker.py
| 15.2 KB | Satir:
0
| py
Geri
""" Usage /opt/imunify360/venv/bin/python3 /opt/imunify360/venv/share/imunify360/scripts/rules_checker.py <action> choose an action from ACTIONS map e.g. /opt/imunify360/venv/bin/python3 rules_checker.py recreate Actions: - `recreate` - recreates rules if needed and checks ipsets consistent - `clear` - waits RulesChecker stop and destroys all rules and ipsets Actions based on lazy_init plugin placed in im360.utils.lazy_init """ import itertools import asyncio import json from pathlib import Path import os import sys import time import logging import argparse from defence360agent.internals import logger as lg from defence360agent.internals.global_scope import g from defence360agent.model import instance, tls_check, simplification, infected_domain from defence360agent.contracts.config import Model, Merger from im360.contracts.config import IPSET_LISTS_PATH from im360.files import WHITELISTS, Index from im360.internals.core import ip_versions from im360.internals.strategy import Strategy from im360.model import ( firewall, incident, messages_to_send, proactive, ) from im360.utils.lazy_init import ( RulesChecker, RealProtector, RULES_CHECK_IN_PROGRESS, ) logger = logging.getLogger("rules-checker") # go-resident reads from stdout in case we have errors handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) logger.addHandler(handler) STATE = {"last_ipset_check": 0.0} DAY = 24 * 60 * 60 REAL_PROTECTOR_STATE = Path("/var/imunify360/.realprotector.state") RULES_CHECKER_STATE = Path("/var/imunify360/.ruleschecker.state") RULES_CHECKER_EXTERN_STATE = Path( "/var/imunify360/.ruleschecker.extern_state.json" ) REAL_PROTECTOR_STATE_JSON = Path("/var/imunify360/.realprotector.state.json") class RealProtectorState: """RealProtector state to save and restore.""" def __init__(self, _ws, _pb_dmv, _pbm, _de, _lic): self._webshield_status = _ws self._port_blocking_deny_mode_values = _pb_dmv self._port_blocking_mode = _pbm self._dos_enabled = _de self.last_ipset_check = _lic def __str__(self) -> str: return ( "RealProtectorState(" f"_webshield_status={self._webshield_status}, " f"_port_blocking_mode={self._port_blocking_mode}, " "_port_blocking_deny_mode_values" f"={self._port_blocking_deny_mode_values}," f"last_ipset_check={self.last_ipset_check}," f"_dos_enabled={self._dos_enabled})" ) def to_json(self) -> dict: return { "webshield_status": self._webshield_status, "port_blocking_deny_mode_values": self._port_blocking_deny_mode_values, "port_blocking_mode": self._port_blocking_mode, "dos_enabled": self._dos_enabled, "last_ipset_check": self.last_ipset_check, } @classmethod def from_json(cls, data: dict) -> "RealProtectorState": # webshield_status and port_blocking_deny_mode_values are tuples in # RealProtector; the previous pickle round-trip preserved that type # but JSON arrays decode as lists. RulesChecker compares them with # != against the current tuple value, so without coercion every # restart would see a spurious "change" and recreate firewall rules. ws = data.get("webshield_status") if isinstance(ws, list): ws = tuple(ws) pbdmv = data.get("port_blocking_deny_mode_values") if isinstance(pbdmv, list): pbdmv = tuple(pbdmv) return cls( _ws=ws, _pb_dmv=pbdmv, _pbm=data.get("port_blocking_mode"), _de=data.get("dos_enabled", True), _lic=data.get("last_ipset_check", 0), ) class RulesCheckerState: def __init__( self, interface_conf, ipset_outdated_events, outdated_ipsets, versions ): self._interface_conf = interface_conf self._ipsets_outdated_events = ipset_outdated_events self.outdated_ipsets = outdated_ipsets self.versions = versions def __str__(self) -> str: return ( "RulesCheckerState(" f"_interface_conf={self._interface_conf}, " f"_ipsets_outdated_events={self._ipsets_outdated_events})" f"outdated_ipsets={self.outdated_ipsets})" f"versions={self.versions})" ) def make_external_state(self) -> dict: versions_data = {} for version, version_state in self.versions.items(): versions_data[version] = { "transient_error_on_create": version_state.transient_error_on_create, "errors": version_state.errors, "next_try_time": version_state.next_try_time, "running": version_state.running, } _outdated_ipsets = { ver: list(sets) for ver, sets in self.outdated_ipsets.items() } return { "versions": versions_data, "outdated_ipsets": _outdated_ipsets, # Persist interface config so recreate_rules_if_needed can # detect interface changes across restarts; the pickle path # used to do this implicitly via the pickled object graph. "interface_conf": self._interface_conf, } async def _check_for_config_change(rc: RulesChecker, rp: RealProtector): """Checking that config state is consistent with the current state.""" rp._rules_checker = rc await rp._on_config_update_unlocked(None) async def recreate_rules(rc: RulesChecker, rp: RealProtector, **kwargs): """Recreates rules if needed and checks ipsets consistent.""" logger.info("Checking that need to recreate rules") # TODO: check if we need to check it too often # for Python implementation we do it only once per day if time.time() - STATE["last_ipset_check"] < DAY: logger.info("Skip ipsets check") else: await rc._check_ipsets_consistent() STATE["last_ipset_check"] = time.time() await rc.recreate_rules_if_needed() logger.info("IP sets verification and initialization completed") async def check_config_update(rc: RulesChecker, rp: RealProtector, **kwargs): """Checking config update.""" logger.info("Checking config update") await _check_for_config_change(rc, rp) logger.info("Completed") async def check_global_whitelist_update( rc: RulesChecker, rp: RealProtector, **kwargs ): """Checking config update.""" rp._rules_checker = rc logger.info("Checking global whitelist update") await rp.process_global_whitelist_update() logger.info("Completed") async def check_country_update(rc: RulesChecker, rp: RealProtector, **kwargs): """Checking config update.""" rp._rules_checker = rc logger.info("Checking country list update") await rp.process_country_list_update() logger.info("Completed") async def recreate_rules_on_strategy_change( rc: RulesChecker, rp: RealProtector, **kwargs ): """Recreates rules if needed and checks ipsets consistent.""" logger.info("Checking that need to recreate rules on strategy change") await rc.recreate_rules_if_needed() logger.info( "Firewall rules recreated due to StrategyChange %s", Strategy.current ) async def check_ipsets_consistent( rc: RulesChecker, rp: RealProtector, check_all=False, **kwargs ): """Check ipsets consistent.""" logger.info("Checking ipsets consistent") await rc._check_ipsets_consistent(check_all) STATE["last_ipset_check"] = time.time() if any(sets for sets in rc.outdated_ipsets.values()): await rc.recreate_rules_if_needed() logger.info("Completed") async def _stop_and_wait(rc: RulesChecker): rc.should_stop() await rc.wait() async def clear_everything(rc: RulesChecker, rp: RealProtector, **kwargs): """Clear rules and ipsets on stop.""" logger.info("Clear rules and ipsets") await _stop_and_wait(rc) await rc.clear_everything() logger.info("Completed") async def clear_rules(rc: RulesChecker, rp: RealProtector, **kwargs): """Clear rules on stop.""" logger.info("Clear rules") await _stop_and_wait(rc) await rc.clear_rules() logger.info("Completed") async def force_recreate_rules_and_refill_ports_if_needed( rc: RulesChecker, rp: RealProtector, refill=False, **kwargs ): rp._rules_checker = rc await rc.recreate_rules_if_needed(recreate_any_way=True) logger.info("Firewall rules recreated due to ConfigUpdate") if refill and await rp._refill_port_blocking_ipsets(): logger.info("Blocked ports ipsets reffiled") async def refill_ports(rc: RulesChecker, rp: RealProtector, **kwargs): rp._rules_checker = rc if await rp._refill_port_blocking_ipsets(): logger.info("Blocked ports deny mode updated on ConfigUpdate") def setup_environment(): """Setup environment for rules checker.""" lg.reconfigure() ip_versions.init() instance.db.init(f"file:{Model.PATH}?mode=ro", uri=True) instance.db.execute_sql("ATTACH ? AS resident", (Model.RESIDENT_PATH,)) instance.db.execute_sql("ATTACH ? AS ipsetlists", (IPSET_LISTS_PATH,)) models = list(itertools.chain( *[ simplification.get_models(module) for module in ( simplification, firewall, incident, messages_to_send, proactive, infected_domain, ) ] )) instance.db.bind(models) if os.environ.get("DEBUG") == "true": g.DEBUG = True Index.add_type(WHITELISTS, "whitelist/v2", 0o770, 0o660, all_zip=True) def restore_state(rp: RealProtector, rc: RulesChecker): """Restore RealProtector state.""" Strategy.current = Strategy.get() # Remove legacy pickle files to prevent deserialization of arbitrary objects for legacy in (REAL_PROTECTOR_STATE, RULES_CHECKER_STATE): try: if legacy.exists(): legacy.unlink() except Exception: pass try: if REAL_PROTECTOR_STATE_JSON.exists(): with REAL_PROTECTOR_STATE_JSON.open("r") as f: rp_state = RealProtectorState.from_json(json.load(f)) rp._webshield_status = rp_state._webshield_status rp._port_blocking_deny_mode_values = ( rp_state._port_blocking_deny_mode_values ) rp._port_blocking_mode = rp_state._port_blocking_mode # next is new functionality and _dos_enabled will be missing # on startup. Since this is used to prevent adding connection # tracking to netfilter we basically had this before set to # True, that is why default value needs to be True rp._dos_enabled = rp_state._dos_enabled STATE["last_ipset_check"] = rp_state.last_ipset_check except Exception as e: logger.error("Failed to restore RealProtector state: %s", e) try: if RULES_CHECKER_EXTERN_STATE.exists(): with RULES_CHECKER_EXTERN_STATE.open("r") as f: extern_state = json.load(f) # Restore versions state from JSON if "versions" in extern_state: for version, vdata in extern_state["versions"].items(): if version in rc.versions: vs = rc.versions[version] vs.transient_error_on_create = vdata.get( "transient_error_on_create", False ) vs.errors = vdata.get("errors", 0) vs.next_try_time = vdata.get("next_try_time", 0) vs.running = True # Restore outdated_ipsets from JSON if "outdated_ipsets" in extern_state: rc.outdated_ipsets = { ver: set(sets) for ver, sets in extern_state["outdated_ipsets"].items() } # Restore the active interface config so # recreate_rules_if_needed can detect interface changes # across restarts (the pickle path persisted this # implicitly). if "interface_conf" in extern_state: rc.active_interface_conf = extern_state["interface_conf"] except Exception as e: logger.error("Failed to restore RulesChecker state: %s", e) return rp, rc def save_state(rp: RealProtector, rc: RulesChecker): """Save RealProtector state.""" rp_state = RealProtectorState( rp._webshield_status, rp._port_blocking_deny_mode_values, rp._port_blocking_mode, rp._dos_enabled, STATE["last_ipset_check"], ) rc_state = RulesCheckerState( rc.active_interface_conf, rc._ipsets_outdated_events, rc.outdated_ipsets, rc.versions, ) with REAL_PROTECTOR_STATE_JSON.open("w") as f: json.dump(rp_state.to_json(), f, indent=4) with RULES_CHECKER_EXTERN_STATE.open("w") as f: json.dump(rc_state.make_external_state(), f, indent=4) ACTIONS = { "recreate": recreate_rules, "clear": clear_everything, "clear-rules": clear_rules, "config-update": check_config_update, "global-whitelist-update": check_global_whitelist_update, "country-update": check_country_update, "strategy-change": recreate_rules_on_strategy_change, "ipsets-consistent": check_ipsets_consistent, "force-recreate-rules": force_recreate_rules_and_refill_ports_if_needed, "refill-ports": refill_ports, } def parse_arguments(): """Parse command line arguments.""" parser = argparse.ArgumentParser( description="Imunify360 Rules Checker CLI", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) parser.add_argument( "action", choices=ACTIONS.keys(), help="Action to perform" ) parser.add_argument( "--refill-ports", action="store_true", help="refills ports in force-recreate-rules", ) parser.add_argument( "--check-all", action="store_true", help=( "Check consistency off all ipsets on python side. Should be used" " only in tests" ), ) return parser.parse_args() def main(action, **kwargs): try: RULES_CHECK_IN_PROGRESS.touch() except Exception as e: logger.error("Failed to create RULES_CHECK_IN_PROGRESS file: %s", e) tls_check.reset() setup_environment() Merger.update_merged_config() loop = asyncio.get_event_loop() rp, rc = restore_state(RealProtector(), RulesChecker(loop)) action_func = ACTIONS[action] loop.run_until_complete(action_func(rc, rp, **kwargs)) save_state(rp, rc) logger.info("Script finished") if __name__ == "__main__": args = parse_arguments() kwargs = {} if args.refill_ports: kwargs["refill"] = args.refill_ports if args.check_all: kwargs["check_all"] = args.check_all try: main(args.action, **kwargs) except Exception as e: RULES_CHECK_IN_PROGRESS.unlink(missing_ok=True) logger.exception("rules checker failed with unhandled error: %r", e) sys.exit(2) else: RULES_CHECK_IN_PROGRESS.unlink(missing_ok=True)
Kaydet
Ctrl+S ile kaydet