Skip to content

Commit d086dbe

Browse files
bcdurak and stefannica authored
Improving the Datadog fetch logic (#4314)
* first checkpoint * migration * second check * docs checkpoint * another checkpoint * new checkpoint * solving things * some other checkpoint * formatting * new changes * checkpoint * fixing the secret * some broken checkpoint * new big checkpoint * removing docs for now * some fixes * fix the local orch * removed unused * delete the old step logging * some fixes * running checkpoint * new changes * stack changes * more fixes * new changes * new stuff * new defaults and formatting * moving stuff around * adding the dependency * fixing the migration * new changes * formating * new creation * some new import structure * fixing the order * some minor changes * more minor changes * new default constants * some more minor fixes * fix * some fixes * new try * Fix infinite loop on debug logs * Log exceptions raised during the logger context emit calls * Decoupled logging context from the log store and added flush method to the log store abstraction. * optimizing * some changes * Intermediate fixes for scalability * Update logging for deployers to use utils * Refactored the EOF operation * Fix first round of bugs after last changes * formatting * formatting, linting, docstrings and tests * unit tests * docstrings * removed old tests * format * Apply code review suggestions * Another round of code review suggestions * Add metadata to logs and replaced the datadog exporter with the standard OTEL exporter * Fixed datadog log fetching * docstrings * Update src/zenml/log_stores/artifact/artifact_log_exporter.py * Removed context, fixed datadog fetch time window, used OTEL handler * Implement generic OTEL exporter * Fix docstrings and spelling errors * Fix linter errors * fixing the unit tests * format * Improved otel exporter to use correct fields * small fix to the runner * one more * removed todo * more minor fixes * sql zen store changes * more minor fixes * another small fix * minor fixes * better log entry fetching * late night changes * proper limits * new logic * 
* docstring fix

---------

Co-authored-by: Stefan Nica <[email protected]>
1 parent 3989d92 commit d086dbe

File tree

1 file changed

+124
-87
lines changed

1 file changed

+124
-87
lines changed

src/zenml/log_stores/datadog/datadog_log_store.py

Lines changed: 124 additions & 87 deletions
Original file line number · Diff line number · Diff line change
@@ -71,8 +71,8 @@ def fetch(
7171
"""Fetch logs from Datadog's API.
7272
7373
This method queries Datadog's Logs API to retrieve logs for the
74-
specified pipeline run and step. It uses the HTTP API without
75-
requiring the Datadog SDK.
74+
specified pipeline run and step. It automatically paginates through
75+
results to fetch up to the requested limit.
7676
7777
Args:
7878
logs_model: The logs model containing run and step metadata.
@@ -99,104 +99,141 @@ def fetch(
9999
"Content-Type": "application/json",
100100
}
101101

102-
body: Dict[str, Any] = {
103-
"filter": {
104-
"query": query,
105-
"from": (
106-
start_time.isoformat()
107-
if start_time
108-
else logs_model.created.isoformat()
109-
),
110-
"to": (
111-
end_time.isoformat()
112-
if end_time
113-
else datetime.now().astimezone().isoformat()
114-
),
115-
},
116-
"page": {
117-
"limit": min(limit, 1000), # Datadog API limit
118-
},
119-
"sort": "@otel.timestamp",
120-
}
102+
log_entries: List[LogEntry] = []
103+
cursor: Optional[str] = None
104+
remaining = limit
121105

122106
try:
123-
response = requests.post(
124-
api_endpoint,
125-
headers=headers,
126-
json=body,
127-
timeout=30,
128-
)
129-
130-
if response.status_code != 200:
131-
logger.error(
132-
f"Failed to fetch logs from Datadog: {response.status_code} - {response.text[:200]}"
107+
while remaining > 0:
108+
# Datadog API limit is 1000 per request
109+
page_limit = min(remaining, 1000)
110+
111+
body: Dict[str, Any] = {
112+
"filter": {
113+
"query": query,
114+
"from": (
115+
start_time.isoformat()
116+
if start_time
117+
else logs_model.created.isoformat()
118+
),
119+
"to": (
120+
end_time.isoformat()
121+
if end_time
122+
else datetime.now().astimezone().isoformat()
123+
),
124+
},
125+
"page": {
126+
"limit": page_limit,
127+
},
128+
"sort": "@otel.timestamp",
129+
}
130+
131+
if cursor:
132+
body["page"]["cursor"] = cursor
133+
134+
response = requests.post(
135+
api_endpoint,
136+
headers=headers,
137+
json=body,
138+
timeout=30,
133139
)
134-
return []
135-
136-
data = response.json()
137-
log_entries = []
138-
139-
for log in data.get("data", []):
140-
log_fields = log.get("attributes", {})
141-
message = log_fields.get("message", "")
142-
nested_attrs = log_fields.get("attributes", {})
143-
144-
if exc_info := nested_attrs.get("exception"):
145-
exc_message = exc_info.get("message")
146-
exc_type = exc_info.get("type")
147-
exc_stacktrace = exc_info.get("stacktrace")
148-
message += f"\n{exc_type}: {exc_message}\n{exc_stacktrace}"
149-
150-
code_info = nested_attrs.get("code", {})
151-
filename = code_info.get("file", {}).get("path")
152-
lineno = code_info.get("line", {}).get("number")
153-
function_name = code_info.get("function", {}).get("name")
154-
155-
otel_info = nested_attrs.get("otel", {})
156-
logger_name = otel_info.get("library", {}).get("name")
157-
158-
timestamp_ns_str = otel_info.get("timestamp")
159-
if timestamp_ns_str:
160-
timestamp_ns = int(timestamp_ns_str)
161-
timestamp = datetime.fromtimestamp(
162-
timestamp_ns / 1e9, tz=timezone.utc
163-
)
164-
else:
165-
timestamp = datetime.fromisoformat(
166-
log_fields["timestamp"].replace("Z", "+00:00")
140+
141+
if response.status_code != 200:
142+
logger.error(
143+
f"Failed to fetch logs from Datadog: "
144+
f"{response.status_code} - {response.text[:200]}"
167145
)
146+
break
168147

169-
severity = log_fields.get("status", "info").upper()
170-
log_severity = (
171-
LoggingLevels[severity]
172-
if severity in LoggingLevels.__members__
173-
else LoggingLevels.INFO
174-
)
148+
data = response.json()
149+
logs = data.get("data", [])
175150

176-
module = None
177-
if function_name:
178-
module = function_name
179-
elif filename:
180-
module = filename.rsplit("/", 1)[-1].replace(".py", "")
181-
182-
entry = LogEntry(
183-
message=message,
184-
level=log_severity,
185-
timestamp=timestamp,
186-
name=logger_name,
187-
filename=filename,
188-
lineno=lineno,
189-
module=module,
190-
)
151+
if not logs:
152+
break
153+
154+
for log in logs:
155+
entry = self._parse_log_entry(log)
156+
if entry:
157+
log_entries.append(entry)
158+
159+
remaining -= len(logs)
191160

192-
log_entries.append(entry)
161+
# Get cursor for next page
162+
cursor = data.get("meta", {}).get("page", {}).get("after")
163+
if not cursor:
164+
break
193165

194166
logger.debug(f"Fetched {len(log_entries)} logs from Datadog")
195167
return log_entries
196168

197169
except Exception as e:
198170
logger.exception(f"Error fetching logs from Datadog: {e}")
199-
return []
171+
return log_entries # Return what we have so far
172+
173+
def _parse_log_entry(self, log: Dict[str, Any]) -> Optional[LogEntry]:
174+
"""Parse a single log entry from Datadog's API response.
175+
176+
Args:
177+
log: The log data from Datadog's API.
178+
179+
Returns:
180+
A LogEntry object, or None if parsing fails.
181+
"""
182+
try:
183+
log_fields = log.get("attributes", {})
184+
message = log_fields.get("message", "")
185+
nested_attrs = log_fields.get("attributes", {})
186+
187+
if exc_info := nested_attrs.get("exception"):
188+
exc_message = exc_info.get("message")
189+
exc_type = exc_info.get("type")
190+
exc_stacktrace = exc_info.get("stacktrace")
191+
message += f"\n{exc_type}: {exc_message}\n{exc_stacktrace}"
192+
193+
code_info = nested_attrs.get("code", {})
194+
filename = code_info.get("file", {}).get("path")
195+
lineno = code_info.get("line", {}).get("number")
196+
function_name = code_info.get("function", {}).get("name")
197+
198+
otel_info = nested_attrs.get("otel", {})
199+
logger_name = otel_info.get("library", {}).get("name")
200+
201+
timestamp_ns_str = otel_info.get("timestamp")
202+
if timestamp_ns_str:
203+
timestamp_ns = int(timestamp_ns_str)
204+
timestamp = datetime.fromtimestamp(
205+
timestamp_ns / 1e9, tz=timezone.utc
206+
)
207+
else:
208+
timestamp = datetime.fromisoformat(
209+
log_fields["timestamp"].replace("Z", "+00:00")
210+
)
211+
212+
severity = log_fields.get("status", "info").upper()
213+
log_severity = (
214+
LoggingLevels[severity]
215+
if severity in LoggingLevels.__members__
216+
else LoggingLevels.INFO
217+
)
218+
219+
module = None
220+
if function_name:
221+
module = function_name
222+
elif filename:
223+
module = filename.rsplit("/", 1)[-1].replace(".py", "")
224+
225+
return LogEntry(
226+
message=message,
227+
level=log_severity,
228+
timestamp=timestamp,
229+
name=logger_name,
230+
filename=filename,
231+
lineno=lineno,
232+
module=module,
233+
)
234+
except Exception as e:
235+
logger.warning(f"Failed to parse log entry: {e}")
236+
return None
200237

201238
def cleanup(self) -> None:
202239
"""Cleanup the Datadog log store.

0 commit comments

Comments (0)