02 Oct 2025
07:20 PM
- last edited on
31 Oct 2025
10:34 AM
by
Michal_Gebacki
Hi,
Is it possible to retrieve the UpdateStatus field of a OneAgent using DQL? I know that we can get this using /api/v1/oneagents.
Solved! Go to Solution.
03 Oct 2025 12:44 AM
@SrikanthSamraj I'm not sure this field is exposed via DQL; your best bet would be the API.
03 Oct 2025 06:51 PM
Hi @SrikanthSamraj
I don't think there's an automated way to use DQL to obtain the UpdateStatus of OneAgent and ActiveGate.
But I think it could be possible if you capture both logs and ingest them into Dynatrace; that way you can explore the information with DQL.
I hope it's helpful 💪
03 Oct 2025 07:42 PM
@SrikanthSamraj I also think that the only way is to ingest logs and then parse them, as @PierreGutierrez suggested, but I'm wondering what your use case is. Why DQL and not the API?
10 Oct 2025 11:19 AM
Hi, you can use this script and send the result as metrics/logs/events to Dynatrace:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Dynatrace OneAgents UpdateStatus exporter
"""
import argparse
import csv
import json
import sys
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
import requests
# =========================
# CONFIGURATION (CHANGE HERE)
# =========================
# Default for --base-url; "/api/v1/oneagents" is appended to it (after
# stripping any trailing "/") when the API is called.
DT_BASE_URL = "https://dynatrace/e/tenant"
# Default for --api-token; sent as the "Authorization: Api-Token <token>" header.
# NOTE(review): this is a placeholder -- avoid committing a real token here;
# passing it with --api-token keeps it out of the source file.
DT_API_TOKEN = "dt0c01.xxxxxxxxxxxx"
# =========================
# MAIN SCRIPT
# =========================
def parse_args():
    """Parse the command-line options of the exporter.

    The defaults for ``--base-url`` and ``--api-token`` are taken from the
    module-level ``DT_BASE_URL`` and ``DT_API_TOKEN`` constants.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description="Export Dynatrace OneAgents update status."
    )

    # (flag, add_argument keyword arguments), registered in this order so the
    # generated --help output lists them the same way.
    options = [
        ("--base-url", {
            "default": DT_BASE_URL,
            "help": f"Base URL of your Dynatrace environment (default: {DT_BASE_URL})",
        }),
        ("--api-token", {
            "default": DT_API_TOKEN,
            "help": "Dynatrace API token (default: value from CONFIGURATION)",
        }),
        ("--output", {
            "default": "oneagents.csv",
            "help": "Output file path (ends with .csv or .json). Default: oneagents.csv",
        }),
        ("--page-size", {
            "type": int,
            "default": 500,
            "help": "Page size for the API call. Default: 500",
        }),
        ("--verify", {
            "default": "true",
            "help": "TLS verification: 'true' (default), 'false', or path to CA bundle.",
        }),
        ("--timeout", {
            "type": int,
            "default": 30,
            "help": "HTTP timeout seconds. Default: 30",
        }),
    ]
    for flag, settings in options:
        parser.add_argument(flag, **settings)

    return parser.parse_args()
def make_session(token: str, verify_opt: str, timeout: int) -> requests.Session:
    """Create a ``requests.Session`` preconfigured for the Dynatrace API.

    Args:
        token: API token, sent as ``Authorization: Api-Token <token>``.
        verify_opt: ``"true"`` or ``"false"`` (case-insensitive) to switch TLS
            verification on or off; any other value is used as the path to a
            CA bundle.
        timeout: Default timeout, in seconds, for every request made through
            the returned session.

    Returns:
        The configured session.
    """
    import functools  # local import keeps this block self-contained

    s = requests.Session()
    s.headers.update({
        "Authorization": f"Api-Token {token}",
        "Accept": "application/json",
    })

    # "true"/"false" map to booleans; anything else is a CA bundle path.
    s.verify = {"true": True, "false": False}.get(verify_opt.lower(), verify_opt)

    # requests has no session-level timeout: a plain ``s.timeout`` attribute
    # is never read by the library, so requests could block forever. Bind the
    # value as a default keyword of ``request`` instead (``get``/``post``/...
    # all delegate to it); an explicit ``timeout=`` at the call site still wins.
    s.request = functools.partial(s.request, timeout=timeout)
    # Kept so existing code that reads ``session.timeout`` keeps working.
    s.timeout = timeout
    return s
def fetch_oneagents(base_url: str,
                    session: "requests.Session",
                    page_size: int = 500
                    ) -> Iterable[Dict]:
    """Yield OneAgent records from ``/api/v1/oneagents`` across all pages.

    Args:
        base_url: Environment base URL; a trailing ``/`` is ignored.
        session: Session used for the GET requests. If it carries a
            ``timeout`` attribute, that value is passed to every request.
        page_size: Value sent as the ``pageSize`` query parameter.

    Yields:
        Each record of every page, in the order the API returned them.

    Raises:
        requests.HTTPError: If a page request returns an error status.
    """
    endpoint = f"{base_url.rstrip('/')}/api/v1/oneagents"
    # requests ignores a ``timeout`` attribute set on the session, so hand it
    # over explicitly; ``None`` (attribute absent) leaves the default in place.
    timeout = getattr(session, "timeout", None)
    next_key = None

    while True:
        query = {"pageSize": page_size}
        if next_key:
            query["nextPageKey"] = next_key

        resp = session.get(endpoint, params=query, timeout=timeout)
        resp.raise_for_status()
        data = resp.json() or {}

        if isinstance(data, list):
            # Bare JSON array: there is no envelope and therefore no cursor.
            # This must be checked before any ``data.get`` call, which would
            # raise AttributeError on a list.
            yield from data
            return

        # NOTE(review): "hosts" is believed to be the envelope key documented
        # for this endpoint -- confirm against the API reference. The other
        # keys are retained as fallbacks.
        for key in ("hosts", "oneagents", "agents", "items"):
            if isinstance(data.get(key), list):
                yield from data[key]
                break

        next_key = data.get("nextPageKey")
        if not next_key:
            return
def normalize_record(raw: Dict) -> Dict:
    """Flatten one raw OneAgent record into the fixed set of export columns.

    Several alternative source keys are tried per column because the record
    layout is not fixed; a column is ``None`` (or ``""`` for the joined list
    columns) when none of its source keys carries a value.

    Args:
        raw: One record as yielded by the API.

    Returns:
        A dict with the export columns, always in the same key order.
    """

    def to_iso(ts):
        """Convert epoch milliseconds to an ISO-8601 UTC string.

        Returns ``None`` for a missing/zero timestamp and ``str(ts)`` when
        the value cannot be interpreted as a timestamp.
        """
        if ts in (None, 0, "0"):
            return None
        try:
            return datetime.fromtimestamp(int(ts) / 1000, tz=timezone.utc).isoformat()
        except (TypeError, ValueError, OverflowError, OSError):
            return str(ts)

    def nested(key):
        """Return ``raw[key]`` if it is a dict, otherwise an empty dict."""
        value = raw.get(key)
        # A key that is present but null/non-dict must not break ``.get``.
        return value if isinstance(value, dict) else {}

    def joined(*keys):
        """Comma-join the first non-empty list found under *keys*."""
        for key in keys:
            values = raw.get(key)
            if not values:
                continue
            if isinstance(values, str):
                # A lone string would otherwise be joined character by character.
                return values
            # str() so numeric IDs do not make ``join`` raise TypeError.
            return ",".join(str(v) for v in values)
        return ""

    # ``or`` would discard an explicit ``False``; only fall back on ``None``.
    auto_update = raw.get("autoUpdate")
    if auto_update is None:
        auto_update = nested("autoUpdateSettings").get("enabled")

    return {
        "hostId": raw.get("hostId") or raw.get("entityId") or nested("host").get("id"),
        "hostName": (raw.get("hostName") or raw.get("displayName")
                     or nested("host").get("displayName")),
        "ipAddresses": joined("ipAddresses", "networkAddresses"),
        "osType": raw.get("osType"),
        "oneAgentVersion": (raw.get("agentVersion") or raw.get("oneAgentVersion")
                            or raw.get("version") or nested("agent").get("version")),
        "updateStatus": raw.get("updateStatus"),
        "needsReboot": raw.get("needsReboot"),
        "availabilityState": raw.get("availabilityState"),
        "managementZoneIds": joined("managementZoneIds"),
        "hostGroupId": raw.get("hostGroupId"),
        "autoUpdate": auto_update,
        "lastSeen": to_iso(raw.get("lastSeenTimestamp") or raw.get("lastSeen")),
        "agentType": raw.get("agentType") or raw.get("installerType"),
        "environment": raw.get("environment"),
    }
def write_csv(path: str, rows: List[Dict]):
    """Write *rows* to *path* as a UTF-8 CSV file.

    The columns are the alphabetically sorted union of all row keys. When
    there are no columns (no rows, or only empty rows) the file is still
    created, but left empty.
    """
    columns = set()
    for row in rows:
        columns.update(row)
    fieldnames = sorted(columns)

    with open(path, "w", newline="", encoding="utf-8") as out:
        if not fieldnames:
            # Nothing to describe: leave the freshly truncated file empty.
            return
        writer = csv.DictWriter(out, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
def write_json(path: str, rows: List[Dict]):
    """Write *rows* to *path* as UTF-8 JSON, indented by two spaces.

    Non-ASCII characters are written as-is rather than escaped.
    """
    encoder = json.JSONEncoder(ensure_ascii=False, indent=2)
    with open(path, "w", encoding="utf-8") as out:
        # Stream the encoder's chunks straight into the file.
        out.writelines(encoder.iterencode(rows))
def main():
    """Fetch all OneAgent records, normalize them and write the export file.

    Exits with status 1 when the API answers with an HTTP error and with
    status 2 on any other request failure. The output is JSON when
    ``--output`` ends with ``.json`` (case-insensitive) and CSV otherwise.
    """
    args = parse_args()
    http = make_session(args.api_token, args.verify, args.timeout)

    try:
        agents = fetch_oneagents(
            base_url=args.base_url,
            session=http,
            page_size=args.page_size,
        )
        # The generator only performs requests while it is being consumed,
        # so the consumption has to happen inside the try block.
        raw_items = list(agents)
    except requests.HTTPError as e:
        print(f"[ERROR] HTTP {e.response.status_code}: {e.response.text}", file=sys.stderr)
        sys.exit(1)
    except requests.RequestException as e:
        print(f"[ERROR] Request failed: {e}", file=sys.stderr)
        sys.exit(2)

    rows = list(map(normalize_record, raw_items))

    # Pick the writer from the output file extension.
    wants_json = args.output.lower().endswith(".json")
    writer = write_json if wants_json else write_csv
    writer(args.output, rows)

    print(f"[OK] Exported {len(rows)} OneAgent records -> {args.output}")


# Run the exporter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()