
Is there a way to fetch the total number of times a problem notification is triggered for an application over a given duration?

RuchiAshokSaini
Participant

Our teams are looking to understand how many problem notifications are triggered for a period of 30 days or so for a given application.

Is there a way to grab this information via any API?

4 REPLIES

Hi there, 

How I would do this: put your application in a management zone. Include all the entities, such as hosts, services, databases, etc.

The problem notifications are bound to an alerting profile. You can use the Problems API v2 to query for them, using the problemSelector to filter on the alerting profile and the management zone.

Now you know how many times an alert went to a distribution channel.
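
A minimal sketch of that query, assuming a management zone called "My App MZ" and an alerting profile called "My App Alerts" (both hypothetical names). It joins the managementZones(...) and problemFilterNames(...) criteria into one problemSelector and reads totalCount from the first page of GET /problems. The helper names, base_url and token are placeholders, and the selector criteria are worth checking against the API documentation for your environment.

```python
def build_selector(management_zone=None, alerting_profile=None):
    """Join the optional criteria into one problemSelector string."""
    parts = []
    if management_zone:
        parts.append(f'managementZones("{management_zone}")')
    if alerting_profile:
        # Assumption: problemFilterNames matches on the alerting profile name.
        parts.append(f'problemFilterNames("{alerting_profile}")')
    return ",".join(parts)


def count_problems(base_url, token, selector, time_from="now-30d"):
    """Return the totalCount reported by the first page of GET /problems."""
    import requests  # third-party; imported lazily so build_selector has no dependency

    r = requests.get(
        f"{base_url}/problems",
        headers={"Authorization": f"Api-Token {token}"},
        params={"from": time_from, "problemSelector": selector, "pageSize": 1},
        timeout=30,
    )
    r.raise_for_status()
    return r.json().get("totalCount")


# Usage (placeholders, not real values):
# selector = build_selector("My App MZ", "My App Alerts")
# print(count_problems("https://<your-environment>/api/v2", "<token>", selector))
```

Since only totalCount is needed here, the sketch asks for a page size of 1 instead of paging through every problem.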

KR.

Michiel 

#Performance matter!

tomaxp
Advisor

Yes — indirectly. Dynatrace doesn’t expose a dedicated “notification delivery log,” but at the problem level you can fetch the associated alerting profiles. If we assume alerting profile = notification, then you can simply count problems that have an alerting profile within your 30-day window (optionally scoped to your app/MZ).
Here is a script that does this:

Technical requirements:

Base URL: https://{environmentid}.live.dynatrace.com/api/v2 (Managed: https://<cluster>/e/<ENV-ID>/api/v2)

Token: Read problems permission.

Pagination: when using nextPageKey, omit all other query params (except optional fields).

#!/usr/bin/env python3
import os, csv, time
import requests  # third-party: pip install requests

DT_BASE   = os.getenv("DT_BASE", "https://DYNATRACE_URL/api/v2")
DT_TOKEN  = os.getenv("DT_TOKEN", "dt0c01.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
# timeframe
FROM      = os.getenv("FROM", "now-30d")
TO        = os.getenv("TO", "now")

# (Optional) filter by status, management zone, etc.
# Leave empty to fetch all problems.
PROBLEM_SELECTOR = os.getenv("PROBLEM_SELECTOR", "")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "500"))
OUT_CSV   = os.getenv("OUT", "problems_with_alerting_profiles.csv")

headers = {"Authorization": f"Api-Token {DT_TOKEN}"}

def list_problems():
    next_key = None
    page = 0
    while True:
        page += 1
        params = {}
        if next_key:
            params["nextPageKey"] = next_key
        else:
            params.update({"from": FROM, "pageSize": PAGE_SIZE})
            if TO:
                params["to"] = TO
            if PROBLEM_SELECTOR:
                params["problemSelector"] = PROBLEM_SELECTOR

        r = requests.get(f"{DT_BASE}/problems", headers=headers, params=params, timeout=30)
        if not r.ok:
            # Print the error body (JSON if possible) before raising
            try:
                print("[ERROR]", r.status_code, r.json())
            except Exception:
                print("[ERROR]", r.status_code, r.text)
            r.raise_for_status()

        data = r.json()
        if page == 1:
            tc = data.get("totalCount")
            print(f"[INFO] totalCount from API (page 1): {tc}")

        probs = data.get("problems", [])
        print(f"[INFO] Page {page}: {len(probs)} problems")
        for p in probs:
            yield p

        next_key = data.get("nextPageKey")
        if not next_key:
            print("[INFO] No nextPageKey, done.")
            break

def get_problem_detail(pid: str):
    for attempt in range(3):
        r = requests.get(f"{DT_BASE}/problems/{pid}", headers=headers, timeout=30)
        if r.ok:
            return r.json()
        if r.status_code in (429, 500, 503):
            # Transient error: back off a little longer on each attempt
            time.sleep(1.5 * (attempt + 1))
            continue
        try:
            print(f"[WARN] details {pid}:", r.status_code, r.json())
        except Exception:
            print(f"[WARN] details {pid}:", r.status_code, r.text)
        r.raise_for_status()
    r.raise_for_status()

def main():
    found_any = False
    with open(OUT_CSV, "w", newline="", encoding="utf-8") as f:
        wr = csv.writer(f)
        wr.writerow([
            "problemId","displayId","startTime","endTime","status","severityLevel",
            "impactLevel","managementZones","alertingProfileId","alertingProfileName"
        ])

        for p in list_problems():
            det = get_problem_detail(p.get("problemId"))
            filters = det.get("problemFilters") or []
            if not filters:
                continue  # only with alerting profiles

            found_any = True
            mz = det.get("managementZones") or []
            mz_join = ";".join([m.get("name","") for m in mz])

            for flt in filters:
                wr.writerow([
                    det.get("problemId"), det.get("displayId"),
                    det.get("startTime"), det.get("endTime"),
                    det.get("status"), det.get("severityLevel"),
                    det.get("impactLevel"), mz_join,
                    flt.get("id",""), flt.get("name","")
                ])

    if not found_any:
        print("[INFO] No problems with alerting profiles found in the timeframe.")
    else:
        print(f"[OK] Saved to: {OUT_CSV}")

if __name__ == "__main__":
    main()

What the script does:

Lists problems for a given time window (from=now-30d, etc.) using Problems API v2 with nextPageKey pagination.

For each problem, it requests details (GET /problems/{problemId}) and inspects problemFilters[] — these are the alerting profiles tied to the problem.

Filters to include only problems with at least one alerting profile (per your requirement).

Writes a CSV with: problemId, displayId, timestamps, status, severity, impactLevel, managementZones, alertingProfileId, alertingProfileName.
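
To make the "one row per problem and alerting profile" behaviour concrete, here is a small sketch of how a single problem detail turns into rows. The helper name profile_rows and the sample payload are hypothetical, and the payload is trimmed to the fields used here.

```python
def profile_rows(detail):
    """Yield one (problemId, profileId, profileName) tuple per alerting profile."""
    for flt in detail.get("problemFilters") or []:
        yield (detail.get("problemId"), flt.get("id", ""), flt.get("name", ""))


# Hypothetical detail payload with two alerting profiles attached.
sample = {
    "problemId": "P-1",
    "problemFilters": [
        {"id": "ap-1", "name": "Default"},
        {"id": "ap-2", "name": "My App Alerts"},
    ],
}

rows = list(profile_rows(sample))
print(len(rows))  # one problem matched by two profiles gives two rows
```

A problem with no problemFilters yields no rows, which is the same as the script skipping it.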

How to get “# of notifications over 30 days”:

Easiest: count rows in the CSV (each row = problem × assigned alerting profile).

Per profile: group by alertingProfileId/alertingProfileName.

Scope to a specific application/MZ via PROBLEM_SELECTOR, e.g.

managementZones("My App MZ") (recommended),

or (if needed) affectedEntityIds("APPLICATION-XXXXXX").
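
The counting steps above (total rows and the per-profile grouping) can be sketched as follows. This assumes the CSV layout written by the script, in particular the alertingProfileName column. The function name count_by_profile is just an illustration.

```python
import csv
from collections import Counter


def count_by_profile(lines):
    """Return (total_rows, Counter keyed by alertingProfileName).

    `lines` is any iterable of CSV text lines, e.g. an open file object.
    """
    per_profile = Counter()
    for row in csv.DictReader(lines):
        per_profile[row["alertingProfileName"]] += 1
    return sum(per_profile.values()), per_profile


# Usage with the file produced by the script:
# with open("problems_with_alerting_profiles.csv", newline="", encoding="utf-8") as f:
#     total, per_profile = count_by_profile(f)
#     print(total, per_profile.most_common())
```

Keep in mind that the total counts problem and profile pairs, so a problem matched by two alerting profiles is counted twice.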

 

