@@ -11,36 +11,33 @@ import (
1111 "github.com/memodb-io/Acontext/internal/config"
1212 amqp "github.com/rabbitmq/amqp091-go"
1313 "go.opentelemetry.io/otel"
14- "go.opentelemetry.io/otel/attribute"
14+ "go.opentelemetry.io/otel/propagation"
15+ semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
1516 "go.opentelemetry.io/otel/trace"
1617 "go.uber.org/zap"
1718)
1819
19- // tableCarrier adapts amqp.Table to TextMapCarrier for OpenTelemetry propagation
20- type tableCarrier struct {
21- table amqp.Table
22- }
23-
24- func (c tableCarrier ) Get (key string ) string {
25- if val , ok := c .table [key ]; ok {
26- if str , ok := val .(string ); ok {
27- return str
28- }
29- return fmt .Sprintf ("%v" , val )
20+ // injectTraceContext injects trace context into AMQP headers
21+ func injectTraceContext (ctx context.Context , headers amqp.Table ) {
22+ carrier := propagation.MapCarrier {}
23+ otel .GetTextMapPropagator ().Inject (ctx , carrier )
24+ for k , v := range carrier {
25+ headers [k ] = v
3026 }
31- return ""
32- }
33-
34- func (c tableCarrier ) Set (key , value string ) {
35- c .table [key ] = value
3627}
3728
38- func ( c tableCarrier ) Keys () [] string {
39- keys := make ([] string , 0 , len ( c . table ))
40- for k := range c . table {
41- keys = append ( keys , k )
29+ // extractTraceContext extracts trace context from AMQP headers
30+ func extractTraceContext ( ctx context. Context , headers amqp. Table ) context. Context {
31+ if headers == nil {
32+ return ctx
4233 }
43- return keys
34+ carrier := propagation.MapCarrier {}
35+ for k , v := range headers {
36+ if str , ok := v .(string ); ok {
37+ carrier [k ] = str
38+ }
39+ }
40+ return otel .GetTextMapPropagator ().Extract (ctx , carrier )
4441}
4542
4643// DialFunc is a function type for establishing RabbitMQ connections
@@ -207,21 +204,22 @@ func (p *Publisher) PublishJSON(ctx context.Context, exchangeName string, routin
207204 return err
208205 }
209206
210- // Create a span for the publish operation
207+ // Create producer span using semantic conventions
211208 tracer := otel .Tracer (p .cfg .App .Name )
212- ctx , span := tracer .Start (ctx , "rabbitmq.publish" ,
209+ ctx , span := tracer .Start (ctx , fmt .Sprintf ("%s publish" , exchangeName ),
210+ trace .WithSpanKind (trace .SpanKindProducer ),
213211 trace .WithAttributes (
214- attribute .String ("messaging.system" , "rabbitmq" ),
215- attribute .String ("messaging.destination" , exchangeName ),
216- attribute .String ("messaging.destination_kind" , "exchange" ),
217- attribute .String ("messaging.rabbitmq.routing_key" , routingKey ),
212+ semconv .MessagingSystemRabbitmq ,
213+ semconv .MessagingDestinationName (exchangeName ),
214+ semconv .MessagingRabbitmqDestinationRoutingKey (routingKey ),
215+ semconv .MessagingOperationPublish ,
216+ semconv .MessagingMessageBodySize (len (b )),
218217 ))
219218 defer span .End ()
220219
221220 // Inject trace context into message headers
222221 headers := make (amqp.Table )
223- propagator := otel .GetTextMapPropagator ()
224- propagator .Inject (ctx , tableCarrier {table : headers })
222+ injectTraceContext (ctx , headers )
225223
226224 publishing := amqp.Publishing {
227225 ContentType : "application/json" ,
@@ -238,13 +236,11 @@ func (p *Publisher) PublishJSON(ctx context.Context, exchangeName string, routin
238236 return fmt .Errorf ("failed to get channel: %w" , err )
239237 }
240238
241- err = ch .PublishWithContext (ctx , exchangeName , routingKey , false , false , publishing )
242- if err != nil {
239+ if err := ch .PublishWithContext (ctx , exchangeName , routingKey , false , false , publishing ); err != nil {
243240 span .RecordError (err )
244241 return err
245242 }
246243
247- span .SetAttributes (attribute .Int ("messaging.message.body.size" , len (b )))
248244 return nil
249245}
250246
@@ -276,7 +272,6 @@ func (c *Consumer) Handle(ctx context.Context, handler func([]byte) error) error
276272 }
277273
278274 tracer := otel .Tracer (c .cfg .App .Name )
279- propagator := otel .GetTextMapPropagator ()
280275
281276 for {
282277 select {
@@ -287,34 +282,39 @@ func (c *Consumer) Handle(ctx context.Context, handler func([]byte) error) error
287282 return errors .New ("consumer channel closed" )
288283 }
289284
290- // Extract trace context from message headers
291- msgCtx := ctx
292- if m .Headers != nil {
293- msgCtx = propagator .Extract (ctx , tableCarrier {table : m .Headers })
294- }
285+ c .handleMessage (ctx , m , tracer , handler )
286+ }
287+ }
288+ }
295289
296- // Create a span for the consume operation
297- // Note: We don't use the returned context since handler doesn't accept context
298- _ , span := tracer . Start ( msgCtx , "rabbitmq.consume" ,
299- trace . WithAttributes (
300- attribute . String ( "messaging.system" , "rabbitmq" ) ,
301- attribute . String ( "messaging.destination" , c . q . Name ) ,
302- attribute . String ( "messaging.destination_kind" , "queue" ),
303- attribute . String ( "messaging.operation" , "receive" ),
304- attribute . Int ( "messaging.message.body.size" , len ( m . Body )),
305- ))
306- defer span . End ()
307-
308- // Execute handler with trace context
309- // Note: handler receives []byte, not context, so trace context is propagated via span
310- if err := handler ( m . Body ); err != nil {
311- span . RecordError ( err )
312- _ = m . Nack ( false , true ) // Processing failed, requeue.
313- c . log . Sugar (). Errorw ( "consume error" , "err" , err )
314- continue
315- }
290+ // handleMessage processes a single message with tracing
291+ func ( c * Consumer ) handleMessage (
292+ ctx context. Context ,
293+ m amqp. Delivery ,
294+ tracer trace. Tracer ,
295+ handler func ([] byte ) error ,
296+ ) {
297+ // Extract trace context from message headers
298+ msgCtx := extractTraceContext ( ctx , m . Headers )
299+
300+ // Create consumer span using semantic conventions
301+ _ , span := tracer . Start ( msgCtx , fmt . Sprintf ( "%s receive" , c . q . Name ),
302+ trace . WithSpanKind ( trace . SpanKindConsumer ),
303+ trace . WithAttributes (
304+ semconv . MessagingSystemRabbitmq ,
305+ semconv . MessagingDestinationName ( c . q . Name ),
306+ semconv . MessagingOperationReceive ,
307+ semconv . MessagingMessageBodySize ( len ( m . Body )),
308+ ))
309+ defer span . End ()
316310
317- _ = m .Ack (false )
318- }
311+ // Execute handler
312+ if err := handler (m .Body ); err != nil {
313+ span .RecordError (err )
314+ _ = m .Nack (false , true ) // Processing failed, requeue.
315+ c .log .Sugar ().Errorw ("consume error" , "err" , err )
316+ return
319317 }
318+
319+ _ = m .Ack (false )
320320}
0 commit comments