cancel
Showing results for 
Show  only  | Search instead for 
Did you mean: 

Physical CPU cores & Logical CPU cores Information

Babar_Qayyum
DynaMight Guru
DynaMight Guru

Dear All,

How to obtain the physical/logical cores of each host/VM to make a report?

Regards,

Babar Qayyum

4 REPLIES 4

AntonPineiro
DynaMight Guru
DynaMight Guru

Hi,

Maybe using DQL, it was discussed in this thread.

Best regards

❤️ Emacs ❤️ Vim ❤️ Bash ❤️ Perl

Hello @AntonPineiro 

Thank you for your comments. We have Dynatrace Managed.

Regards,

Babar Qayyum

@Babar_Qayyum You can try the Monitored entities API and extract fields properties.cpuCores and properties.logicalCpuCores

https://docs.dynatrace.com/managed/discover-dynatrace/references/dynatrace-api/environment-api/entit...

 

Phani Devulapalli

t_pawlak
Pro

Hi @Babar_Qayyum 
you can use this script and send the results as metrics:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# === CONFIGURATION ===
DT_BASE_URL = "https://dynatrace.example.com/e/ENV-ID"  # Your Managed/SaaS base URL
DT_API_TOKEN = "dt0c01.xxxxx.yyyyy"                             # API token: entities.read

# (optional) entity selector filters:
MZ_NAME = None          # e.g. "Production" or None
MZ_ID = None            # e.g. "1234567890123456789" or None
TAG = None              # e.g. "owner:team-a" or "Linux" or None
NAME_FILTER = None      # e.g. "app-host*" -> startsWith; without * -> equals; None to skip

# (optional) additional client-side "contains" filter:
CLIENT_FILTER_CONTAINS = None  # e.g. "k8s-node"

# output:
OUTPUT_CSV = "cpu_cores.csv"
PAGE_SIZE = 500
VERIFY_TLS = True       # set to path to CA or False if you have a custom cert (not recommended)

# ====================

import csv
import sys
import time
from typing import Dict, Iterable, List, Optional

import requests

API_PATH = "/api/v2/entities"  # Monitored entities API (v2) endpoint
TIMEOUT = 30                   # per-request timeout, seconds
RETRIES = 3                    # attempts per API call
BACKOFF = 2.0                  # linear backoff factor, seconds


def build_entity_selector(
    mz_name: Optional[str],
    mz_id: Optional[str],
    tag: Optional[str],
    name_filter: Optional[str],
) -> str:
    """Assemble a Dynatrace entity-selector string for HOST entities.

    Each non-None argument contributes one comma-joined criterion:
    management-zone name/id, a tag, and a name filter where a trailing
    ``*`` means prefix match and anything else means exact match.
    """
    criteria = ['type("HOST")']
    if mz_name:
        criteria.append(f'mzName("{mz_name}")')
    if mz_id:
        criteria.append(f'mzId({mz_id})')
    if tag:
        # Quote the tag only when it carries characters that need quoting.
        needs_quotes = (" " in tag) or (":" in tag) or ('"' in tag)
        criteria.append(f'tag("{tag}")' if needs_quotes else f'tag({tag})')
    if name_filter:
        # Trailing "*" -> startsWith; otherwise an exact-name match.
        if name_filter.endswith("*"):
            criteria.append(f'entityName.startsWith("{name_filter[:-1]}")')
        else:
            criteria.append(f'entityName.equals("{name_filter}")')
    return ",".join(criteria)


def request_entities(session: requests.Session, base_url: str, token: str, params: Dict[str, str]):
    """GET the entities endpoint and return the parsed JSON payload.

    Retries up to RETRIES times on connection errors and on retryable HTTP
    statuses (429 and common 5xx), with a linearly growing backoff between
    attempts. Raises SystemExit when every attempt fails.

    Fix over the original: no pointless BACKOFF sleep after the final
    failed attempt — the error is raised immediately instead.
    """
    headers = {"Authorization": f"Api-Token {token}"}
    url = base_url.rstrip("/") + API_PATH
    last_err = None
    for attempt in range(1, RETRIES + 1):
        try:
            resp = session.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_TLS)
            if resp.status_code in (429, 500, 502, 503, 504):
                # Transient server-side condition: remember it and retry.
                last_err = Exception(f"HTTP {resp.status_code}: {resp.text[:300]}")
            else:
                resp.raise_for_status()
                return resp.json()
        except requests.RequestException as e:
            last_err = e
        if attempt < RETRIES:
            # Linear backoff; skipped entirely after the last attempt.
            time.sleep(BACKOFF * attempt)
    raise SystemExit(f"API request failed after {RETRIES} retries: {last_err}")


def fetch_all_hosts(base_url: str, token: str, entity_selector: str, page_size: int = PAGE_SIZE) -> Iterable[Dict]:
    """Yield every HOST entity matching *entity_selector*, following pagination.

    Requests the extra properties needed for the CPU report. Fix over the
    original: the requests.Session is used as a context manager so its
    connection pool is closed when the generator finishes, and the two
    copy-pasted pagination loops are merged into one.
    """
    fields = ",".join([
        "+properties.cpuCores",
        "+properties.logicalCpuCores",
        "+properties.osType",
        "+properties.monitoringMode",
        "+managementZones",
        "+lastSeenTms",
    ])
    # First page carries the full query; follow-up pages take only nextPageKey.
    params: Optional[Dict[str, str]] = {
        "entitySelector": entity_selector,
        "pageSize": str(page_size),
        "fields": fields,
    }
    with requests.Session() as session:
        while params is not None:
            data = request_entities(session, base_url, token, params)
            yield from data.get("entities", [])
            next_key = data.get("nextPageKey")
            params = {"nextPageKey": next_key} if next_key else None


def to_int_safe(v) -> Optional[int]:
    """Coerce *v* to int, returning None when conversion is impossible.

    Fix over the original: catches only the exceptions int() actually
    raises (TypeError, ValueError) instead of a blanket Exception that
    would also hide unrelated bugs.
    """
    try:
        return int(v)
    except (TypeError, ValueError):
        return None


def main() -> None:
    """Collect CPU core counts for all monitored hosts and write OUTPUT_CSV.

    Fixes over the original: the error message names the real config
    variable (DT_BASE_URL, not "DATABASE_URL"), and the placeholder-token
    check actually matches the shipped placeholder "dt0c01.xxxxx.yyyyy"
    (the old check looked for "dt0c01.E", which never matched it).
    """
    token = DT_API_TOKEN.strip() if DT_API_TOKEN else ""
    # Refuse to run with an empty or still-placeholder configuration.
    if not DT_BASE_URL or not token or token == "dt0c01.xxxxx.yyyyy":
        print(
            "fill in the DT_BASE_URL and DT API TOKEN correctly (with the entities.read scope) in the CONFIGURATION section.",
            file=sys.stderr,
        )
        sys.exit(1)

    selector = build_entity_selector(MZ_NAME, MZ_ID, TAG, NAME_FILTER)

    rows: List[Dict] = []
    for ent in fetch_all_hosts(DT_BASE_URL, DT_API_TOKEN, selector, PAGE_SIZE):
        # Defensive: the selector already restricts to HOST entities.
        if ent.get("type") != "HOST":
            continue
        name = ent.get("displayName") or ""
        # Optional case-insensitive client-side substring filter.
        if CLIENT_FILTER_CONTAINS and CLIENT_FILTER_CONTAINS.lower() not in name.lower():
            continue

        props = ent.get("properties", {}) or {}
        mz_list = ent.get("managementZones", []) or []
        # Deduplicated, sorted, ";"-joined management-zone names.
        mz_names = ";".join(sorted({mz.get("name", "") for mz in mz_list if mz.get("name")}))

        rows.append({
            "hostId": ent.get("entityId"),
            "hostName": name,
            "osType": props.get("osType"),
            "monitoringMode": props.get("monitoringMode"),
            "cpuCores_physical": to_int_safe(props.get("cpuCores")),
            "cpuCores_logical": to_int_safe(props.get("logicalCpuCores")),
            "managementZones": mz_names,
            "lastSeenTms": ent.get("lastSeenTms"),
        })

    # Stable, case-insensitive ordering by host name for the report.
    rows.sort(key=lambda r: (r.get("hostName") or "").lower())

    fieldnames = [
        "hostId",
        "hostName",
        "osType",
        "monitoringMode",
        "cpuCores_physical",
        "cpuCores_logical",
        "managementZones",
        "lastSeenTms",
    ]
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    print(f"Saved  {len(rows)} records to: {OUTPUT_CSV}")


if __name__ == "__main__":
    main()

You should receive a CSV like this:

t_pawlak_0-1761821850804.png

 



Featured Posts