Skip to content

Commit 1ae057c

Browse files
authored
fix: fix trace ctx should not be overwrite (#648)
1 parent 59ac5e6 commit 1ae057c

3 files changed

Lines changed: 58 additions & 16 deletions

File tree

internal/core/local_runtime/environment_python.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
func (p *LocalPluginRuntime) InitPythonEnvironment() error {
1212
// root span for python env init
13-
_, span := p.startSpan("python.init_env", attribute.String("plugin.identity", p.Config.Identity()))
13+
_, span := p.startSpan("python.init_env", attribute.String("plugin.identity", p.Config.Identity()))
1414
defer span.End()
1515

1616
// prepare uv environment
@@ -25,7 +25,10 @@ _, span := p.startSpan("python.init_env", attribute.String("plugin.identity", p.
2525
case ErrVirtualEnvironmentInvalid:
2626
// remove the venv and rebuild it
2727
log.Warn("virtual environment for %s is invalid; deleting and recreating", p.Config.Identity())
28-
p.deleteVirtualEnvironment()
28+
err = p.deleteVirtualEnvironment()
29+
if err != nil {
30+
return err
31+
}
2932

3033
// create virtual environment
3134
venv, err = p.createVirtualEnvironment(uvPath)

internal/core/local_runtime/otel_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,12 @@ import (
1212
)
1313

1414
func TestPythonInitSpansReuseUpstreamTrace(t *testing.T) {
15-
// setup tracer provider with span recorder
1615
sr := tracetest.NewSpanRecorder()
1716
tp := sdktrace.NewTracerProvider()
1817
tp.RegisterSpanProcessor(sr)
1918
gootel.SetTracerProvider(tp)
2019
gootel.SetTextMapPropagator(propagation.TraceContext{})
2120

22-
// create parent span with known trace id
2321
ctx := context.Background()
2422
tr := gootel.Tracer("test")
2523
ctx, parent := tr.Start(ctx, "parent")
@@ -34,11 +32,44 @@ func TestPythonInitSpansReuseUpstreamTrace(t *testing.T) {
3432
spans := sr.Ended()
3533
require.Len(t, spans, 2)
3634

37-
// ensure child trace id matches parent trace id
3835
parentSpan := spans[0]
3936
childSpan := spans[1]
4037
if parentSpan.Name() == "python.init_env" {
4138
parentSpan, childSpan = childSpan, parentSpan
4239
}
4340
require.Equal(t, parentSpan.SpanContext().TraceID(), childSpan.SpanContext().TraceID())
4441
}
42+
43+
func TestStartSpanDoesNotAffectSubsequentSpans(t *testing.T) {
44+
sr := tracetest.NewSpanRecorder()
45+
tp := sdktrace.NewTracerProvider()
46+
tp.RegisterSpanProcessor(sr)
47+
gootel.SetTracerProvider(tp)
48+
gootel.SetTextMapPropagator(propagation.TraceContext{})
49+
50+
ctx := context.Background()
51+
tr := gootel.Tracer("test")
52+
ctx, parent := tr.Start(ctx, "parent")
53+
54+
runtime := &LocalPluginRuntime{}
55+
runtime.SetTraceContext(ctx)
56+
57+
ctx1, span1 := runtime.startSpan("span1")
58+
span1.End()
59+
60+
ctx2, span2 := runtime.startSpan("span2")
61+
62+
require.False(t, ctx1.Err() != nil, "ctx1 should not be canceled yet")
63+
require.False(t, ctx2.Err() != nil, "ctx2 should not be canceled")
64+
65+
span2.End()
66+
parent.End()
67+
68+
spans := sr.Ended()
69+
require.Len(t, spans, 3)
70+
71+
traceID := spans[0].SpanContext().TraceID()
72+
for _, span := range spans {
73+
require.Equal(t, traceID, span.SpanContext().TraceID(), "all spans should share the same trace ID")
74+
}
75+
}

internal/core/local_runtime/setup_python_environment.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,14 @@ func (p *LocalPluginRuntime) ensureTraceCtx() context.Context {
4444
}
4545

4646
func (p *LocalPluginRuntime) startSpan(name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
47-
ctx := p.ensureTraceCtx()
48-
ctx, sp := p.otelTracer().Start(ctx, name)
47+
rootCtx := p.ensureTraceCtx()
48+
ctx, sp := p.otelTracer().Start(rootCtx, name)
4949
if id, ok := log.IdentityFromContext(ctx); ok && id.TenantID != "" {
5050
sp.SetAttributes(attribute.String("tenant_id", id.TenantID))
5151
}
5252
if len(attrs) > 0 {
5353
sp.SetAttributes(attrs...)
5454
}
55-
// keep last context for potential child spans
56-
p.traceCtx = ctx
5755
return ctx, sp
5856
}
5957

@@ -205,11 +203,15 @@ func (p *LocalPluginRuntime) installDependencies(
205203

206204
defer func() {
207205
if cmd.Process != nil {
208-
cmd.Process.Kill()
206+
err := cmd.Process.Kill()
207+
if err != nil {
208+
log.Warn("failed to kill python process", "error", err)
209+
}
209210
}
210211
}()
211212

212213
var errMsg strings.Builder
214+
var errMsgMu sync.Mutex
213215
var wg sync.WaitGroup
214216
wg.Add(2)
215217

@@ -220,14 +222,12 @@ func (p *LocalPluginRuntime) installDependencies(
220222
routinepkg.RoutineLabelKeyMethod: "InitPythonEnvironment",
221223
}, func() {
222224
defer wg.Done()
223-
// read stdout
224225
buf := make([]byte, 1024)
225226
for {
226227
n, err := stdout.Read(buf)
227228
if err != nil {
228229
break
229230
}
230-
// FIXME: move the log to separated layer
231231
log.Info("installing plugin", "plugin", p.Config.Identity(), "output", string(buf[:n]))
232232
lastActiveAt = time.Now()
233233
}
@@ -238,20 +238,23 @@ func (p *LocalPluginRuntime) installDependencies(
238238
routinepkg.RoutineLabelKeyMethod: "InitPythonEnvironment",
239239
}, func() {
240240
defer wg.Done()
241-
// read stderr
242241
buf := make([]byte, 1024)
243242
for {
244243
n, err := stderr.Read(buf)
245-
if err != nil && err != os.ErrClosed {
244+
if err != nil && !errors.Is(err, os.ErrClosed) {
246245
lastActiveAt = time.Now()
246+
errMsgMu.Lock()
247247
errMsg.WriteString(string(buf[:n]))
248+
errMsgMu.Unlock()
248249
break
249-
} else if err == os.ErrClosed {
250+
} else if errors.Is(err, os.ErrClosed) {
250251
break
251252
}
252253

253254
if n > 0 {
255+
errMsgMu.Lock()
254256
errMsg.WriteString(string(buf[:n]))
257+
errMsgMu.Unlock()
255258
lastActiveAt = time.Now()
256259
}
257260
}
@@ -271,11 +274,16 @@ func (p *LocalPluginRuntime) installDependencies(
271274
if time.Since(lastActiveAt) > time.Duration(
272275
p.appConfig.PythonEnvInitTimeout,
273276
)*time.Second {
274-
cmd.Process.Kill()
277+
err := cmd.Process.Kill()
278+
errMsgMu.Lock()
279+
if err != nil {
280+
errMsg.WriteString(fmt.Sprintf("failed to kill python process: %s", err))
281+
}
275282
errMsg.WriteString(fmt.Sprintf(
276283
"init process exited due to no activity for %d seconds",
277284
p.appConfig.PythonEnvInitTimeout,
278285
))
286+
errMsgMu.Unlock()
279287
break
280288
}
281289
}

0 commit comments

Comments
 (0)