Skip to content

Commit e724508

Browse files
committed
feature: working version of RAS message storage handler
1 parent b89870f commit e724508

File tree

3 files changed

+14
-329
lines changed

3 files changed

+14
-329
lines changed

remedial_action_schedules/config.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ RMQ_QUEUE_IN = object-storage.schedules.csa
33

44
[HANDLER]
55
CONVERTER_KEY_MODE = local
6-
ELASTIC_SCHEDULES_INDEX = csa-remedial-action-schedules
6+
ELASTIC_SCHEDULES_INDEX = csa-optimized-schedules
77

remedial_action_schedules/handlers.py

Lines changed: 7 additions & 325 deletions
Original file line numberDiff line numberDiff line change
@@ -3,320 +3,24 @@
33
import json
44
from datetime import datetime
55
from pathlib import Path
6-
from typing import Any, Dict, List, Optional, Set, Tuple, Union
7-
from collections import defaultdict
8-
from rdflib import Graph, URIRef, Literal
9-
from rdflib.namespace import RDF, XSD
106
import pandas as pd
117
import config
128
from loguru import logger
139
from integrations.elastic import Elastic
1410
from common.config_parser import parse_app_properties
11+
from common.rdf_converter import convert_cim_rdf_to_json, normalize_cim_payload
1512

1613
parse_app_properties(caller_globals=globals(),
1714
path=str(Path(__file__).parent.joinpath("config.properties")),
1815
section="HANDLER",
1916
eval_types=True)
2017

2118

22-
# ---------- Helper functions ----------
23-
def _strip_namespace(uri_or_qname: str) -> str:
24-
"""Remove namespace/prefix, keep only local name (e.g., 'nc:X.y' -> 'X.y', '...#y' -> 'y')."""
25-
u = str(uri_or_qname)
26-
if ":" in u and not u.startswith("http"):
27-
u = u.split(":", 1)[1]
28-
if "#" in u:
29-
u = u.split("#", 1)[1]
30-
elif "/" in u:
31-
u = u.rsplit("/", 1)[1]
32-
return u
33-
34-
def _literal_to_py(lit: Literal):
35-
dt = lit.datatype
36-
if dt in (XSD.integer, XSD.int, XSD.long, XSD.short, XSD.byte,
37-
XSD.unsignedInt, XSD.unsignedLong, XSD.unsignedShort, XSD.unsignedByte):
38-
try: return int(lit)
39-
except: return str(lit)
40-
if dt in (XSD.decimal, XSD.double, XSD.float):
41-
try: return float(lit)
42-
except: return str(lit)
43-
if dt in (XSD.boolean,):
44-
s = str(lit).strip().lower()
45-
return s in ("true", "1", "yes")
46-
return str(lit)
47-
48-
def _best_id_for_subject(g: Graph, s: URIRef) -> str:
49-
"""Prefer fragment, then mRID, then last path segment."""
50-
uri = str(s)
51-
if "#" in uri:
52-
frag = uri.split("#", 1)[1]
53-
if frag:
54-
return frag
55-
mrid = URIRef("http://iec.ch/TC57/2013/CIM-schema-cim16#IdentifiedObject.mRID")
56-
for _, _, v in g.triples((s, mrid, None)):
57-
if isinstance(v, Literal):
58-
return str(v)
59-
return uri.rsplit("/", 1)[-1]
60-
61-
def _class_of_subject(g: Graph, s: URIRef) -> Optional[str]:
62-
for _, _, t in g.triples((s, RDF.type, None)):
63-
if isinstance(t, URIRef):
64-
try:
65-
return _strip_namespace(g.qname(t))
66-
except Exception:
67-
return _strip_namespace(str(t))
68-
return None
69-
70-
def _localname(uri: Union[str, URIRef]) -> str:
71-
return _strip_namespace(uri)
72-
73-
74-
class CIMFlattener:
75-
def __init__(self, g: Graph, *, inline_depth: int = 99, include_uri: bool = False, key_mode: str = "qualified"):
76-
"""
77-
"qualified" -> "IdentifiedObject.mRID"; "local" -> "mRID"
78-
"""
79-
if key_mode not in ("qualified", "local"):
80-
raise ValueError("key_mode must be 'qualified' or 'local'")
81-
self.g = g
82-
self.inline_depth = inline_depth
83-
self.include_uri = include_uri
84-
self.key_mode = key_mode
85-
86-
# Build incoming index once. Exclude md:FullModel nodes as sources (header).
87-
self._incoming_index: Dict[URIRef, List[Tuple[URIRef, URIRef]]] = defaultdict(list)
88-
for s, p, o in self.g.triples((None, None, None)):
89-
if isinstance(o, URIRef):
90-
if not self._is_fullmodel(s):
91-
self._incoming_index[o].append((s, p))
92-
93-
def _is_fullmodel(self, s: URIRef) -> bool:
94-
"""Detect md:FullModel or any *FullModel class/localname (prefix-agnostic)."""
95-
for _, _, t in self.g.triples((s, RDF.type, None)):
96-
if isinstance(t, URIRef) and _localname(t).endswith("FullModel"):
97-
return True
98-
return _localname(s).endswith("FullModel")
99-
100-
def _format_key(self, qname_or_uri: str) -> str:
101-
"""
102-
Strip namespace, then optionally drop the class segment before the final dot.
103-
- qualified: 'IdentifiedObject.mRID'
104-
- local: 'mRID'
105-
"""
106-
base = _strip_namespace(qname_or_uri)
107-
if self.key_mode == "local" and "." in base:
108-
return base.rsplit(".", 1)[-1]
109-
return base
110-
111-
def _key(self, pred) -> str:
112-
try:
113-
qname = self.g.qname(pred)
114-
except Exception:
115-
qname = str(pred)
116-
return self._format_key(qname)
117-
118-
def _value_for_object(self, o, depth_left: int, visited: Set[URIRef]) -> Any:
119-
if isinstance(o, Literal):
120-
return _literal_to_py(o)
121-
if isinstance(o, URIRef):
122-
if depth_left > 0 and o not in visited and any(True for _ in self.g.triples((o, None, None))):
123-
return self._subject_to_object(o, depth_left - 1, visited, is_root=False)
124-
return _best_id_for_subject(self.g, o)
125-
return str(o)
126-
127-
def _subject_to_object(self, s: URIRef, depth_left: int, visited: Optional[Set[URIRef]] = None, *, is_root: bool = False) -> Dict[str, Any]:
128-
if visited is None:
129-
visited = set()
130-
if s in visited:
131-
return _best_id_for_subject(self.g, s)
132-
visited = set(visited)
133-
visited.add(s)
134-
135-
obj: Dict[str, Any] = {}
136-
cls = _class_of_subject(self.g, s)
137-
if cls:
138-
obj["@type"] = self._format_key(cls) # respects key_mode
139-
obj["@id"] = _best_id_for_subject(self.g, s)
140-
if self.include_uri:
141-
obj["@uri"] = str(s)
142-
143-
from collections import defaultdict
144-
multimap: Dict[str, List[Any]] = defaultdict(list)
145-
object_keys: Set[str] = set() # keys that contain expanded dict children
146-
147-
# ---- Outgoing edges ----
148-
for _, p, o in self.g.triples((s, None, None)):
149-
if p == RDF.type:
150-
continue
151-
152-
pred_key = self._key(p) # default key = predicate (stripped + key_mode)
153-
154-
if isinstance(o, URIRef) and depth_left > 0 and o not in visited and any(
155-
True for _ in self.g.triples((o, None, None))):
156-
child = self._subject_to_object(o, depth_left - 1, visited, is_root=is_root)
157-
# If expanded, prefer child's @type as the key
158-
use_key = child.get("@type", pred_key) if isinstance(child, dict) else pred_key
159-
multimap[use_key].append(child if child else _best_id_for_subject(self.g, o))
160-
if isinstance(child, dict):
161-
object_keys.add(use_key)
162-
else:
163-
multimap[pred_key].append(self._value_for_object(o, depth_left, visited))
164-
165-
# ---- Incoming edges (same rule; merged under same key) ----
166-
if depth_left > 0:
167-
for src, pred in self._incoming_index.get(s, []):
168-
pred_key = self._key(pred)
169-
if src not in visited and any(True for _ in self.g.triples((src, None, None))):
170-
child = self._subject_to_object(src, depth_left - 1, visited, is_root=is_root)
171-
use_key = child.get("@type", pred_key) if isinstance(child, dict) else pred_key
172-
multimap[use_key].append(child if child else _best_id_for_subject(self.g, src))
173-
if isinstance(child, dict):
174-
object_keys.add(use_key)
175-
else:
176-
multimap[pred_key].append(_best_id_for_subject(self.g, src))
177-
178-
# ---- collapse: lists for object-like keys (or when multiple values) ----
179-
for k, vals in multimap.items():
180-
if (k in object_keys) or (len(vals) > 1):
181-
obj[k] = vals # always list for expanded children
182-
else:
183-
obj[k] = vals[0] # keep simple scalars as scalars
184-
185-
return obj
186-
187-
def _subjects_by_class(self, class_name: str) -> List[URIRef]:
188-
# Accept "nc:RemedialActionSchedule" or "RemedialActionSchedule"
189-
want = _strip_namespace(class_name)
190-
out: List[URIRef] = []
191-
for s, _, t in self.g.triples((None, RDF.type, None)):
192-
if isinstance(t, URIRef):
193-
try:
194-
cur = _strip_namespace(self.g.qname(t))
195-
except Exception:
196-
cur = _strip_namespace(str(t))
197-
if cur == want:
198-
out.append(s)
199-
return out
200-
201-
def build_from_class(self, class_name: str) -> List[Dict[str, Any]]:
202-
roots = self._subjects_by_class(class_name)
203-
seen: Set[URIRef] = set()
204-
uniq: List[URIRef] = []
205-
for r in roots:
206-
if r not in seen:
207-
seen.add(r); uniq.append(r)
208-
return [self._subject_to_object(s, self.inline_depth, set(), is_root=True) for s in uniq]
209-
210-
211-
def convert_cim_rdf_to_json(rdfxml: str, *, root_class: List[str], inline_depth: int = 99, key_mode: str = "qualified") -> Dict[str, Any]:
212-
"""
213-
Returns:
214-
{
215-
"FullModel": {...}, # header (FullModel), keys per key_mode
216-
"<root_class>": [ ... ] # all objects of that class with full linked subgraph
217-
}
218-
"""
219-
g = Graph()
220-
# Accept either a filepath or an XML string
221-
try:
222-
# Try as a path/URI first
223-
g.parse(rdfxml)
224-
except Exception:
225-
# Fallback: treat as raw RDF/XML string
226-
g.parse(data=rdfxml, format="application/rdf+xml")
227-
228-
# Extract header FullModel (if present) — keys also respect key_mode
229-
def _format_key_local(qname_or_uri: str) -> str:
230-
base = _strip_namespace(qname_or_uri)
231-
return base.rsplit(".", 1)[-1] if key_mode == "local" and "." in base else base
232-
233-
header: Dict[str, Any] = {}
234-
for s, _, _ in g.triples((None, None, None)):
235-
is_full = False
236-
for _, _, t in g.triples((s, RDF.type, None)):
237-
if isinstance(t, URIRef) and _localname(t).endswith("FullModel"):
238-
is_full = True; break
239-
if is_full or _localname(s).endswith("FullModel"):
240-
hmap: Dict[str, List[Any]] = defaultdict(list)
241-
for _, p, o in g.triples((s, None, None)):
242-
if p == RDF.type:
243-
continue
244-
try:
245-
pk = g.qname(p)
246-
except Exception:
247-
pk = str(p)
248-
key = _format_key_local(pk)
249-
val = _literal_to_py(o) if isinstance(o, Literal) else _strip_namespace(o)
250-
hmap[key].append(val)
251-
for k, vals in hmap.items():
252-
header[k] = vals[0] if len(vals) == 1 else vals
253-
break # assume single header
254-
255-
fl = CIMFlattener(g, inline_depth=inline_depth, include_uri=False, key_mode=key_mode)
256-
257-
output = {"FullModel": header}
258-
259-
for cls in root_class:
260-
roots = fl.build_from_class(cls)
261-
output[cls] = roots
262-
263-
return output
264-
265-
266-
class RemedialActionScheduleToElasticHandler:
19+
class HandlerRemedialActionScheduleToElastic:
26720

26821
def __init__(self):
26922
self.elastic_service = Elastic()
27023

271-
@staticmethod
272-
def normalize_cim_payload(payload: dict) -> pd.DataFrame:
273-
# --- 1) FullModel meta ---
274-
meta = pd.json_normalize(payload.get("FullModel", {})).iloc[0].to_dict() if payload.get("FullModel") else {}
275-
276-
# --- 2) Per-root normalize + attach meta ---
277-
frames = []
278-
for root_key, rows in payload.items():
279-
if root_key == "FullModel":
280-
continue
281-
rows = rows if isinstance(rows, list) else [rows]
282-
df = pd.json_normalize(rows)
283-
for k, v in meta.items():
284-
df[f"FullModel.{k}"] = v
285-
frames.append(df)
286-
287-
if not frames:
288-
return pd.DataFrame()
289-
290-
df = pd.concat(frames, ignore_index=True)
291-
292-
# --- helpers ---
293-
def any_list(s: pd.Series) -> bool:
294-
return s.apply(lambda x: isinstance(x, list)).any()
295-
296-
def any_dict(s: pd.Series) -> bool:
297-
return s.apply(lambda x: isinstance(x, dict)).any()
298-
299-
# --- 3) Recursively explode list columns and normalize dicts they contain ---
300-
changed = True
301-
while changed:
302-
changed = False
303-
for col in list(df.columns):
304-
if any_list(df[col]):
305-
df = df.explode(col, ignore_index=True)
306-
# if exploded into dicts, normalize & merge
307-
if any_dict(df[col]):
308-
norm = pd.json_normalize(df[col]).add_prefix(f"{col}.")
309-
df = pd.concat([df.drop(columns=[col]).reset_index(drop=True), norm], axis=1)
310-
changed = True
311-
312-
# --- 4) Normalize any remaining plain dict columns (not lists) ---
313-
for col in list(df.columns):
314-
if any_dict(df[col]):
315-
norm = pd.json_normalize(df[col]).add_prefix(f"{col}.")
316-
df = pd.concat([df.drop(columns=[col]).reset_index(drop=True), norm], axis=1)
317-
318-
return df
319-
32024
def handle(self, message: bytes, properties: BasicProperties, **kwargs):
32125

32226
# Convert message from NC to JSON
@@ -325,18 +29,16 @@ def handle(self, message: bytes, properties: BasicProperties, **kwargs):
32529
key_mode=CONVERTER_KEY_MODE)
32630

32731
# JSON normalize and transform to DataFrame
328-
df = self.normalize_cim_payload(data)
32+
df = normalize_cim_payload(payload=data, root_only=False)
32933

33034
# TODO need to get CO and RA from object storage and merge
33135

33236
# Convert to dictionary
333-
data_to_send = df.to_dict(orient='records')
37+
data_to_send = df.astype(object).where(pd.notna(df), None).to_dict("records")
33438

33539
response = self.elastic_service.send_to_elastic_bulk(
336-
index=ELASTIC_METADATA_INDEX,
40+
index=ELASTIC_SCHEDULES_INDEX,
33741
json_message_list=data_to_send,
338-
id_from_metadata=True,
339-
id_metadata_list=["@id"],
34042
)
34143

34244
logger.info(f"Message sending to Elastic successful: {response}")
@@ -345,26 +47,6 @@ def handle(self, message: bytes, properties: BasicProperties, **kwargs):
34547

34648

34749
if __name__ == "__main__":
348-
# rdf_xml = r"C:\Users\martynas.karobcickas\Downloads\ras-example.xml"
349-
# rdf_xml = r"C:\Users\martynas.karobcickas\Documents\Python projects\RAO\test-data\TC1_assessed_elements.xml"
350-
# rdf_xml = r"C:\Users\martynas.karobcickas\Documents\Python projects\RAO\test-data\TC1_contingencies.xml"
351-
# rdf_xml = r"C:\Users\martynas.karobcickas\Documents\Python projects\RAO\test-data\TC1_remedial_actions.xml"
352-
# g = Graph()
353-
# g.parse(rdf_xml, format="xml") # Put your RDF/XML file
354-
355-
# result = convert_cim_rdf_to_json(rdf_xml, root_class=["RemedialActionSchedule"], key_mode="local")
356-
# result = convert_cim_rdf_to_json(rdf_xml, root_class=["RemedialActionSchedule"], key_mode="qualified")
357-
# result = convert_cim_rdf_to_json(rdf_xml, root_class=["GridStateAlterationRemedialAction"], key_mode="local")
358-
# result = convert_cim_rdf_to_json(rdf_xml, root_class=["OrdinaryContingency", "ExceptionalContingency"], key_mode="local")
359-
360-
# print json
361-
# print(json.dumps(result, indent=2))
362-
363-
# with open("test.json", "w") as f:
364-
# json.dump(result, f, ensure_ascii=False, indent=4)
365-
# df = RemedialActionScheduleToElasticHandler.normalize_cim_payload(result)
366-
# print(df.head())
367-
36850
# Define RMQ test message
36951
headers = {
37052
"baCorrelationID": f"{uuid.uuid4()}",
@@ -386,9 +68,9 @@ def handle(self, message: bytes, properties: BasicProperties, **kwargs):
38668
headers=headers,
38769
)
38870

389-
with open(r"C:\Users\martynas.karobcickas\Downloads\ras-example.xml", "rb") as file:
71+
with open(r"C:\Users\martynas.karobcikas\Downloads\ras-example.xml", "rb") as file:
39072
file_bytes = file.read()
39173

39274
# Create instance
393-
service = RemedialActionScheduleToElasticHandler()
75+
service = HandlerRemedialActionScheduleToElastic()
39476
result = service.handle(message=file_bytes, properties=properties)

0 commit comments

Comments
 (0)