Pro Tip: Extract and process huge log lines

Maheedhar_T
Mentor

There are cases where you have to process log content that is huge. Say you want to monitor Oracle Linux patch events.
This is not always easy, because a patch run can produce a single log record with thousands of lines under the same timestamp (similar to the output of sudo apt-get update), and only the very end contains the part that needs to be converted to a bizevent.
In our case, the kernel version needs to be converted to a bizevent if it is present. However, the kernel version appears at the end of the log content.
It looks like this:

Installed:
  kernel-4.18.0-553.33.1.el8_10.x86_64                                          
  kernel-core-4.18.0-553.33.1.el8_10.x86_64                                     
  kernel-modules-4.18.0-553.33.1.el8_10.x86_64 

Almost 300,000 characters and 2,500+ lines come before this Installed section appears.


We clearly cannot do this with OpenPipeline processing alone. Although it supports processing large records, this is multi-line data, so building a processor that extracts the kernel version becomes a nightmare.
The good part, however, is that the whole log content is captured without truncation.

The workaround is OpenPipeline's event extraction plus Dynatrace Workflows.

Here's what worked for us.
When a log record arrives from the patch log file, we create a Davis event from OpenPipeline with this configuration.

 

Maheedhar_T_1-1753341545149.png

 

 

Make sure to add the last property, dt.davis.is_problem_suppressed, to avoid problem noise. Whenever a log record is detected in the patch log, this pipeline is triggered and creates a Davis event.

The next part is extracting this and converting it to bizevents.
Create a new workflow with the Davis event trigger. In this case it looks like this.

Maheedhar_T_3-1753341759329.png

Another challenge is that even if you define the event description as the log content using the {content} placeholder, the content is truncated due to character limits.
So, when the workflow is triggered, we need to fetch the log again using DQL. To do that, we first find the host on which the patch happened.

The first step of the workflow gets the event details using DQL.

fetch dt.davis.events
| filter matchesPhrase(event.name,"Patch")
| sort timestamp desc
| limit 1

The limit 1 fetches the event for a single host; change it as needed.
Next, we extract the host details programmatically using JavaScript.
Sample code:

import { executionsClient } from '@dynatrace-sdk/client-automation';

export default async function ({ execution_id }) {
  // Step: Get result of 'fetch_event'
  const fetchEventConfig = { executionId: execution_id, id: 'fetch_event' };
  try {
    const fetchEventResult = await executionsClient.getTaskExecutionResult(fetchEventConfig);

    // Extract 'dt.entity.host' from the first record
    const hostId = fetchEventResult.records?.[0]?.["dt.entity.host"];

    if (hostId) {
      const result = { "host": hostId };
      console.log('Extracted Host ID:', JSON.stringify(result, null, 2));
      return result;
    } else {
      console.warn('Host ID not found in fetch_event result.');
      return { "host": null };
    }
  } catch (error) {
    console.error('Error fetching fetch_event result:', error);
    return { "host": null };
  }
}
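
If you raise the limit above 1, the event query can return several records, possibly for the same host. As a rough sketch (the helper name extractHostIds and the sample records are made up for illustration, not part of the workflow above), the distinct host IDs could be collected like this:

```javascript
// Hypothetical helper: collect the distinct host entity IDs from DQL result records.
function extractHostIds(records) {
  const ids = new Set();
  for (const record of records ?? []) {
    const hostId = record?.["dt.entity.host"];
    // Skip records that carry no usable host ID
    if (typeof hostId === "string" && hostId.length > 0) {
      ids.add(hostId);
    }
  }
  return [...ids];
}

// Sample records shaped like the 'fetch_event' result (all values are invented).
const sampleRecords = [
  { "event.name": "Patch", "dt.entity.host": "HOST-0000000000000001" },
  { "event.name": "Patch", "dt.entity.host": "HOST-0000000000000002" },
  { "event.name": "Patch", "dt.entity.host": "HOST-0000000000000001" },
  { "event.name": "Patch" }, // no host on this record, so it is skipped
];

console.log(extractHostIds(sampleRecords));
```

Inside the workflow task you would pass fetchEventResult.records to the helper and then loop over the returned IDs.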

Then we get the log from that host using DQL:

fetch logs
| filter matchesPhrase(dt.entity.host,"{{ result("get_host_id").host }}")
| sort timestamp desc
| limit 1
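
Note that the query above returns the latest log record of the host, whatever file it came from. If other logs on that host are busy, you may want to narrow it down. Below is a minimal sketch of building a stricter query string in JavaScript. It only produces the DQL text; how you run it is up to you. The helper name buildLogQuery, the host ID pattern, and the "dnf" phrase are assumptions for illustration, so adjust them to your environment.

```javascript
// Assumption: host entity IDs look like "HOST-" followed by 16 hex characters.
const HOST_ID_PATTERN = /^HOST-[0-9A-F]{16}$/;

// Hypothetical helper: build the DQL text for the log lookup.
function buildLogQuery(hostId, logSourcePhrase) {
  // Refuse anything that does not look like a host ID before putting it into the query
  if (!HOST_ID_PATTERN.test(hostId)) {
    throw new Error(`Unexpected host ID: ${hostId}`);
  }
  const lines = ["fetch logs", `| filter dt.entity.host == "${hostId}"`];
  if (logSourcePhrase) {
    // Escape backslashes and double quotes so the phrase stays inside the string literal
    const escaped = logSourcePhrase.replace(/\\/g, "\\\\").replace(/"/g, '\\"');
    lines.push(`| filter matchesPhrase(log.source, "${escaped}")`);
  }
  lines.push("| sort timestamp desc", "| limit 1");
  return lines.join("\n");
}

// "dnf" is only an example phrase; use whatever identifies your patch log file.
console.log(buildLogQuery("HOST-0000000000000001", "dnf"));
```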

Finally, we extract the data and ingest it as a bizevent.

import { executionsClient } from '@dynatrace-sdk/client-automation';
import { businessEventsClient } from '@dynatrace-sdk/client-classic-environment-v2';

export default async function ({ execution_id }) {
  try {
    // Step 1: Fetch logs
    const logsResult = await executionsClient.getTaskExecutionResult({
      executionId: execution_id,
      id: 'get_log',
    });

    const content = logsResult?.records?.[0]?.content;
    if (!content) {
      console.warn('No content found in get_log result.');
      return;
    }

    // Extract timestamp from the beginning
    const timestampMatch = content.match(/^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})/);
    const timestamp = timestampMatch ? timestampMatch[1] : null;

    // Extract Installed section; it ends at the next unindented line or at the end of the content
    const installedSectionMatch = content.match(/Installed:\s*([\s\S]*?)(?:\r?\n\S|$)/);
    const installedSection = installedSectionMatch ? installedSectionMatch[1] : '';

    // Extract versions from Installed section
    const kernelLine = installedSection.split('\n').find(line => line.includes('kernel-') && !line.includes('kernel-core') && !line.includes('kernel-modules'));
    const kernelCoreLine = installedSection.split('\n').find(line => line.includes('kernel-core-'));
    const kernelModulesLine = installedSection.split('\n').find(line => line.includes('kernel-modules-'));

    const kernelMatch = kernelLine?.match(/kernel-([\w\.\-]+\.x86_64)/);
    const kernelCoreMatch = kernelCoreLine?.match(/kernel-core-([\w\.\-]+\.x86_64)/);
    const kernelModulesMatch = kernelModulesLine?.match(/kernel-modules-([\w\.\-]+\.x86_64)/);

    if (!timestamp || !kernelMatch || !kernelCoreMatch || !kernelModulesMatch) {
      console.warn('Required kernel data not found in Installed section.');
      return;
    }

    // Step 2: Fetch the host entity ID (dt.entity.host) from fetch_event; it is stored as 'hostname' below
    const fetchEventResult = await executionsClient.getTaskExecutionResult({
      executionId: execution_id,
      id: 'fetch_event',
    });

    const hostname = fetchEventResult?.records?.[0]?.["dt.entity.host"] || 'unknown-host';

    // Step 3: Construct and ingest business event
    const bizevent = {
      specversion: '1.0',
      source: 'patching.kernel.update',
      id: crypto.randomUUID(),
      type: 'kernel.update.detected',
      data: {
        timestamp,
        hostname,
        kernel: kernelMatch[1],
        kernel_core: kernelCoreMatch[1],
        kernel_modules: kernelModulesMatch[1],
      },
    };

    await businessEventsClient.ingest({
      body: bizevent,
      type: 'application/cloudevent+json',
    });

    console.log('Business event ingested successfully:', JSON.stringify(bizevent, null, 2));
    return bizevent;
  } catch (error) {
    console.error('Failed to ingest business event:', error);
  }
}
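
If you want to try the parsing part outside a workflow first, the sketch below pulls it into a plain function that runs in Node.js without any Dynatrace SDK. The helper name parseInstalledKernels and the sample text are made up for illustration; the sample only imitates the shape of the patch output shown earlier.

```javascript
// Hypothetical helper: pull kernel package versions out of the "Installed:" section.
function parseInstalledKernels(content) {
  if (typeof content !== "string") return null;
  // The section ends at the next unindented line or at the end of the content
  const section = content.match(/Installed:\s*([\s\S]*?)(?:\r?\n\S|$)/);
  if (!section) return null;

  const versions = {};
  for (const line of section[1].split(/\r?\n/)) {
    // Matches kernel-<version>, kernel-core-<version>, and kernel-modules-<version>
    const m = line.trim().match(/^kernel-(core-|modules-)?(\d[\w.\-]*\.x86_64)$/);
    if (!m) continue;
    const key = m[1] ? `kernel_${m[1].slice(0, -1)}` : "kernel";
    versions[key] = m[2];
  }
  return versions;
}

// Invented sample: 2500 filler lines, then the Installed section near the end.
const filler = Array.from({ length: 2500 }, (_, i) => `  Upgrading : package-${i}`).join("\n");
const sampleContent = [
  "2025-07-24 07:00:01 Running transaction",
  filler,
  "Installed:",
  "  kernel-4.18.0-553.33.1.el8_10.x86_64   ",
  "  kernel-core-4.18.0-553.33.1.el8_10.x86_64   ",
  "  kernel-modules-4.18.0-553.33.1.el8_10.x86_64 ",
  "",
  "Complete!",
].join("\n");

console.log(parseInstalledKernels(sampleContent));
```

Keeping the parsing in a pure function like this makes it easier to test against a copy of a real log record before wiring it into the bizevent step.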




Note:
I added limit 1 to all DQL queries to avoid ambiguity. You can change it as needed. The basic idea is that when you can't process a log record directly, you can use this workaround.
Finally, in OpenPipeline you can add a rule that stores the log record for only one day.



 Regards,
@Maheedhar_T 





theharithsa
Dynatrace Champion

This is very useful, Maheedhar. 
Thanks for sharing. 

Love more, hate less; Technology for all, together we grow.
