@@ -17,7 +17,6 @@ import datadog.trace.bootstrap.instrumentation.api.Tags
1717import datadog.trace.common.writer.ListWriter
1818import datadog.trace.core.DDSpan
1919import datadog.trace.core.datastreams.StatsGroup
20- import datadog.trace.test.util.Flaky
2120import org.apache.kafka.clients.consumer.ConsumerConfig
2221import org.apache.kafka.clients.consumer.ConsumerRecord
2322import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -104,6 +103,32 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
104103 }
105104 }
106105
106+ private static class SortBatchKafkaTraces implements Comparator<List<DDSpan > > {
107+ @Override
108+ int compare (List<DDSpan > o1 , List<DDSpan > o2 ) {
109+ return Long . compare(batchSortKey(o1), batchSortKey(o2))
110+ }
111+ }
112+
113+ private static long batchSortKey (List<DDSpan > trace ) {
114+ assert ! trace. isEmpty()
115+ if (trace. get(0 ). localRootSpan. operationName. toString() == " parent" ) {
116+ return Long . MIN_VALUE
117+ }
118+ def deliverSpan = trace. find { it. operationName. toString() == " kafka.deliver" }
119+ return deliverSpan ? deliverSpan. parentId : trace. get(0 ). parentId
120+ }
121+
122+ private static List<DDSpan > producerSpans (List<List<DDSpan > > traces ) {
123+ def producerTrace = traces. find { trace ->
124+ ! trace. isEmpty() && trace. get(0 ). localRootSpan. operationName. toString() == " parent"
125+ }
126+ assert producerTrace != null
127+ return producerTrace
128+ .findAll { it. getTag(Tags . SPAN_KIND ) == Tags . SPAN_KIND_PRODUCER }
129+ .sort { it. spanId }
130+ }
131+
107132
108133 static {
109134 PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<> (3 )
@@ -835,7 +860,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
835860 producer.close()
836861 }
837862
838- @Flaky(" Repeatedly fails with a partition set to 1 but expects 0 https :// github.com/DataDog/dd-trace-java/issues/3864")
839863 def " test spring kafka template produce and batch consume" () {
840864 setup:
841865 def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
@@ -857,14 +881,14 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
857881 def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
858882 def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
859883 container.setupMessageListener(new BatchMessageListener<String, String>() {
860- @Override
861- void onMessage (List<ConsumerRecord<String , String > > consumerRecords ) {
862- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
863- consumerRecords. each {
864- records. add(it)
865- }
884+ @Override
885+ void onMessage(List<ConsumerRecord<String, String>> consumerRecords) {
886+ TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
887+ consumerRecords.each {
888+ records.add(it)
866889 }
867- })
890+ }
891+ })
868892 container.start()
869893 ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
870894
@@ -874,7 +898,8 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
874898 for (g in greetings) {
875899 kafkaTemplate.send(SHARED_TOPIC, g).addCallback({
876900 runUnderTrace(" producer callback" ) {}
877- }, { ex ->
901+ }, {
902+ ex ->
878903 runUnderTrace(" producer exception : " + ex) {}
879904 })
880905 }
@@ -888,17 +913,31 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
888913
889914 then:
890915 def receivedSet = greetings.toSet()
891- greetings. eachWithIndex { g , i ->
916+ def receivedRecords = []
917+ greetings.eachWithIndex {
918+ g, i ->
892919 def received = records.poll(5, TimeUnit.SECONDS)
893920 receivedSet.remove(received.value()) //maybe received out of order in case several partitions
894921 assert received.key() == null
895922
896923 def headers = received.headers()
897924 assert headers.iterator().hasNext()
925+ receivedRecords.add(received)
898926 }
899927 assert receivedSet.isEmpty()
900928
901- assertTraces(4 , SORT_TRACES_BY_ID ) {
929+ TEST_WRITER.waitForTraces(4)
930+ def traces = Arrays.asList(TEST_WRITER.toArray()) as List<List<DDSpan>>
931+ def produceSpans = producerSpans(traces)
932+ def spanIdToRecord = receivedRecords.collectEntries {
933+ record ->
934+ def header = record.headers().headers(" x- datadog- parent- id" ).iterator()
935+ assert header.hasNext()
936+ [(Long.parseLong(new String(header.next().value(), StandardCharsets.UTF_8))): record]
937+ }
938+
939+ // Batch listener delivery order can vary; match each consumer trace to its producer via the propagated parent ID.
940+ assertTraces(4, new SortBatchKafkaTraces()) {
902941 trace(7) {
903942 basicSpan(it, " parent" )
904943 basicSpan(it, " producer callback" , span(0))
@@ -910,46 +949,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
910949 }
911950
912951 if (hasQueueSpan()) {
913- trace(2 ) {
914- consumerSpan(it, consumerProperties, trace(1 )[1 ], 0 .. 0 )
915- queueSpan(it, trace(0 )[6 ])
916- }
917- trace(2 ) {
918- consumerSpan(it, consumerProperties, trace(2 )[1 ], 0 .. 1 )
919- queueSpan(it, trace(0 )[4 ])
920- }
921- trace(2 ) {
922- consumerSpan(it, consumerProperties, trace(3 )[1 ], 0 .. 1 )
923- queueSpan(it, trace(0 )[2 ])
952+ [0, 1, 2].each {
953+ i ->
954+ def expectedOffset = spanIdToRecord[produceSpans[i].spanId].offset()
955+ trace(2) {
956+ consumerSpan(it, consumerProperties, span(1), expectedOffset..expectedOffset)
957+ queueSpan(it, produceSpans[i])
958+ }
924959 }
925960 } else {
926- trace(1 ) {
927- consumerSpan(it, consumerProperties, trace(0 )[6 ], 0 .. 0 )
928- }
929- trace(1 ) {
930- consumerSpan(it, consumerProperties, trace(0 )[4 ], 0 .. 1 )
931- }
932- trace(1 ) {
933- consumerSpan(it, consumerProperties, trace(0 )[2 ], 0 .. 1 )
961+ [0, 1, 2].each {
962+ i ->
963+ def expectedOffset = spanIdToRecord[produceSpans[i].spanId].offset()
964+ trace(1) {
965+ consumerSpan(it, consumerProperties, produceSpans[i], expectedOffset..expectedOffset)
966+ }
934967 }
935968 }
936969 }
937970
938971 if (isDataStreamsEnabled()) {
939- StatsGroup first = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == 0 }
972+ StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find {
973+ it.parentHash == 0
974+ }
940975 verifyAll(first) {
941976 tags.hasAllTags(" direction :out" , " kafka_cluster_id :$clusterId" , " topic :$SHARED_TOPIC " .toString(), " type :kafka" )
942977 }
943978
944- StatsGroup second = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == first. hash }
979+ StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find {
980+ it.parentHash == first.hash
981+ }
945982 verifyAll(second) {
946983 tags.hasAllTags(
947- " direction:in" ,
948- " group:sender" ,
949- " kafka_cluster_id:$clusterId " ,
950- " topic:$SHARED_TOPIC " . toString(),
951- " type:kafka"
952- )
984+ " direction :in " ,
985+ " group :sender" ,
986+ " kafka_cluster_id :$clusterId" ,
987+ " topic :$SHARED_TOPIC " .toString(),
988+ " type :kafka"
989+ )
953990 }
954991 }
955992
@@ -981,16 +1018,16 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
9811018
9821019 // setup a Kafka message listener
9831020 container.setupMessageListener(new MessageListener<String, String>() {
984- @Override
985- void onMessage (ConsumerRecord<String , String > record ) {
986- TEST_WRITER . waitForTraces(1 ) // ensure consistent ordering of traces
987- records. add(record)
988- if (isDataStreamsEnabled()) {
989- // even if header propagation is disabled, we want data streams to work.
990- TEST_DATA_STREAMS_WRITER . waitForGroups(2 )
991- }
1021+ @Override
1022+ void onMessage(ConsumerRecord<String, String> record) {
1023+ TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
1024+ records.add(record)
1025+ if (isDataStreamsEnabled()) {
1026+ // even if header propagation is disabled, we want data streams to work.
1027+ TEST_DATA_STREAMS_WRITER.waitForGroups(2)
9921028 }
993- })
1029+ }
1030+ })
9941031
9951032 // start the container and underlying message listener
9961033 container.start()
@@ -1028,9 +1065,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10281065 def existingSpanId = 9876543210987654L
10291066 def headers = new RecordHeaders()
10301067 headers.add(new RecordHeader(" x- datadog- trace- id" ,
1031- String . valueOf(existingTraceId). getBytes(StandardCharsets . UTF_8 )))
1068+ String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
10321069 headers.add(new RecordHeader(" x- datadog- parent- id" ,
1033- String . valueOf(existingSpanId). getBytes(StandardCharsets . UTF_8 )))
1070+ String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
10341071
10351072 when:
10361073 def record = new ProducerRecord(SHARED_TOPIC, 0, null, " test- context- extraction" , headers)
@@ -1063,16 +1100,16 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10631100 def oldExtractorsByType = extractorsByTypeField.get(TEST_DATA_STREAMS_MONITORING)
10641101
10651102 def extractor = new DataStreamsTransactionExtractor() {
1066- String getName () {
1067- return " kafka-produce-test"
1068- }
1069- DataStreamsTransactionExtractor.Type getType () {
1070- return DataStreamsTransactionExtractor.Type . KAFKA_PRODUCE_HEADERS
1071- }
1072- String getValue () {
1073- return " x-transaction-id"
1074- }
1103+ String getName() {
1104+ return " kafka- produce- test"
1105+ }
1106+ DataStreamsTransactionExtractor.Type getType() {
1107+ return DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS
10751108 }
1109+ String getValue() {
1110+ return " x- transaction- id"
1111+ }
1112+ }
10761113 def extractorsByType = new EnumMap<>(DataStreamsTransactionExtractor.Type)
10771114 extractorsByType.put(DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS, [extractor])
10781115 extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, extractorsByType)
0 commit comments