cancel
Showing results for 
Show  only  | Search instead for 
Did you mean: 

Char encoding issue between Real Time BT Feed and Flume

a_selles
Newcomer

Hi everyone,

I'm installing a Flume instance to receive data from the Dynatrace server via the Real Time Business Transactions Feed, as a bridge to send it to Elasticsearch.

I managed to store BTs in ES, but the messages are full of unreadable characters. Examples can be found in the Flume log files, like the following:

21 abr 2017 10:50:08,509 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.client.DefaultRequestDirector.tryExecute:683)  - Attempt 1 to execute request
21 abr 2017 10:50:08,510 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:268) - Sending request: POST /_bulk HTTP/1.1
21 abr 2017 10:50:08,511 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "POST /_bulk HTTP/1.1[\r][\n]"
21 abr 2017 10:50:08,512 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "Content-Length: 757[\r][\n]"
21 abr 2017 10:50:08,513 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "Content-Type: text/plain; charset=ISO-8859-1[\r][\n]"
21 abr 2017 10:50:08,513 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "Host: <host_key>.eu-west-1.es.amazonaws.com:80[\r][\n]"
21 abr 2017 10:50:08,513 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "Connection: Keep-Alive[\r][\n]"
21 abr 2017 10:50:08,514 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "User-Agent: Apache-HttpClient/4.5 (Java/1.8.0_102)[\r][\n]"
21 abr 2017 10:50:08,514 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "[\r][\n]"
21 abr 2017 10:50:08,515 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:272) - >> POST /_bulk HTTP/1.1
21 abr 2017 10:50:08,515 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275) - >> Content-Length: 757
21 abr 2017 10:50:08,515 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275) - >> Content-Type: text/plain; charset=ISO-8859-1
21 abr 2017 10:50:08,516 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275) - >> Host: <host_key>.eu-west-1.es.amazonaws.com:80
21 abr 2017 10:50:08,516 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275) - >> Connection: Keep-Alive
21 abr 2017 10:50:08,517 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275) - >> User-Agent: Apache-HttpClient/4.5 (Java/1.8.0_102)
21 abr 2017 10:50:08,517 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "{"index":{"_index":"pp-2017-04-21","_type":"item"}}[\n]"
21 abr 2017 10:50:08,518 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72) - >> "{"@message":"\n!Motor Hotel Valoracion Reserva PP\u0010\u0000\u001A\u000Efront terceras*\u001FMotor Hotel Prebook - Proveedor*\u001DMotor Hotel Prebook - Destino2h\b?????+*!PT=78408;PA=1461627692;PS=3000787:\u0003PAV:\u0003QXUy\u0000\u0000\u0000?F??@?\u0001`u???\u0019b@?\u0001\u0000\u0000\u0000??:@?\u0001\u0000?????@?\u0001\u0000\u0000\u0000?F??@?\u0001\u00002i\b?????+*!PT=78431;PA=1461627692;PS=3000787:\u0003BAR:\u00046679y\u0000\u0000\u0000?d ?@?\u0001fg?s??\\@?\u0001\u0000\u0000`}???@?\u0001\u0000\u0000?p?\b?@?\u0001\u0000\u0000\u0000\u0000tL?@?\u0001\u0000:\nProduction","@fields":{"btType":"PUREPATH","server":"apm1","btName":"Motor Hotel Valoracion Reserva PP","systemProfile":"Production"}}[\n]"

I would like to know if someone else has dealt with this issue.

This is the Flume configuration (based on Big Data Business Transaction Bridge):

# Name the components on this agent:
# one HTTP source, three sinks and three channels (PurePath, UserAction, Visit).
agent1.sources = HTTPSource
agent1.sinks = PurePathSink UserActionSink VisitSink
agent1.channels = PurePathChannel UserActionChannel VisitChannel

# Describe/configure HTTPSource
# Listens on port 4321 and hands incoming requests to the Dynatrace BtExportHandler.
agent1.sources.HTTPSource.type = org.apache.flume.source.http.HTTPSource
agent1.sources.HTTPSource.port = 4321
agent1.sources.HTTPSource.handler = com.dynatrace.diagnostics.btexport.flume.BtExportHandler

# Describe sinks
# All three sinks use the Elasticsearch sink with the REST client, the same host,
# index type "item", a batch size of 100 and a time-based index name builder.
# They differ only in index name (pp / ua / vt) and in the serializerBuilder class.

# PurePath sink -> index "pp"
agent1.sinks.PurePathSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.PurePathSink.client = rest
agent1.sinks.PurePathSink.hostNames = <host_key>.eu-west-1.es.amazonaws.com:80
agent1.sinks.PurePathSink.indexName = pp
agent1.sinks.PurePathSink.indexType = item
agent1.sinks.PurePathSink.clusterName = <cluster>
agent1.sinks.PurePathSink.batchSize = 100
agent1.sinks.PurePathSink.serializer.charset = ISO-8859-1
agent1.sinks.PurePathSink.serializerBuilder = com.dynatrace.diagnostics.btexport.flume.BtPurePathSerializerBuilder
agent1.sinks.PurePathSink.indexNameBuilder = org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder

# User action sink -> index "ua"
agent1.sinks.UserActionSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.UserActionSink.hostNames = <host_key>.eu-west-1.es.amazonaws.com:80
agent1.sinks.UserActionSink.client = rest
agent1.sinks.UserActionSink.indexName = ua
agent1.sinks.UserActionSink.indexType = item
agent1.sinks.UserActionSink.clusterName = <cluster>
agent1.sinks.UserActionSink.batchSize = 100
agent1.sinks.UserActionSink.serializer.charset = ISO-8859-1
agent1.sinks.UserActionSink.serializerBuilder = com.dynatrace.diagnostics.btexport.flume.BtPageActionSerializerBuilder
agent1.sinks.UserActionSink.indexNameBuilder = org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder

# Visit sink -> index "vt"
agent1.sinks.VisitSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.VisitSink.hostNames = <host_key>.eu-west-1.es.amazonaws.com:80
agent1.sinks.VisitSink.client = rest
agent1.sinks.VisitSink.indexName = vt
agent1.sinks.VisitSink.indexType = item
agent1.sinks.VisitSink.clusterName = <cluster>
agent1.sinks.VisitSink.batchSize = 100
agent1.sinks.VisitSink.serializer.charset = ISO-8859-1
agent1.sinks.VisitSink.serializerBuilder = com.dynatrace.diagnostics.btexport.flume.BtVisitSerializerBuilder
agent1.sinks.VisitSink.indexNameBuilder = org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder

# Use channels which buffer events in memory (one channel per sink).
# The memory-channel key is spelled "transactionCapacity"; a misspelled key is not
# recognised by Flume, so the intended value would not be applied.
agent1.channels.PurePathChannel.type = memory
agent1.channels.PurePathChannel.capacity = 1000
agent1.channels.PurePathChannel.transactionCapacity = 100

agent1.channels.UserActionChannel.type = memory
agent1.channels.UserActionChannel.capacity = 1000
agent1.channels.UserActionChannel.transactionCapacity = 100

agent1.channels.VisitChannel.type = memory
agent1.channels.VisitChannel.capacity = 1000
agent1.channels.VisitChannel.transactionCapacity = 100

# Bind the source and sink to the channel
# The source feeds all three channels; each sink drains exactly one of them.
agent1.sources.HTTPSource.channels = PurePathChannel UserActionChannel VisitChannel
agent1.sinks.PurePathSink.channel = PurePathChannel
agent1.sinks.UserActionSink.channel = UserActionChannel
agent1.sinks.VisitSink.channel = VisitChannel

# Route each event by the value of its "btType" header.
# PAGE_ACTION and USER_ACTION share UserActionChannel; any other value
# falls back to PurePathChannel.
agent1.sources.HTTPSource.selector.type = multiplexing
agent1.sources.HTTPSource.selector.header = btType
agent1.sources.HTTPSource.selector.mapping.PUREPATH = PurePathChannel
agent1.sources.HTTPSource.selector.mapping.PAGE_ACTION = UserActionChannel
agent1.sources.HTTPSource.selector.mapping.USER_ACTION = UserActionChannel
agent1.sources.HTTPSource.selector.mapping.VISIT = VisitChannel
agent1.sources.HTTPSource.selector.default = PurePathChannel

Thanks.

1 REPLY 1

a_selles
Newcomer

Issue solved.

Config entry "agent1.sinks.PurePathSink.serializerBuilder" does not work, so the default serializer does the job instead, which means sending the request data to ES as is.

Request data is formatted in a way that is parseable by com.dynatrace.diagnostics.core.realtime.export.BtExport.BusinessTransaction (available at Big Data Business Transaction Bridge). However, the serializer does not work to send the data to ES.

After understanding what was happening, we have written a new serializer that converts the request into JSON format (gracefully made by BtExport.BusinessTransaction), and then reads the JSON to build the ES message.

The goal of this solution is to send Business transactions to ES through flume, without any other layer.

A piece of the solution is:

public class BTSerializer implements ElasticSearchEventSerializer {

@Override
public void configure(Context arg0) {
}

@Override
public void configure(ComponentConfiguration arg0) {
}

@Override
public BytesStream getContentBuilder(Event event) throws IOException {
// Parse request data into JSON
BusinessTransaction parsedBt = BusinessTransaction.parseFrom(event.getBody());

XContentBuilder builder = XContentFactory.jsonBuilder().startObject();

// Build ES body message: the full content
builder.field("@message", parsedBt.getAllFields().toString());

// Build ES keys
appendIndexes(builder.startObject("@fields"), event, parsedBt).endObject();
builder.endObject();

return builder;
}

private XContentBuilder appendIndexes(XContentBuilder content, Event event, BusinessTransaction parsedBt) throws IOException {

// All the headers will be keys in ES
Map<String, String> headers = event.getHeaders();
for (String key : headers.keySet()) {
content.field(key, headers.get(key));
}

try {
for (Map.Entry<FieldDescriptor,Object> entry : parsedBt.getAllFields().entrySet()) {
FieldDescriptor field = entry.getKey();
Object value = entry.getValue();
String fieldName = field.getName();

// ApplicationName will be a ES-key
if ("application".equals(fieldName)) {
content.field(fieldName, value.toString());
} else if ("occurrences".equals(fieldName)) {

// It comes here as maximum the number of messages set as the Bulk Size. Must be set to 1 (improve pending)
@SuppressWarnings("unchecked")
List<com.google.protobuf.GeneratedMessage> listOfMsg = (List<com.google.protobuf.GeneratedMessage>) value;


for (com.google.protobuf.GeneratedMessage singleMsg: listOfMsg) {


// Desired BT fields might be managed here
for (Map.Entry<FieldDescriptor,Object> fieldEntry : singleMsg.getAllFields().entrySet()) {
FieldDescriptor afield = fieldEntry.getKey();
Object avalue = fieldEntry.getValue();
String afieldName = afield.getName();


// StartTime will be the ES-timestamp-key
if ("startTime".equals(afieldName)) {
Date dt = new Date((Long) avalue);
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
df.setTimeZone(TimeZone.getDefault());
String date = df.format(dt);
content.field("timestamp", date);
}
}
}
}
}
} catch( Exception e) {
log.error("Something wrong indexing ES message: " + e.getMessage());
}


return content;
}


}

We are already working on it.