cancel
Showing results for 
Show  only  | Search instead for 
Did you mean: 

Management zone to identify orphan entities

tmathew
Frequent Guest

Is it possible to create a management zone that identifies all entities not included in any existing management zone? I’d like this zone to capture entities across multiple types—such as Applications, Hosts, Synthetic Tests, HTTP Checks, and Custom Devices—so we can easily review any uncovered resources.

Thanks

3 REPLIES 3

AntonPineiro
DynaMight Guru
DynaMight Guru

Hi,

I would suggest creating those management zones based on tagging rules.

Then, you can define that logic based on tags:

  • If tag is present, MZ A.
  • If tag is not present, MZ B.

Best regards

❤️ Emacs ❤️ Vim ❤️ Bash ❤️ Perl

I would say this would definitely work. Tagging resources is imo the most important step for ordering entities and metrics. So you would simply base your management zones upon tags.


Let's say you have created a Management Zone based upon an Application tag:MyApplication.
You can then negate the management zone like this for hosts:

 

type("HOST"), not(tag("MyApplication"))

 

You could also just create a notebook / dashboard where you write a DQL statement that pulls these metrics and combine it with other useful information, such as failed updates or hosts that are not in host groups / network zones.

#Performance matter!

tomaxp
Contributor

Hi,
unfortunately, there is no such option from the GUI.
The only thing I can suggest is retrieving this data via the API.
First, list all entities by type, then for each entityID check if it has an assigned MZ.
Below, I’m pasting a script that will list this for you.

#!/usr/bin/env python3

import os, csv, json, time, re, argparse
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
# =========================
# CONFIGURATION (CHANGE HERE)
# =========================
# Base URL of the Dynatrace environment. main() passes it through
# _normalize_base_url(), and "/api/v2/..." is appended by the request helpers.
DT_BASE_URL = "https://dynatrace/e/tenantid"  # WITHOUT /api/v2 at the end
# Placeholder value: replace it with a real token before running the script.
DT_API_TOKEN = "dt0c01. API TOKEN"  # token with "Monitored entities – read" permission (API v2)

# Entity types to check; each one is used in an entitySelector of the form type("<TYPE>")
TYPES = ["APPLICATION", "HOST", "SYNTHETIC_TEST", "HTTP_CHECK", "CUSTOM_DEVICE"]

# Output files written by main() with the entities that have no management zone
OUT_CSV = "unassigned_entities.csv"
OUT_JSON = "unassigned_entities.json"

# Technical parameters
PAGE_SIZE   = 500  # sent as "pageSize" on the first page of each entity listing
TIMEOUT_SEC = 30  # passed as the timeout of every requests.get() call
MAX_WORKERS = 8  # size of the thread pool that fetches entity details
# =========================


def _normalize_base_url(u: str) -> str:
    if not u:
        raise RuntimeError("DT_BASE_URL not set")
    u = re.sub(r"/api/v2/?$", "", u.strip())  
    return u.rstrip("/")


def _headers():
    """Build the HTTP headers sent with every API request.

    Returns:
        A dict with the ``Authorization`` header (``Api-Token`` scheme, using
        the module-level ``DT_API_TOKEN``) and a JSON ``accept`` header.
    """
    request_headers = {}
    request_headers["Authorization"] = f"Api-Token {DT_API_TOKEN}"
    request_headers["accept"] = "application/json; charset=utf-8"
    return request_headers


def list_entity_ids(base_url: str, entity_type: str):
    """Step 1: Return the list of entityIds for a given entity type.

    Pages through ``GET {base_url}/api/v2/entities`` until the response no
    longer carries a ``nextPageKey``.

    Args:
        base_url: Normalized environment URL (without ``/api/v2``).
        entity_type: Entity type used in the ``type("...")`` entity selector.

    Returns:
        A list with the ``entityId`` of every entity returned by the API.

    Raises:
        requests.HTTPError: On a non-success response, including a 429 that
            persists after the retry budget for a page is exhausted.
    """
    max_throttle_retries = 10  # consecutive 429 responses tolerated per page
    url = f"{base_url}/api/v2/entities"
    ids = []
    next_key = None
    throttled = 0

    while True:
        if next_key:
            # Follow-up pages are requested with the page key only.
            params = {"nextPageKey": next_key}
        else:
            params = {"entitySelector": f'type("{entity_type}")', "pageSize": str(PAGE_SIZE)}

        r = requests.get(url, headers=_headers(), params=params, timeout=TIMEOUT_SEC)
        if r.status_code == 429 and throttled < max_throttle_retries:
            # Bounded, growing back-off: the same page is retried, but a
            # permanently throttled token can no longer hang the script.
            throttled += 1
            time.sleep(min(1.0 * throttled, 10.0))
            continue
        r.raise_for_status()
        throttled = 0

        data = r.json()
        # "or []" also covers an explicit null value for "entities".
        ids.extend(e["entityId"] for e in (data.get("entities") or []) if e.get("entityId"))

        next_key = data.get("nextPageKey")
        if not next_key:
            break

    return ids


def _entity_error_record(entity_id: str, message: str):
    """Build the placeholder record returned when an entity lookup fails."""
    return {
        "entityId": entity_id,
        "type": "",
        "displayName": "",
        "firstSeenTms": None,
        "lastSeenTms": None,
        "managementZonesCount": None,
        "error": message,
    }


def get_entity_details(base_url: str, entity_id: str):
    """Step 2: Fetch one entity and count the management zones it belongs to.

    Failures are reported through an ``"error"`` key in the returned record
    instead of an exception, so a single bad entity does not abort a
    parallel run.

    Args:
        base_url: Normalized environment URL (without ``/api/v2``).
        entity_id: ID of the entity to look up.

    Returns:
        On success, a dict with ``entityId``, ``type``, ``displayName``,
        ``firstSeenTms``, ``lastSeenTms`` and ``managementZonesCount``.
        On failure, the same keys with empty/``None`` values plus ``error``.
    """
    url = f"{base_url}/api/v2/entities/{entity_id}"
    params = {"fields": "+managementZones,+tags,+firstSeenTms,+lastSeenTms"}

    for attempt in range(5):
        try:
            r = requests.get(url, headers=_headers(), params=params, timeout=TIMEOUT_SEC)
        except requests.RequestException as exc:
            # Timeouts and connection errors become an error record, like HTTP errors.
            return _entity_error_record(entity_id, f"request failed: {exc}")

        if r.status_code == 429:
            # Linear back-off before retrying a throttled request.
            time.sleep(0.5 * (attempt + 1))
            continue
        if not r.ok:
            return _entity_error_record(entity_id, f"{r.status_code} {r.text[:1000]}")

        try:
            data = r.json()
        except ValueError as exc:
            return _entity_error_record(entity_id, f"invalid JSON response: {exc}")

        # A missing or null "managementZones" counts as zero zones.
        mz = data.get("managementZones") or []
        return {
            "entityId": data.get("entityId", entity_id),
            "type": data.get("type", ""),
            "displayName": data.get("displayName", ""),
            "firstSeenTms": data.get("firstSeenTms"),
            "lastSeenTms": data.get("lastSeenTms"),
            "managementZonesCount": len(mz),
        }

    return _entity_error_record(entity_id, "Too many 429s")

def main():
    """List entities of the configured types that belong to no management zone.

    Step 1 lists the entity IDs for every type in ``TYPES``. Step 2 fetches
    each entity in a thread pool and keeps those whose management zone count
    is zero. The result is written to ``OUT_CSV`` and ``OUT_JSON`` and a
    summary is printed.

    Raises:
        RuntimeError: If the base URL or the API token is not configured.
    """
    base_url = _normalize_base_url(DT_BASE_URL)
    # A real token contains no whitespace, so this also rejects the shipped
    # "dt0c01. API TOKEN" placeholder, which the prefix check alone misses.
    if (
        not DT_API_TOKEN
        or DT_API_TOKEN.startswith("dt0c01.xxxxx")
        or any(ch.isspace() for ch in DT_API_TOKEN)
    ):
        raise RuntimeError("Set the correct DT_API_TOKEN in the script header")

    all_ids = []
    per_type_counts = {}

    # Step 1 — list the entity IDs of every configured type
    for t in TYPES:
        print(f"[INFO] Step1: listing the ID for the type: {t}")
        ids = list_entity_ids(base_url, t)
        per_type_counts[t] = len(ids)
        all_ids.extend(ids)
        print(f"[OK] {t}: {len(ids)} entity")

    # Step 2 — fetch the details of each entity
    print(f"[INFO] Step2: fetching details of {len(all_ids)} entities in parallel ({MAX_WORKERS} threads)")
    unassigned = []
    failed = 0

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futures = {ex.submit(get_entity_details, base_url, eid): eid for eid in all_ids}
        for f in as_completed(futures):
            try:
                rec = f.result()
            except Exception as exc:
                # Top-level boundary: one failed lookup must not discard the
                # results already collected for the other entities.
                failed += 1
                print(f"[WARN] {futures[f]}: {exc}")
                continue

            if rec.get("error"):
                failed += 1
                print(f"[WARN] {rec['entityId']}: {rec['error']}")
                continue

            if rec.get("managementZonesCount", 0) == 0:
                unassigned.append(rec)

    # Save
    fields = ["entityId", "type", "displayName", "firstSeenTms", "lastSeenTms", "managementZonesCount"]
    with open(OUT_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        w.writerows(unassigned)

    with open(OUT_JSON, "w", encoding="utf-8") as f:
        json.dump(unassigned, f, ensure_ascii=False, indent=2)

    # Summary
    print("\n=== SUMMARY ===")
    for t, c in per_type_counts.items():
        print(f"{t}: {c} (all)")
    print(f"without MZ: {len(unassigned)}")
    # Entities whose lookup failed are in neither group, so report them explicitly.
    print(f"lookup errors (not classified): {failed}")
    print(f"Pliki: {OUT_CSV}  |  {OUT_JSON}")


# Run the report only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

Featured Posts