import json
import os
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, Optional
from heizer._source.enums import ConsumerStatusEnum
from heizer.env_vars import CONSUMER_STATUS_FILE_PATH
[docs]def write_consumer_status(
consumer_id: str,
status: ConsumerStatusEnum,
pid: int,
consumer_name: Optional[str] = None,
) -> None:
if os.path.exists(CONSUMER_STATUS_FILE_PATH):
data = json.loads(open(CONSUMER_STATUS_FILE_PATH).read())
data.update(
{
consumer_id: {
"name": consumer_name or consumer_id,
"status": status,
"pid": pid,
"timestamp": datetime.utcnow().isoformat(),
}
}
)
else:
data = defaultdict(Dict[str, Any])
data[consumer_id] = {
"name": consumer_name or consumer_id,
"status": status,
"pid": pid,
"timestamp": datetime.utcnow().isoformat(),
}
with open(CONSUMER_STATUS_FILE_PATH, "+w") as f:
json.dump(data, f)
[docs]def read_consumer_status(consumer_id: Optional[str] = None) -> Dict[str, Any]:
with open(CONSUMER_STATUS_FILE_PATH) as f:
data = json.loads(f.read())
if consumer_id:
return data.get(consumer_id, {})
else:
return data