cancel
Showing results for 
Show  only  | Search instead for 
Did you mean: 

This product reached the end of support date on March 31, 2021.

corrupted purepath with threads in Java

gilgi
Mentor

Hi all,

I have this sample code:

import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
import org.zeromq.ZMQ.Poller;
import com.dynatrace.adk.DynaTraceADKFactory;


import com.dynatrace.adk.Tagging;
import java.util.Random;

public class asyncsrv
{
//This is our server task.
//It uses the multithreaded server model to deal requests out to a pool
//of workers and route replies back to clients. One worker can handle
//one request at a time but one client can talk to multiple workers at
//once.

private static Tagging tagging;
private static native void initialize();
private static native void uninitialize();
private static native void setTag(byte [] tag);
private static native void startServerPurePath();
private static native void endServerPurePath();

private static Random rand = new Random(System.nanoTime());
private static class server_task implements Runnable {
public void run() {
ZContext ctx = new ZContext();
doLogic(ctx);
ctx.destroy();
}

public void doLogic(ZContext ctx) {
Socket frontend = ctx.createSocket(ZMQ.ROUTER);
frontend.bind("tcp://*:5570");
Socket backend = ctx.createSocket(ZMQ.DEALER);
backend.bind("inproc://backend");


// Launch pool of worker threads, precise number is not critical
for (int threadNbr = 0; threadNbr < 5; threadNbr++)
new Thread(new server_worker(ctx)).start();

// Connect backend to frontend via a proxy
ZMQ.proxy(frontend, backend, null);
}
}


//Each worker task works on one request at a time and sends a random number
//of replies back, with random delays between replies:


private static class server_worker implements Runnable {
private ZContext ctx;
public server_worker(ZContext ctx) {
this.ctx = ctx;
}
public void run() {
Socket worker = ctx.createSocket(ZMQ.DEALER);
worker.connect("inproc://backend");
while (!Thread.currentThread().isInterrupted()) {
server_worker_singular_logic(worker);
}
ctx.destroy();
}


public void server_worker_singular_logic(Socket worker) {
// The DEALER socket gives us the address envelope and message
ZMsg msg = ZMsg.recvMsg(worker);
ZFrame address = msg.pop();
ZFrame content = msg.pop();
assert (content != null);
msg.destroy();


// Send 0..4 replies back
int replies = rand.nextInt(5);
for (int reply = 0; reply < replies; reply++) {
// Sleep for some fraction of a second
try {
Thread.sleep(rand.nextInt(1000) + 1);
} catch (InterruptedException e) {
}
address.send(worker, ZFrame.REUSE + ZFrame.MORE);
content.send(worker, ZFrame.REUSE);
}
address.destroy();
content.destroy();
}
}


//The main thread simply starts several clients, and a server, and then
//waits for the server to finish.


public static void main(String[] args) throws Exception {
ZContext ctx = new ZContext();
new Thread(new server_task()).start();


// Run for 5 seconds then quit
Thread.sleep(5 * 1000);
ctx.destroy();
}
}

and we want to monitor it using AppMon, however ...

Since this is a server process (demo only), it does not have a "solid" shutdown mechanism and it is just being shut down by using CTRL-C. as a result of that all purepaths are ending as corrupted because "An involved agent has been shut down before it could send all data for the PurePath". we are trying to monitor only the server_worker_singular_logic method and what's happening inside it.

a sensor is defined on this method and is marked as "active and start purepath".

Any thoughts ?

2 REPLIES 2

peter_karlhuber
Dynatrace Pro
Dynatrace Pro

Just guessing, but it's probably waiting for some JMS logic to finish. Unless you need JMS tagging functionality in this use case, try unplacing the related sensors. BR, Peter

adam_gardner
Dynatrace Champion
Dynatrace Champion

I'm wondering if you could intercept the Ctrl + C command and use the Java ADK to close the purepath?

tagging.endServerPurePath();