20 Oct 2025 08:03 AM
Dear All,
How to obtain the physical/logical cores of each host/VM to make a report?
Regards,
Babar Qayyum
Solved! Go to Solution.
21 Oct 2025 01:45 AM
@Babar_Qayyum You can try the Monitored entities API and extract fields properties.cpuCores and properties.logicalCpuCores
30 Oct 2025 10:57 AM
Hi @Babar_Qayyum
you can use this script and send result as metrics:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# === KONFIGURACJA ===
DT_BASE_URL = "https://dynatrace.example.com/e/ENV-ID" # Your Managed/SaaS base URL
DT_API_TOKEN = "dt0c01.xxxxx.yyyyy" # API token: entities.read
# (optional) entity selector filters:
MZ_NAME = None # eg. "Production" or None
MZ_ID = None # eg. "1234567890123456789" or None
TAG = None # eg. "owner:team-a" lub "Linux" lub None
NAME_FILTER = None # eg. "app-host*" -> startsWith; bez * -> equals; None to skip
# (optional) additional client-side "contains" filter:
CLIENT_FILTER_CONTAINS = None # np. "k8s-node"
# exit:
OUTPUT_CSV = "cpu_cores.csv"
PAGE_SIZE = 500
VERIFY_TLS = True # set to path to CA or False if you have a custom cert (not recommended)
# ====================
import csv
import sys
import time
from typing import Dict, Iterable, List, Optional
import requests
API_PATH = "/api/v2/entities"
TIMEOUT = 30
RETRIES = 3
BACKOFF = 2.0
def build_entity_selector(
mz_name: Optional[str],
mz_id: Optional[str],
tag: Optional[str],
name_filter: Optional[str],
) -> str:
parts = ['type("HOST")']
if mz_name:
parts.append(f'mzName("{mz_name}")')
if mz_id:
parts.append(f'mzId({mz_id})')
if tag:
if any(c in tag for c in [' ', ':', '"']):
parts.append(f'tag("{tag}")')
else:
parts.append(f'tag({tag})')
if name_filter:
if name_filter.endswith("*"):
parts.append(f'entityName.startsWith("{name_filter[:-1]}")')
else:
parts.append(f'entityName.equals("{name_filter}")')
return ",".join(parts)
def request_entities(session: requests.Session, base_url: str, token: str, params: Dict[str, str]):
headers = {"Authorization": f"Api-Token {token}"}
url = base_url.rstrip("/") + API_PATH
last_err = None
for attempt in range(1, RETRIES + 1):
try:
resp = session.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_TLS)
if resp.status_code in (429, 500, 502, 503, 504):
last_err = Exception(f"HTTP {resp.status_code}: {resp.text[:300]}")
time.sleep(BACKOFF * attempt)
continue
resp.raise_for_status()
return resp.json()
except requests.RequestException as e:
last_err = e
time.sleep(BACKOFF * attempt)
raise SystemExit(f"API request failed after {RETRIES} retries: {last_err}")
def fetch_all_hosts(base_url: str, token: str, entity_selector: str, page_size: int = PAGE_SIZE) -> Iterable[Dict]:
fields = ",".join([
"+properties.cpuCores",
"+properties.logicalCpuCores",
"+properties.osType",
"+properties.monitoringMode",
"+managementZones",
"+lastSeenTms",
])
params = {
"entitySelector": entity_selector,
"pageSize": str(page_size),
"fields": fields,
}
session = requests.Session()
data = request_entities(session, base_url, token, params)
for e in data.get("entities", []):
yield e
next_key = data.get("nextPageKey")
while next_key:
data = request_entities(session, base_url, token, {"nextPageKey": next_key})
for e in data.get("entities", []):
yield e
next_key = data.get("nextPageKey")
def to_int_safe(v) -> Optional[int]:
try:
return int(v)
except Exception:
return None
def main():
if not DT_BASE_URL or not DT_API_TOKEN or DT_API_TOKEN.strip().startswith("dt0c01.E"):
print("fill in the DATABASE_URL and DT API TOKEN correctly (with the entities.read scope) in the CONFIGURATION section.", file=sys.stderr)
sys.exit(1)
selector = build_entity_selector(MZ_NAME, MZ_ID, TAG, NAME_FILTER)
rows: List[Dict] = []
for ent in fetch_all_hosts(DT_BASE_URL, DT_API_TOKEN, selector, PAGE_SIZE):
if ent.get("type") != "HOST":
continue
name = ent.get("displayName") or ""
if CLIENT_FILTER_CONTAINS and CLIENT_FILTER_CONTAINS.lower() not in name.lower():
continue
props = ent.get("properties", {}) or {}
cpu_phys = to_int_safe(props.get("cpuCores"))
cpu_log = to_int_safe(props.get("logicalCpuCores"))
mz_list = ent.get("managementZones", []) or []
mz_names = ";".join(sorted({mz.get("name", "") for mz in mz_list if mz.get("name")}))
rows.append({
"hostId": ent.get("entityId"),
"hostName": name,
"osType": props.get("osType"),
"monitoringMode": props.get("monitoringMode"),
"cpuCores_physical": cpu_phys,
"cpuCores_logical": cpu_log,
"managementZones": mz_names,
"lastSeenTms": ent.get("lastSeenTms"),
})
rows.sort(key=lambda r: (r.get("hostName") or "").lower())
fieldnames = [
"hostId",
"hostName",
"osType",
"monitoringMode",
"cpuCores_physical",
"cpuCores_logical",
"managementZones",
"lastSeenTms",
]
with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
print(f"Saved {len(rows)} records to: {OUTPUT_CSV}")
if __name__ == "__main__":
main()you should recive csv like this: