Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions kubernetes/internal/controller/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
)

const (
AnnoAllocStatusKey = "sandbox.opensandbox.io/alloc-status"
AnnoAllocReleaseKey = "sandbox.opensandbox.io/alloc-release"
LabelBatchSandboxPodIndexKey = "batch-sandbox.sandbox.opensandbox.io/pod-index"
AnnoAllocStatusKey = "sandbox.opensandbox.io/alloc-status"
AnnoAllocReleaseKey = "sandbox.opensandbox.io/alloc-release"
AnnotationEndpointsLastTransTime = "sandbox.opensandbox.io/endpoints-last-trans-time"
LabelBatchSandboxPodIndexKey = "batch-sandbox.sandbox.opensandbox.io/pod-index"

FinalizerTaskCleanup = "batch-sandbox.sandbox.opensandbox.io/task-cleanup"
)
Expand Down
21 changes: 18 additions & 3 deletions kubernetes/internal/controller/batchsandbox_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -86,6 +87,10 @@ type BatchSandboxReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile
func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx)
start := time.Now()
defer func() {
log.Info("Reconcile finished", "latencyMs", time.Since(start).Milliseconds())
}()
var aggErrors []error
defer func() {
_ = DurationStore.Pop(req.String())
Expand Down Expand Up @@ -186,11 +191,16 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
"metadata": map[string]any{
"annotations": map[string]string{
AnnotationSandboxEndpoints: string(raw),
// example 2026-04-15T12:19:11.696+08:00, use ms precision
AnnotationEndpointsLastTransTime: time.Now().Format("2006-01-02T15:04:05.000Z07:00"),
},
},
})
obj := &sandboxv1alpha1.BatchSandbox{ObjectMeta: metav1.ObjectMeta{Namespace: batchSbx.Namespace, Name: batchSbx.Name}}
if err := r.Patch(ctx, obj, client.RawPatch(types.MergePatchType, patchData)); err != nil {
start := time.Now()
err := r.Patch(ctx, obj, client.RawPatch(types.MergePatchType, patchData))
log.Info("Sync sandbox endpoint", "sandbox", klog.KObj(batchSbx), "latencyMs", time.Since(start).Milliseconds(), "success", strconv.FormatBool(err == nil))
if err != nil {
log.Error(err, "failed to patch annotation", "annotation", AnnotationSandboxEndpoints, "body", string(patchData))
aggErrors = append(aggErrors, err)
}
Expand Down Expand Up @@ -226,8 +236,13 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
})
if err != nil {
aggErrors = append(aggErrors, err)
} else if err := r.Status().Patch(ctx, batchSbx, client.RawPatch(types.MergePatchType, patchData)); err != nil {
aggErrors = append(aggErrors, err)
} else {
start := time.Now()
err := r.Status().Patch(ctx, batchSbx, client.RawPatch(types.MergePatchType, patchData))
log.Info("Update sandbox status", "sandbox", klog.KObj(batchSbx), "latencyMs", time.Since(start).Milliseconds(), "success", strconv.FormatBool(err == nil))
if err != nil {
aggErrors = append(aggErrors, err)
}
}
}

Expand Down
76 changes: 76 additions & 0 deletions kubernetes/internal/controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package controller

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

const (
summaryMaxAge = time.Second * 30
summaryAgeBuckets uint32 = 3
metricNamespace = "opensandbox-controller"
)

var (
allocatorScheduleDurationSummary = func() *prometheus.SummaryVec {
return prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: metricNamespace,
Subsystem: "allocator",
Name: "schedule_duration_ms",
MaxAge: summaryMaxAge,
AgeBuckets: summaryAgeBuckets,
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{"namespace", "pool_name", "success"},
)
}()
allocatorPersistAllocationStateDurationSummary = func() *prometheus.SummaryVec {
return prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: metricNamespace,
Subsystem: "allocator",
Name: "persist_alloc_state_duration_ms",
MaxAge: summaryMaxAge,
AgeBuckets: summaryAgeBuckets,
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{"namespace", "pool_name", "success"},
)
}()
allocatorSyncAllocResultDurationSummary = func() *prometheus.SummaryVec {
return prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: metricNamespace,
Subsystem: "allocator",
Name: "sync_alloc_result_duration_ms",
MaxAge: summaryMaxAge,
AgeBuckets: summaryAgeBuckets,
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{"namespace", "pool_name", "success"},
)
}()
allocatorSyncSingleAllocResultDurationSummary = func() *prometheus.SummaryVec {
return prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: metricNamespace,
Subsystem: "allocator",
Name: "sync_single_alloc_result_duration_ms",
MaxAge: summaryMaxAge,
AgeBuckets: summaryAgeBuckets,
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{"namespace", "pool_name", "sandbox_name", "success"},
)
}()
)

func init() {
metrics.Registry.MustRegister(allocatorScheduleDurationSummary)
metrics.Registry.MustRegister(allocatorPersistAllocationStateDurationSummary)
metrics.Registry.MustRegister(allocatorSyncAllocResultDurationSummary)
metrics.Registry.MustRegister(allocatorSyncSingleAllocResultDurationSummary)
}
41 changes: 30 additions & 11 deletions kubernetes/internal/controller/pool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ type PoolReconciler struct {

func (r *PoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx)
start := time.Now()
defer func() {
log.Info("Reconcile finished", "latencyMs", time.Since(start).Milliseconds())
}()
// Fetch the Pool instance
pool := &sandboxv1alpha1.Pool{}
if err := r.Get(ctx, req.NamespacedName, pool); err != nil {
Expand Down Expand Up @@ -319,7 +323,10 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp
Pool: pool,
Pods: pods,
}
start := time.Now()
allocStatus, pendingSyncs, poolDirty, err := r.Allocator.Schedule(ctx, spec)
schLatency := time.Since(start).Milliseconds()
allocatorScheduleDurationSummary.WithLabelValues(pool.Namespace, pool.Name, strconv.FormatBool(err == nil)).Observe(float64(schLatency))
if err != nil {
return nil, err
}
Expand All @@ -329,8 +336,9 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp
idlePods = append(idlePods, pod.Name)
}
}
log.Info("Schedule result", "pool", pool.Name, "allocated", len(allocStatus.PodAllocation),
"idlePods", len(idlePods), "supplement", allocStatus.PodSupplement, "pendingSyncs", len(pendingSyncs), "poolDirty", poolDirty)
log.Info("Allocator schedule result", "pool", pool.Name, "sandboxSize", len(batchSandboxes), "podSize", len(pods),
"allocated", len(allocStatus.PodAllocation), "idlePods", len(idlePods), "supplement", allocStatus.PodSupplement, "pendingSyncs", len(pendingSyncs), "poolDirty", poolDirty,
"latencyMs", schLatency)

schedResult := &ScheduleResult{
PodAllocation: allocStatus.PodAllocation,
Expand All @@ -341,28 +349,37 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp

// Persist allocation to memory store
if poolDirty {
if err := r.Allocator.PersistPoolAllocation(ctx, pool, &AllocStatus{PodAllocation: allocStatus.PodAllocation}); err != nil {
log.Error(err, "Failed to persist pool allocation")
return nil, err
start = time.Now()
err := r.Allocator.PersistPoolAllocation(ctx, pool, &AllocStatus{PodAllocation: allocStatus.PodAllocation})
persistLatency := time.Since(start).Milliseconds()
allocatorPersistAllocationStateDurationSummary.WithLabelValues(pool.Namespace, pool.Name, strconv.FormatBool(err == nil)).Observe(float64(persistLatency))
if err != nil {
return nil, fmt.Errorf("Failed to persist pool allocation, err %w", err)
}
log.Info("Allocator persist allocation state", "pool", pool.Name, "allocation", len(allocStatus.PodAllocation), "latencyMs", persistLatency)
}

// Sync to each BatchSandbox concurrently
errCh := make(chan error, len(pendingSyncs))
var wg sync.WaitGroup
sem := make(chan struct{}, syncSandboxAllocConcurrency)

start = time.Now()
for _, syncInfo := range pendingSyncs {
wg.Add(1)
sem <- struct{}{}
go func(info SandboxSyncInfo) {
defer wg.Done()
defer func() { <-sem }()
if err := r.Allocator.SyncSandboxAllocation(ctx, info.Sandbox, info.Pods); err != nil {
start0 := time.Now()
err := r.Allocator.SyncSandboxAllocation(ctx, info.Sandbox, info.Pods)
latencyMs0 := time.Since(start0).Milliseconds()
allocatorSyncSingleAllocResultDurationSummary.WithLabelValues(info.Sandbox.Namespace, info.Sandbox.Spec.PoolRef, info.Sandbox.Name, strconv.FormatBool(err == nil)).Observe(float64(latencyMs0))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Drop sandbox_name from sync latency labels

This metric uses info.Sandbox.Name as a label value, which creates a distinct Summary time series for every sandbox ever seen. Since the controller never deletes label values, a cluster with normal sandbox churn will continuously grow in-memory metric state and scrape payload size over process lifetime, which can become a reliability issue in production. Keep this metric at bounded cardinality (e.g., namespace/pool/success only) and use logs for per-sandbox detail.

Useful? React with 👍 / 👎.

if err != nil {
log.Error(err, "Failed to sync sandbox allocation", "sandbox", info.SandboxName)
errCh <- fmt.Errorf("failed to sync sandbox %s: %w", info.SandboxName, err)
} else {
log.Info("Successfully sync Sandbox allocation", "sandbox", info.SandboxName, "pods", info.Pods)
log.Info("Successfully sync Sandbox allocation", "sandbox", info.SandboxName, "pods", info.Pods, "latencyMs", latencyMs0)
}
}(syncInfo)
}
Expand All @@ -373,11 +390,13 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp
for err := range errCh {
syncErrs = append(syncErrs, err)
}

if err := gerrors.Join(syncErrs...); err != nil {
return nil, err
syncAggErr := gerrors.Join(syncErrs...)
syncAllocLatency := time.Since(start).Milliseconds()
log.Info("Sync sandbox allocation", "total", len(pendingSyncs), "failed", len(syncErrs), "latencyMs", syncAllocLatency)
allocatorSyncAllocResultDurationSummary.WithLabelValues(pool.Namespace, pool.Name, strconv.FormatBool(syncAggErr == nil)).Observe(float64(syncAllocLatency))
if syncAggErr != nil {
return nil, syncAggErr
}

return schedResult, nil
}

Expand Down
28 changes: 27 additions & 1 deletion kubernetes/internal/utils/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package logging

import (
"os"
"time"

"github.com/go-logr/logr"
zap2 "go.uber.org/zap"
Expand All @@ -24,6 +25,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// localTimeEncoder encodes time in local timezone with nanosecond precision
func localTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Local().Format("2006-01-02T15:04:05.000000000Z07:00"))
}

// Options contains configuration for the logger
type Options struct {
// Development configures the logger to use a development config
Expand Down Expand Up @@ -63,6 +69,22 @@ func DefaultOptions() Options {
// NewLoggerWithZapOptions creates a logger using controller-runtime's zap options
// and adds file output support
func NewLoggerWithZapOptions(opts Options) logr.Logger {
// Create encoder config with nanosecond timestamp
encoderConfig := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
TimeKey: "ts",
NameKey: "logger",
CallerKey: "caller",
FunctionKey: zapcore.OmitKey,
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: localTimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}

// Add AddCaller option to include file and line number in logs
if opts.ZapOptions.ZapOpts == nil {
opts.ZapOptions.ZapOpts = []zap2.Option{}
Expand All @@ -71,7 +93,10 @@ func NewLoggerWithZapOptions(opts Options) logr.Logger {

// If file output is not enabled, use the default zap logger
if !opts.EnableFileOutput {
return zap.New(zap.UseFlagOptions(&opts.ZapOptions))
return zap.New(
zap.UseFlagOptions(&opts.ZapOptions),
zap.Encoder(zapcore.NewJSONEncoder(encoderConfig)),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep zap CLI encoder settings effective

Hard-coding a JSON encoder here overrides encoder-related options parsed from opts.BindFlags, so operators can no longer rely on documented flags like --zap-encoder (and related time-encoding choices) to control log format. This is a behavior regression from the previous implementation, where zap.UseFlagOptions determined the encoder.

Useful? React with 👍 / 👎.

)
}

// Create file writer with rotation
Expand All @@ -93,6 +118,7 @@ func NewLoggerWithZapOptions(opts Options) logr.Logger {
// Create logger with multi-writer
return zap.New(
zap.UseFlagOptions(&opts.ZapOptions),
zap.Encoder(zapcore.NewJSONEncoder(encoderConfig)),
zap.WriteTo(multiWriter),
)
}
Loading