cancel
Showing results for 
Show  only  | Search instead for 
Did you mean: 

Mule Tagging Sensor

willl
Newcomer

Hi Team,

We use mule and in some of the flows we're doing asynchronous processing. By "doing asynchronous processing, the receiver thread is used only to place the message on a staged event-driven architectur (SEDA) queue, at which point the message is transferred to a flow thread." I guess that's why In Dynatrace, we can only see the execution of receiver threads in purepath tree but not that of flow and dispatcher threads.

https://docs.mulesoft.com/mule-user-guide/v/3.6/tuning-performance


If you are doing asynchronous processing, the receiver thread is used only to place the message on a staged event-driven architectur (SEDA) queue, at which point the message is transferred to a flow thread, and the receiver thread is released back into the receiver thread pool so it can carry another message.

My question is, is there existing any tagging sensor pack available that can add tracetag to a job before Mule queuing the job and read the tracetag from the job after Mule dequeuing the job?

If there is no one, any suggestion on how to create one ourselves?

I found an article (https://community.dynatrace.com/community/display/... quite useful. I did a little study of the Mule source code and I believe SedaStageInterceptingMessageProcessor.java is the class in Mule that does the enqueue and dequeue (in a different thread). My question is is there a way that we can let Dynatrace instrument the methods on this class and add our tagging code? Or do you suggest me add custom tagging code in every mule app of ours?

http://codenav.org/code.html?project=/org/mule/mule-core/3.3-M1&path=/Source%20Packages/org.mule.pro...

public class SedaStageInterceptingMessageProcessor extends AsyncInterceptingMessageProcessor
implements Work, Lifecycle, Pausable, Resumable
{
...
protected Queue queue;
...
protected void enqueue(MuleEvent event) throws Exception
{
...
queue.put(event);
...
}

protected MuleEvent dequeue() throws Exception
{
...
MuleEvent event = (MuleEvent)queue.poll(queueTimeout);
...
return event;
}
...
public void run()
{
...
while (!lifecycleManager.getState().isStopped())
{
try
{
...
event = (DefaultMuleEvent)dequeue();
}
...
if (event != null)
{
final MuleEvent eventToProcess = event;
TransactionalErrorHandlingExecutionTemplate executionTemplate = TransactionalErrorHandlingExecutionTemplate.createMainExecutionTemplate(muleContext, event.getFlowConstruct().getExceptionListener());
ExecutionCallback<MuleEvent> processingCallback = new ExecutionCallback<MuleEvent>()
{

@Override
public MuleEvent process() throws Exception
{
...
AsyncMessageProcessorWorker work = new AsyncMessageProcessorWorker(eventToProcess);
try
{
// TODO Remove this thread handoff to ensure Zero Message Loss
workManagerSource.getWorkManager().scheduleWork(work, WorkManager.INDEFINITE,
null, new AsyncWorkListener(next));
}
...
return null;
}
};

try
{
executionTemplate.execute(processingCallback);
}
...
}
}
...
}
}
2 REPLIES 2

werner_segers1
Visitor

Hi Will,

Did you manage to find a solution to this?

Cheers,

Werner

willl
Newcomer

Yes, by calling tagging ADK explicitly in the Mule apps that we care most. (PS: Use AspectJ to inject the tagging code into SedaStageInterceptingMessageProcessor may be better, but I am a bit lazy here.) Wonder will Dynatrace support this out of box...