
Observability

Aster provides production-grade observability out of the box: structured logging, OpenTelemetry metrics and tracing, and request correlation. All features are configurable via environment variables or TOML config.

Logging

Configuration

| Environment variable | TOML (aster.toml) | Values | Default |
|---|---|---|---|
| ASTER_LOG_FORMAT | [logging] format | json, text | text |
| ASTER_LOG_LEVEL | [logging] level | debug, info, warning, error | info |
| ASTER_LOG_MASK | [logging] mask | true, false | true |

Text mode (development)

Human-readable, colored output for local development:

14:30:45.123 INFO  server — connection opened [svc=Hello method=sayHello req=abc12345]
14:30:45.128 DEBUG server — rpc completed [svc=Hello method=sayHello duration_ms=4.2]

JSON mode (production)

Structured JSON for log aggregation (ELK, Datadog, Splunk, CloudWatch):

{"ts":"2026-04-06T14:30:45.123Z","level":"info","logger":"aster.server","msg":"connection opened","service":"Hello","method":"sayHello","request_id":"abc12345"}
{"ts":"2026-04-06T14:30:45.128Z","level":"debug","logger":"aster.server","msg":"rpc completed","service":"Hello","method":"sayHello","request_id":"abc12345","duration_ms":4.2,"status_code":"OK"}

Standard log fields

Every log entry can include these fields (when available in context):

| Field | Type | Description |
|---|---|---|
| ts | string | ISO 8601 timestamp with milliseconds |
| level | string | debug, info, warning, error |
| logger | string | Module path (e.g., aster.server) |
| msg | string | Human-readable message |
| service | string | RPC service name |
| method | string | RPC method name |
| request_id | string | Correlation ID for the call |
| peer | string | Remote endpoint ID (masked) |
| duration_ms | float | Call duration in milliseconds |
| status_code | string | RPC status (OK, NOT_FOUND, etc.) |
| error | string | Error message (on failure) |
| error_type | string | Exception class name (on failure) |
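A formatter emitting this field layout can be sketched with the standard library's logging module. This is illustrative only: the `JsonLogFormatter` name and the use of `extra` to pass RPC fields are assumptions, not Aster's actual implementation.

```python
import json
import logging
from datetime import datetime, timezone

class JsonLogFormatter(logging.Formatter):
    """Illustrative formatter producing the field layout shown above."""

    # Optional RPC context fields, included only when present on the record
    CONTEXT_FIELDS = ("service", "method", "request_id", "peer",
                      "duration_ms", "status_code", "error", "error_type")

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "ts": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
            "level": record.levelname.lower(),
            "logger": record.name,
            "msg": record.getMessage(),
        }
        for name in self.CONTEXT_FIELDS:
            value = getattr(record, name, None)
            if value is not None:
                entry[name] = value
        return json.dumps(entry)
```

Fields passed via `extra={...}` on a logging call land as record attributes and are picked up automatically.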

Sensitive field masking

When ASTER_LOG_MASK=true (default), sensitive values are automatically masked:

  • Secrets (secret_key, private_key, signing_key, signature, credential_json) — replaced with ***
  • Identifiers (root_pubkey, endpoint_id, node_id, contract_id, nonce) — truncated to abc1234...5678

Disable masking for debugging with ASTER_LOG_MASK=false.
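The masking rules above can be sketched as a small helper. The function name and the length threshold are hypothetical; only the field lists and the output shapes (`***`, `abc1234...5678`) come from the rules described here.

```python
# Field lists from the masking rules above
SECRET_FIELDS = {"secret_key", "private_key", "signing_key",
                 "signature", "credential_json"}
IDENTIFIER_FIELDS = {"root_pubkey", "endpoint_id", "node_id",
                     "contract_id", "nonce"}

def mask_field(name: str, value: str) -> str:
    """Secrets are replaced outright; identifiers are truncated."""
    if name in SECRET_FIELDS:
        return "***"
    if name in IDENTIFIER_FIELDS and len(value) > 11:
        # Keep the first 7 and last 4 characters, e.g. abc1234...5678
        return f"{value[:7]}...{value[-4:]}"
    return value
```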

Request correlation

Every RPC call automatically sets a correlation context (via Python contextvars). All log messages within the call's async scope include service, method, request_id, and peer — even from deeply nested code.
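The mechanism can be sketched with a ContextVar plus a logging filter (an illustrative reimplementation; the names here are hypothetical, not Aster's internals):

```python
import contextvars
import logging

# Async tasks spawned within the call inherit the caller's value
request_id_var = contextvars.ContextVar("request_id", default=None)

class CorrelationFilter(logging.Filter):
    """Copy the current correlation context onto every log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True

async def handle_rpc(req_id: str) -> None:
    request_id_var.set(req_id)
    # Any logging call in this async scope now carries request_id,
    # even from deeply nested code.
    logging.getLogger("aster.server").info("connection opened")
```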

Metrics

OpenTelemetry integration

Aster provides RED metrics (Rate, Errors, Duration) via the MetricsInterceptor:

from aster import AsterServer
from aster.interceptors import MetricsInterceptor

server = AsterServer(
    services=[MyService()],
    interceptors=[MetricsInterceptor()],
)

Metric names

| Metric | Type | Labels | Description |
|---|---|---|---|
| aster.rpc.started | Counter | service, method, pattern | Total RPC calls started |
| aster.rpc.completed | Counter | service, method, status | Total RPC calls completed |
| aster.rpc.duration | Histogram (seconds) | service, method | Call latency distribution |

The status label uses the RPC status code name: OK, NOT_FOUND, PERMISSION_DENIED, INTERNAL, etc.

In-memory fallback

When OpenTelemetry is not installed, MetricsInterceptor still collects in-memory counters accessible via snapshot():

metrics = MetricsInterceptor()
# ... after some calls ...
print(metrics.snapshot())
# {"started": 100, "succeeded": 95, "failed": 5, "in_flight": 0}
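A minimal sketch of how such an in-memory fallback can work (this mirrors the snapshot() shape shown above but is not the actual MetricsInterceptor internals):

```python
import threading
from dataclasses import dataclass, field

@dataclass
class RpcCounters:
    """Thread-safe in-memory RED counters with a snapshot() accessor."""
    started: int = 0
    succeeded: int = 0
    failed: int = 0
    in_flight: int = 0
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def on_start(self) -> None:
        with self._lock:
            self.started += 1
            self.in_flight += 1

    def on_finish(self, ok: bool) -> None:
        with self._lock:
            self.in_flight -= 1
            if ok:
                self.succeeded += 1
            else:
                self.failed += 1

    def snapshot(self) -> dict:
        # Return a point-in-time copy so callers never see torn updates
        with self._lock:
            return {"started": self.started, "succeeded": self.succeeded,
                    "failed": self.failed, "in_flight": self.in_flight}
```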

Tracing

Distributed tracing with OpenTelemetry

When the OpenTelemetry SDK is installed and configured, MetricsInterceptor creates a span for each RPC call:

  • Span name: {service}/{method} (e.g., HelloService/sayHello)
  • Span kind: SERVER
  • Attributes:
| Attribute | Example |
|---|---|
| rpc.system | aster |
| rpc.service | HelloService |
| rpc.method | sayHello |
| rpc.aster.pattern | unary |
| rpc.aster.idempotent | false |
| rpc.aster.error_code | NOT_FOUND (on error) |

Span conventions

Aster follows the OpenTelemetry RPC semantic conventions. The rpc.system attribute is always aster, distinguishing Aster spans from gRPC or other RPC systems in the same trace.
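The attribute mapping from the table above can be assembled as follows. This is a sketch of the mapping only; the helper name and signature are hypothetical.

```python
from typing import Optional

def span_attributes(service: str, method: str, pattern: str,
                    idempotent: bool,
                    error_code: Optional[str] = None) -> dict:
    """Build span attributes per the conventions table above."""
    attrs = {
        "rpc.system": "aster",          # fixed value identifying Aster spans
        "rpc.service": service,
        "rpc.method": method,
        "rpc.aster.pattern": pattern,
        "rpc.aster.idempotent": idempotent,
    }
    if error_code is not None:          # only present on error
        attrs["rpc.aster.error_code"] = error_code
    return attrs
```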

Kubernetes deployment

# deployment.yaml
env:
  - name: ASTER_LOG_FORMAT
    value: "json"
  - name: ASTER_LOG_LEVEL
    value: "info"
  - name: ASTER_LOG_MASK
    value: "true"

Log aggregation

JSON logs are directly parseable by:

  • Fluentd / Fluent Bit — no regex needed, direct JSON parsing
  • Datadog Agent — auto-detects JSON format
  • CloudWatch Logs Insights — query by service, method, duration_ms
  • Elasticsearch / Kibana — index on structured fields

Example queries

Slow RPCs (Kibana/Elasticsearch):

level:info AND duration_ms:>1000

Error rate by service (CloudWatch Insights):

filter level = "error"
| stats count(*) as errors by service
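For quick offline analysis without an aggregation stack, the equivalent of the error-rate query can be sketched over raw JSON log lines in Python (the function name is illustrative):

```python
import json
from collections import Counter

def errors_by_service(lines):
    """Count error-level entries per service, like the query above."""
    counts = Counter()
    for line in lines:
        entry = json.loads(line)
        if entry.get("level") == "error":
            counts[entry.get("service", "unknown")] += 1
    return dict(counts)
```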

Health and readiness endpoints

Aster includes a lightweight HTTP health server for Kubernetes probes and load balancer health checks. It runs on a separate port from the QUIC RPC endpoint.

Setup

The health server is disabled by default (port 0). Enable it explicitly:

from aster import AsterServer
from aster.health import HealthServer

async with AsterServer(services=[...]) as srv:
    health = HealthServer(srv, port=8080)
    await health.start()
    await srv.serve()

Or via environment variable (no code changes needed):

ASTER_HEALTH_PORT=8080 python my_service.py
| Environment variable | Default | Description |
|---|---|---|
| ASTER_HEALTH_PORT | 0 (disabled) | Port for the health HTTP server. Set to a nonzero port to enable. |
| ASTER_HEALTH_HOST | 127.0.0.1 | Bind address. Set to 0.0.0.0 for k8s pod probes. |
Bind address security

The default bind address is 127.0.0.1 (localhost only). Only set 0.0.0.0 when running in a container with network isolation (e.g., Kubernetes pod). The health endpoint exposes operational metrics that should not be publicly accessible.

Endpoints

| Endpoint | Success | Failure | Description |
|---|---|---|---|
| GET /healthz | 200 | 503 | Liveness probe — server is running |
| GET /readyz | 200 | 503 | Readiness probe — contracts published, accepting traffic |
| GET /metrics | 200 | | Full metrics snapshot (JSON) |
| GET /metrics/prometheus | 200 | | Metrics in Prometheus text exposition format |

Response examples

/healthz:

{"status": "ok", "uptime_s": 1234.5}

/readyz:

{"status": "ready", "services": 3, "registry": true}

/metrics:

{
  "health": {"status": "ok", "uptime_s": 1234.5},
  "ready": {"status": "ready", "services": 3, "registry": true},
  "connections": {
    "active_connections": 5,
    "total_connections": 142,
    "active_streams": 3,
    "total_streams": 8901
  },
  "admission": {
    "consumer_admitted": 42,
    "consumer_denied": 3,
    "consumer_errors": 0,
    "producer_admitted": 2,
    "producer_denied": 0,
    "producer_errors": 0,
    "last_admission_ms": 12.3
  },
  "rpc": {
    "started": 8901,
    "succeeded": 8850,
    "failed": 51,
    "in_flight": 0
  }
}

Kubernetes probes

# deployment.yaml
livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /readyz
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5

Exec probes (no HTTP server)

If you prefer not to run an HTTP server, use exec probes:

# health_check.py
import sys

from aster.health import check_health, check_ready

# `server` is a handle to the running AsterServer instance (wiring not shown);
# use check_ready(server) for the readiness probe.
sys.exit(0 if check_health(server) else 1)

Connection and admission metrics

Connection metrics

| Metric | Type | Description |
|---|---|---|
| active_connections | Gauge | Currently open peer connections |
| total_connections | Counter | Total connections since startup |
| active_streams | Gauge | Currently active RPC streams |
| total_streams | Counter | Total streams since startup |

Admission metrics

| Metric | Type | Description |
|---|---|---|
| consumer_admitted | Counter | Consumers successfully admitted |
| consumer_denied | Counter | Consumer admission denied |
| consumer_errors | Counter | Consumer admission errors |
| producer_admitted | Counter | Producers successfully admitted |
| producer_denied | Counter | Producer admission denied |
| producer_errors | Counter | Producer admission errors |
| last_admission_ms | Gauge | Duration of the last admission handshake in milliseconds |

Grafana dashboard

A pre-built Grafana dashboard template is included at ops/grafana-dashboard.json. Import it into Grafana for instant visibility.

Panels included:

  • Request rate (by service and method)
  • Error rate and error percentage gauge
  • Latency percentiles (p50, p95, p99)
  • In-flight requests
  • Active connections
  • Admission decisions over time
  • Request rate by service (bar chart)

To import:

  1. In Grafana, go to Dashboards > Import
  2. Upload ops/grafana-dashboard.json
  3. Select your Prometheus data source
  4. The dashboard uses OTel metric names (aster_rpc_started_total, etc.)

Rate limiting

The RateLimitInterceptor enforces token-bucket rate limits at multiple granularities. Add it to your server's interceptor chain:

from aster.interceptors import RateLimitInterceptor

rate_limiter = RateLimitInterceptor(
    global_rps=1000,      # whole-server cap
    per_service_rps=500,  # per service name
    per_method_rps=100,   # per individual method
    per_peer_rps=50,      # per connected peer
)

server = AsterServer(
    services=[MyService()],
    interceptors=[rate_limiter, MetricsInterceptor()],
)

Each bucket refills independently at its configured rate. When a request exceeds any applicable limit, the interceptor returns RESOURCE_EXHAUSTED with a Retry-After hint in the error metadata.
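The token-bucket behavior described here can be sketched as follows (an illustrative model, not RateLimitInterceptor's internals; the injectable clock exists only to make the refill logic easy to observe):

```python
import time

class TokenBucket:
    """Refills `rate` tokens per second up to `capacity`; allow() spends one."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity      # start full
        self.clock = clock
        self.last = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # the caller would map this to RESOURCE_EXHAUSTED
```

Each limit (global, per-service, per-method, per-peer) would hold its own bucket; a request must pass every applicable one.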

Graceful shutdown

AsterServer supports graceful shutdown so in-flight RPCs can complete before the process exits.

async with AsterServer(services=[MyService()]) as srv:
    # Install handlers for SIGTERM and SIGINT
    srv.install_signal_handlers()
    await srv.serve()

When a signal arrives, srv.drain() is called automatically:

  1. The server stops accepting new connections and new RPC calls.
  2. In-flight RPCs are given a grace period (default 30 seconds) to complete.
  3. After the grace period, remaining calls are cancelled and the server shuts down.

You can also trigger a drain programmatically:

await srv.drain(timeout_s=15)
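The three drain steps can be sketched with asyncio primitives (illustrative only; Aster's drain() manages its own task set, and step 1 is elided here):

```python
import asyncio

async def drain(in_flight, timeout_s: float = 30.0) -> None:
    """Sketch of the drain sequence: grace period, then cancellation."""
    # Step 1 (not shown): stop accepting new connections and RPC calls.
    if not in_flight:
        return
    # Step 2: give in-flight RPC tasks up to timeout_s to complete.
    done, pending = await asyncio.wait(in_flight, timeout=timeout_s)
    # Step 3: cancel whatever is still running and wait for cancellation.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
```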

TOML configuration example

# aster.toml

[logging]
format = "json"
level = "info"
mask = true