Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ public final class ConfigDefaults {
static final int DEFAULT_METRICS_OTEL_TIMEOUT = 7_500; // ms
static final int DEFAULT_METRICS_OTEL_CARDINALITY_LIMIT = 2_000;

static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms
public static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms

static final String DEFAULT_OTLP_HTTP_METRICS_ENDPOINT = "v1/metrics";
static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
static final String DEFAULT_OTLP_HTTP_PORT = "4318";
static final String DEFAULT_OTLP_GRPC_PORT = "4317";
public static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
public static final String DEFAULT_OTLP_HTTP_PORT = "4318";
public static final String DEFAULT_OTLP_GRPC_PORT = "4317";

static final int DEFAULT_DOGSTATSD_START_DELAY = 15; // seconds

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package datadog.trace.common.writer;

import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDSpanContext;
import datadog.trace.core.otlp.common.OtlpPayload;
import datadog.trace.core.otlp.common.OtlpSender;
import datadog.trace.core.otlp.trace.OtlpTraceCollector;
import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class OtlpPayloadDispatcher implements PayloadDispatcher {
private final OtlpTraceCollector collector;
private final OtlpSender sender;

OtlpPayloadDispatcher(OtlpSender sender) {
this(sender, OtlpTraceProtoCollector.INSTANCE);
}

OtlpPayloadDispatcher(OtlpSender sender, OtlpTraceCollector collector) {
this.sender = sender;
this.collector = collector;
}

@Override
public void addTrace(List<? extends CoreSpan<?>> trace) {
List<CoreSpan<?>> sampled = null;
for (CoreSpan<?> span : trace) {
if (shouldExport(span)) {
if (sampled == null) {
sampled = new ArrayList<>(trace.size());
}
sampled.add(span);
}
}
if (sampled != null) {
collector.addTrace(sampled);
}
}

@Override
public void flush() {
OtlpPayload payload = collector.collectTraces();
if (payload.getContentLength() > 0) {
sender.send(payload);
}
}

@Override
public void onDroppedTrace(int spanCount) {
// TODO: surface drop counts via HealthMetrics
}

@Override
public Collection<RemoteApi> getApis() {
return Collections.emptyList();
}

private static boolean shouldExport(CoreSpan<?> span) {
if (span.samplingPriority() > 0) {
return true;
}
return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package datadog.trace.common.writer;

import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_PORT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;

import datadog.communication.ddagent.DroppingPolicy;
import datadog.trace.api.config.OtlpConfig;
import datadog.trace.common.sampling.SingleSpanSampler;
import datadog.trace.common.writer.ddagent.Prioritization;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.otlp.common.OtlpGrpcSender;
import datadog.trace.core.otlp.common.OtlpHttpSender;
import datadog.trace.core.otlp.common.OtlpSender;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class OtlpWriter extends RemoteWriter {

private static final int BUFFER_SIZE = 1024;
private static final String TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;
private static final String DEFAULT_OTLP_HTTP_ENDPOINT =
"http://localhost:" + DEFAULT_OTLP_HTTP_PORT + TRACES_SIGNAL_PATH;

public static OtlpWriterBuilder builder() {
return new OtlpWriterBuilder();
}

private final OtlpSender sender;

OtlpWriter(
TraceProcessingWorker worker,
PayloadDispatcher dispatcher,
OtlpSender sender,
HealthMetrics healthMetrics,
int flushTimeout,
TimeUnit flushTimeoutUnit,
boolean alwaysFlush) {
super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
this.sender = sender;
}

@Override
public void close() {
super.close();
sender.shutdown();
}

public static class OtlpWriterBuilder {
private String endpoint = DEFAULT_OTLP_HTTP_ENDPOINT;
private Map<String, String> headers = Collections.emptyMap();
private int timeoutMillis = (int) TimeUnit.SECONDS.toMillis(10);
private OtlpConfig.Protocol protocol = OtlpConfig.Protocol.HTTP_PROTOBUF;
private OtlpConfig.Compression compression = OtlpConfig.Compression.NONE;
private int traceBufferSize = BUFFER_SIZE;
private HealthMetrics healthMetrics = HealthMetrics.NO_OP;
private int flushIntervalMilliseconds = 1000;
private int flushTimeout = 1;
private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS;
private boolean alwaysFlush = false;
private SingleSpanSampler singleSpanSampler;
private OtlpSender sender;

public OtlpWriterBuilder endpoint(String endpoint) {
this.endpoint = endpoint;
return this;
}

public OtlpWriterBuilder headers(Map<String, String> headers) {
this.headers = headers;
return this;
}

public OtlpWriterBuilder timeoutMillis(int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
return this;
}

public OtlpWriterBuilder protocol(OtlpConfig.Protocol protocol) {
this.protocol = protocol;
return this;
}

public OtlpWriterBuilder compression(OtlpConfig.Compression compression) {
this.compression = compression;
return this;
}

public OtlpWriterBuilder traceBufferSize(int traceBufferSize) {
this.traceBufferSize = traceBufferSize;
return this;
}

public OtlpWriterBuilder healthMetrics(HealthMetrics healthMetrics) {
this.healthMetrics = healthMetrics;
return this;
}

public OtlpWriterBuilder flushIntervalMilliseconds(int flushIntervalMilliseconds) {
this.flushIntervalMilliseconds = flushIntervalMilliseconds;
return this;
}

public OtlpWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
this.flushTimeout = flushTimeout;
this.flushTimeoutUnit = flushTimeoutUnit;
return this;
}

public OtlpWriterBuilder alwaysFlush(boolean alwaysFlush) {
this.alwaysFlush = alwaysFlush;
return this;
}

public OtlpWriterBuilder spanSamplingRules(SingleSpanSampler singleSpanSampler) {
this.singleSpanSampler = singleSpanSampler;
return this;
}

OtlpWriterBuilder sender(OtlpSender sender) {
this.sender = sender;
return this;
}

public OtlpWriter build() {
if (sender == null) {
sender =
protocol == OtlpConfig.Protocol.GRPC
? new OtlpGrpcSender(
endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression)
: new OtlpHttpSender(
endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression);
}

final OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender);
final TraceProcessingWorker worker =
new TraceProcessingWorker(
traceBufferSize,
healthMetrics,
dispatcher,
DroppingPolicy.DISABLED,
Prioritization.ENSURE_TRACE,
flushIntervalMilliseconds,
TimeUnit.MILLISECONDS,
singleSpanSampler);

return new OtlpWriter(
worker, dispatcher, sender, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.DD_INTAKE_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.LOGGING_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.MULTI_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.OTLP_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.PRINTING_WRITER_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.TRACE_STRUCTURE_WRITER_TYPE;
import static datadog.trace.common.writer.ddagent.Prioritization.ENSURE_TRACE;
Expand Down Expand Up @@ -53,6 +54,8 @@ public static Writer createWriter(
final HealthMetrics healthMetrics,
String configuredType) {

int flushIntervalMilliseconds = Math.round(config.getTraceFlushIntervalSeconds() * 1000);

if (LOGGING_WRITER_TYPE.equals(configuredType)) {
return new LoggingWriter();
} else if (PRINTING_WRITER_TYPE.equals(configuredType)) {
Expand All @@ -63,6 +66,17 @@ public static Writer createWriter(
} else if (configuredType.startsWith(MULTI_WRITER_TYPE)) {
return new MultiWriter(
config, commObjects, sampler, singleSpanSampler, healthMetrics, configuredType);
} else if (OTLP_WRITER_TYPE.equals(configuredType)) {
return OtlpWriter.builder()
.endpoint(config.getOtlpTracesEndpoint())
.headers(config.getOtlpTracesHeaders())
.protocol(config.getOtlpTracesProtocol())
.compression(config.getOtlpTracesCompression())
.timeoutMillis(config.getOtlpTracesTimeout())
.healthMetrics(healthMetrics)
.spanSamplingRules(singleSpanSampler)
.flushIntervalMilliseconds(flushIntervalMilliseconds)
.build();
}

if (!DD_AGENT_WRITER_TYPE.equals(configuredType)
Expand All @@ -80,7 +94,6 @@ public static Writer createWriter(
"Using 'EnsureTrace' prioritization type. (Do not use this type if your application is running in production mode)");
}

int flushIntervalMilliseconds = Math.round(config.getTraceFlushIntervalSeconds() * 1000);
DDAgentFeaturesDiscovery featuresDiscovery = commObjects.featuresDiscovery(config);

// The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import okhttp3.Protocol
import okhttp3.Request
import okhttp3.Response
import okhttp3.ResponseBody
import spock.lang.Ignore

class WriterFactoryTest extends DDSpecification {

Expand Down Expand Up @@ -165,6 +166,12 @@ class WriterFactoryTest extends DDSpecification {
"DDIntakeWriter" | false | false | true | DDIntakeWriter | [DDIntakeApi]
}

@Ignore("TODO: implement — verify that writerType=OtlpWriter plus OTLP config getters on Config produce an OtlpWriter with the expected endpoint/protocol/compression/headers/timeout wired through")
def "test writer creation for OtlpWriter"() {
expect:
true
}

Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) {
def endpoints = []
if (hasEvpProxy && evpProxySupportsCompression) {
Expand Down
Loading
Loading