Source code for heizer._source.status_manager

import json
import os
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, Optional

from heizer._source.enums import ConsumerStatusEnum
from heizer.env_vars import CONSUMER_STATUS_FILE_PATH


[docs]def write_consumer_status( consumer_id: str, status: ConsumerStatusEnum, pid: int, consumer_name: Optional[str] = None, ) -> None: if os.path.exists(CONSUMER_STATUS_FILE_PATH): data = json.loads(open(CONSUMER_STATUS_FILE_PATH).read()) data.update( { consumer_id: { "name": consumer_name or consumer_id, "status": status, "pid": pid, "timestamp": datetime.utcnow().isoformat(), } } ) else: data = defaultdict(Dict[str, Any]) data[consumer_id] = { "name": consumer_name or consumer_id, "status": status, "pid": pid, "timestamp": datetime.utcnow().isoformat(), } with open(CONSUMER_STATUS_FILE_PATH, "+w") as f: json.dump(data, f)
[docs]def read_consumer_status(consumer_id: Optional[str] = None) -> Dict[str, Any]: with open(CONSUMER_STATUS_FILE_PATH) as f: data = json.loads(f.read()) if consumer_id: return data.get(consumer_id, {}) else: return data